项目紧急上线了一套实时特征平台(Feature Store)的核心计算服务,最初几天流量平稳,一切看起来都很完美。然而,随着运营活动引流,某个核心用户群体的活跃度瞬间飙升,监控系统开始疯狂告警:计算集群的CPU利用率触顶,底层数据源(一个MySQL集群)的连接池被打满,QPS响应时间从50ms飙升到3000ms以上。
问题出乎意料地严重。排查日志发现,对于同一个用户的同一个特征(例如,“用户近1小时内购买总金额”),在缓存失效的瞬间,网关会将大量并发请求路由到不同的计算服务实例上。这导致了多个实例同时从数据库拉取原始数据,执行完全相同的聚合计算,然后几乎同时回写缓存。这种计算资源的极大浪费和对数据源的重复冲击,就是我们面临的“计算风暴”。
最初版的特征计算逻辑大致如下,它存在着典型的并发计算竞争问题:
// V1 - Naive Implementation with Race Condition
@Service
public class FeatureComputationServiceV1 {
private static final Logger log = LoggerFactory.getLogger(FeatureComputationServiceV1.class);
@Autowired
private FeatureCacheClient featureCacheClient;
@Autowired
private UserBehaviorRepository userBehaviorRepository;
public FeatureValue computeUserHourlyPurchase(String userId) {
String featureKey = "feature:user_hourly_purchase:" + userId;
// 1. 尝试从缓存获取
Optional<FeatureValue> cachedValue = featureCacheClient.get(featureKey);
if (cachedValue.isPresent()) {
return cachedValue.get();
}
// 2. 缓存未命中,执行昂贵的计算
// 这是问题的核心:多个线程会同时执行到这里
log.warn("Cache miss for feature key: {}. Triggering expensive computation.", featureKey);
List<PurchaseRecord> records = userBehaviorRepository.findRecentPurchases(userId, Duration.ofHours(1));
BigDecimal totalAmount = records.stream()
.map(PurchaseRecord::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
FeatureValue result = new FeatureValue(userId, "user_hourly_purchase", totalAmount, System.currentTimeMillis());
// 3. 将结果回写缓存
featureCacheClient.set(featureKey, result, Duration.ofMinutes(5));
return result;
}
}
这段代码在低并发下工作正常,但在高并发场景下,log.warn 这行日志会在同一毫秒内被不同实例上的多个线程打印出来,每一个打印都代表着一次对数据库的昂贵查询和重复计算。
一次 Code Review 引发的重构
在问题复盘的 Code Review 会议上,团队迅速定位了问题根源。有人提出用本地锁 synchronized 或 ReentrantLock 解决,但这立刻被否决。我们的计算服务是无状态且水平扩展的,部署在多个 Pod 中,本地锁只能保证单个 JVM 内部的线程安全,无法解决跨实例的竞争。
这时,引入分布式锁的方案被提上日程。技术选型讨论主要围绕 ZooKeeper 和 Redis。
- ZooKeeper: 强一致性,通过创建临时有序节点实现锁。优点是可靠性极高,具备 watch 机制,能有效避免死锁。缺点是性能开销相对较大,其复杂的CP模型对于我们这种高吞吐、低延迟的锁场景来说,有点“杀鸡用牛刀”。
- Redis: 基于其单线程模型和原子操作(如
SETNX)实现。优点是性能极高,部署轻量,非常适合用作高并发场景下的锁服务。缺点是实现一个健壮的锁需要考虑很多细节,比如锁超时、原子性、可重入性等。
考虑到我们的技术栈中已经重度使用了 Redis 作为缓存,且场景追求的是极致性能而非绝对的强一致性(一次计算失败或短暂的锁获取失败可以接受,由下一个请求重试即可),我们最终选择了基于 Redis 实现分布式锁。
我们的目标是:在计算前,先获取一个与“用户ID+特征名”绑定的锁。只有成功获取锁的那个线程,才能执行计算逻辑。其他线程要么快速失败,要么短暂等待后重试。
构建一个生产级的 Redis 分布式锁
直接使用 SETNX 是不够的。一个生产级的分布式锁必须解决以下几个核心问题:
- 原子性: 获取锁(
SETNX)和设置过期时间(EXPIRE``)必须是原子操作,否则SETNX` 成功后服务崩溃,锁将永不释放。 - 防误删: 锁的 value 应该是一个唯一的标识(如 UUID + 线程ID),解锁时必须验证这个标识,防止线程A误删了线程B的锁。
- 防死锁: 必须设置合理的过期时间,即使服务宕机,锁也能在一定时间后自动释放。
- 高可用: Redis 本身需要是高可用的(如 Sentinel 或 Cluster 模式)。
基于这些原则,我们设计了DistributedLock接口和其RedisDistributedLock实现。
// Lock Interface
public interface DistributedLock {
/**
* 尝试获取锁
* @param lockKey 锁的唯一标识
* @param requestId 请求ID,用于防止误解锁
* @param expireTimeMillis 锁的过期时间(毫秒)
* @return true 如果成功获取锁,否则 false
*/
boolean tryLock(String lockKey, String requestId, long expireTimeMillis);
/**
* 释放锁
* @param lockKey 锁的唯一标识
* @param requestId 请求ID,必须与加锁时一致
* @return true 如果成功释放锁,否则 false
*/
boolean unlock(String lockKey, String requestId);
}
使用 Lua 脚本是保证原子性的最佳实践,因为 Redis 会保证单个 Lua 脚本的执行是原子的。
// RedisDistributedLock Implementation
@Component
public class RedisDistributedLock implements DistributedLock {
private static final Logger log = LoggerFactory.getLogger(RedisDistributedLock.class);
private final StringRedisTemplate stringRedisTemplate;
private final DefaultRedisScript<Long> unlockScript;
// LUA script for atomic unlock operation.
// It checks if the key exists and its value matches the provided requestId.
private static final String UNLOCK_LUA_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
public RedisDistributedLock(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
this.unlockScript = new DefaultRedisScript<>(UNLOCK_LUA_SCRIPT, Long.class);
}
@Override
public boolean tryLock(String lockKey, String requestId, long expireTimeMillis) {
try {
// Redis SET command with NX and PX options provides atomicity for lock acquisition and expiration setting.
// SET key value [NX | XX] [GET] [EX seconds | PX milliseconds | EXAT unix-time-seconds | PXAT unix-time-milliseconds | KEEPTTL]
// NX -- Only set the key if it does not already exist.
// PX -- Set the specified expire time, in milliseconds.
Boolean success = stringRedisTemplate.execute(
(RedisConnection connection) -> connection.set(
lockKey.getBytes(StandardCharsets.UTF_8),
requestId.getBytes(StandardCharsets.UTF_8),
Expiration.from(expireTimeMillis, TimeUnit.MILLISECONDS),
RedisStringCommands.SetOption.ifAbsent()
),
true // `execute` in transaction, though for a single command it's not strictly necessary but good practice
);
return Boolean.TRUE.equals(success);
} catch (Exception e) {
log.error("Error acquiring distributed lock for key: {}", lockKey, e);
// In case of Redis connection issues, we treat it as lock acquisition failure.
// This prevents the application from proceeding without a lock.
return false;
}
}
@Override
public boolean unlock(String lockKey, String requestId) {
try {
// Execute the Lua script to ensure atomic check-and-delete.
Long result = stringRedisTemplate.execute(
unlockScript,
Collections.singletonList(lockKey),
requestId
);
// result == 1 means the key was deleted, 0 means it was not (either key not found or value mismatch).
return Long.valueOf(1L).equals(result);
} catch (Exception e) {
log.error("Error releasing distributed lock for key: {}", lockKey, e);
// If unlock fails due to Redis issues, the lock will eventually expire.
// The main risk here is a slight delay in another process acquiring the lock.
return false;
}
}
}
服务发现与架构整合
我们的特征计算服务本身是作为微服务存在的,通过服务发现机制(我们使用的是 Nacos)注册自身,并由上游的网关进行路由。引入分布式锁后,整个架构的交互流程变得更加清晰。
graph TD
A[Client Request] --> B{API Gateway};
B --> C[Feature Computation Service Instance 1];
B --> D[Feature Computation Service Instance 2];
B --> E[... Instance N];
subgraph Feature Computation Cluster
C -- Registers with --> F((Nacos));
D -- Registers with --> F;
E -- Registers with --> F;
end
C -- 1. tryLock(feature_key) --> G((Redis));
D -- 2. tryLock(feature_key) fails --> G;
G -- Lock acquired --> C;
C -- 3. Compute feature --> H[Database];
H -- Data --> C;
C -- 4. Write to cache & unlock --> G;
C --> B;
D -- Waits or returns empty --> B;
这个架构中,Nacos 负责服务实例的注册与发现,保证了计算服务的高可用和可扩展性。Redis 则扮演了“协调者”的角色,确保在任意时刻,只有一个服务实例在为特定特征执行计算。
重构后的特征计算服务
我们将 RedisDistributedLock 注入到 FeatureComputationService 中,并重构了核心计算逻辑。
// V2 - Refactored with Distributed Lock
@Service
public class FeatureComputationServiceV2 {
private static final Logger log = LoggerFactory.getLogger(FeatureComputationServiceV2.class);
private static final long LOCK_EXPIRE_MILLIS = 10_000; // 10 seconds expiration
private static final long LOCK_WAIT_MILLIS = 100; // Wait 100ms before retrying cache
@Autowired
private FeatureCacheClient featureCacheClient;
@Autowired
private UserBehaviorRepository userBehaviorRepository;
@Autowired
private DistributedLock distributedLock;
public FeatureValue computeUserHourlyPurchase(String userId) {
String featureName = "user_hourly_purchase";
String featureKey = "feature:" + featureName + ":" + userId;
String lockKey = "lock:feature_computation:" + featureName + ":" + userId;
// 1. 尝试从缓存获取
Optional<FeatureValue> cachedValue = featureCacheClient.get(featureKey);
if (cachedValue.isPresent()) {
return cachedValue.get();
}
// 2. 缓存未命中,尝试获取分布式锁
String requestId = UUID.randomUUID().toString();
boolean lockAcquired = distributedLock.tryLock(lockKey, requestId, LOCK_EXPIRE_MILLIS);
if (lockAcquired) {
// 2a. 成功获取锁
log.info("Lock acquired for key: {}. Starting feature computation.", lockKey);
try {
// Double-check cache, in case another thread finished computation just before we got the lock.
// This is a crucial optimization to avoid redundant work.
cachedValue = featureCacheClient.get(featureKey);
if (cachedValue.isPresent()) {
log.info("Feature found in cache after acquiring lock. Computation skipped.");
return cachedValue.get();
}
// Execute expensive computation
List<PurchaseRecord> records = userBehaviorRepository.findRecentPurchases(userId, Duration.ofHours(1));
BigDecimal totalAmount = records.stream()
.map(PurchaseRecord::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
FeatureValue result = new FeatureValue(userId, featureName, totalAmount, System.currentTimeMillis());
// Write result to cache
featureCacheClient.set(featureKey, result, Duration.ofMinutes(5));
return result;
} finally {
// 3. 无论如何,必须释放锁
if (!distributedLock.unlock(lockKey, requestId)) {
// This could happen if the lock expired due to long computation.
// It's a signal that LOCK_EXPIRE_MILLIS might be too short.
log.warn("Failed to unlock or lock expired for key: {}. RequestId: {}", lockKey, requestId);
} else {
log.info("Lock released for key: {}", lockKey);
}
}
} else {
// 2b. 获取锁失败,说明有其他实例正在计算
log.warn("Failed to acquire lock for key: {}. Another instance is likely computing.", lockKey);
// Strategy: wait for a short period and then try reading from cache again.
// This is a simple polling mechanism. For more complex scenarios, a pub/sub model could be used.
try {
Thread.sleep(LOCK_WAIT_MILLIS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return featureCacheClient.get(featureKey).orElse(FeatureValue.EMPTY); // Return empty or a default value if still not available
}
}
}
这段重构后的代码有几个关键改进:
- 加锁保护: 整个计算和缓存回写的过程被分布式锁保护起来。
- 双重检查锁定 (Double-Checked Locking): 获取锁后,再次检查缓存。这是因为在线程A等待锁的过程中,持有锁的线程B可能已经完成了计算并填充了缓存。这一步避免了不必要的重复计算。
- finally 块中释放锁: 保证了即使计算过程中抛出异常,锁也能被正确释放,避免死锁。
- 锁获取失败策略: 当获取锁失败时,我们没有直接返回失败,而是选择等待一小段时间后再次尝试从缓存读取。这是一种优雅降级,虽然会增加一点点响应延迟,但大大提高了数据获取的成功率,避免了将计算压力传递给调用方。
- 唯一请求ID: 使用
UUID作为requestId,确保了解锁操作的安全性,防止一个线程释放了另一个线程持有的锁(例如,由于GC暂停导致锁过期后,旧的解锁命令才到达Redis)。
遗留的挑战与未来优化路径
当前的实现已经解决了最核心的计算风暴问题,让系统在高并发下恢复了稳定。但在真实项目中,没有完美的方案,只有不断的权衡与迭代。
锁的公平性与性能: 当前的实现是非公平锁,任何客户端都可以抢占。在高竞争环境下,可能导致某些请求“饿死”。同时,失败的请求会进行短暂的
Thread.sleep,这在超高并发下会消耗大量线程资源。更优化的方式可以引入异步模型,或者使用 Redis 的 Pub/Sub 机制,让等待的线程订阅一个 channel,计算完成后由持锁线程发布消息来唤醒它们,避免无效轮询。锁的可重入性: 我们实现的锁是不可重入的。如果未来的业务逻辑变得复杂,一个持有锁的方法需要调用另一个也需要相同锁的方法,就会导致死锁。实现可重入锁需要维护一个持有计数,这会增加锁实现的复杂度。
Redisson 的启发: 像 Redisson 这样的成熟框架提供了更完善的分布式锁实现,例如它内置了“看门狗”(Watchdog)机制。客户端在获取锁后,会有一个后台线程定时延长锁的过期时间,只要客户端存活,锁就不会因为业务执行时间过长而过期。这解决了手动估算过期时间的难题。在未来的迭代中,与其重复造轮子,不如考虑直接引入并封装 Redisson,将团队精力聚焦于业务逻辑本身。
服务发现与负载均衡: 虽然 Nacos 解决了服务注册发现,但流量均匀地打到各个实例上,反而加剧了锁的竞争。未来可以探索一种更智能的路由策略,例如基于用户ID的哈希,将同一用户的请求尽可能路由到同一个实例,从而将分布式锁的竞争降级为本地锁,进一步提升性能。但这会破坏服务的无状态性,需要仔细评估其带来的复杂性。