本文來自社群這周的討論話題—— 技術專題討論第四期:漫談分散式鎖,也總結了我對分散式鎖的認知和使用經驗。
應用場景
當多個機器(多個行程)會對同一條資料進行修改時,並且要求這個修改是原子性的。這裡有兩個限定:(1)多個行程之間的競爭,意味著JDK自帶的鎖失效;(2)原子性修改,意味著資料是有狀態的,修改前後有依賴。
實現方式
-
基於Redis實現,主要基於redis的setnx(set if not exist)命令;
-
基於Zookeeper實現;
-
基於version欄位實現,樂觀鎖,兩個執行緒可以同時讀取到原有的version值,但是最終只有一個可以完成操作;
這三種方式中,我接觸過第一和第三種。基於redis的分散式鎖功能更加強大,可以實現阻塞和非阻塞鎖。
基於Redis的實踐
鎖的實現
-
鎖的key為標的資料的唯一鍵,value為鎖的期望超時時間點;
-
首先進行一次setnx命令,嘗試獲取鎖,如果獲取成功,則設定鎖的最終超時時間(以防在當前行程獲取鎖後奔潰導致鎖無法釋放);如果獲取鎖失敗,則檢查當前的鎖是否超時,如果發現沒有超時,則獲取鎖失敗;如果發現鎖已經超時(即鎖的超時時間小於等於當前時間),則再次嘗試獲取鎖,取到後判斷下當前的超時時間和之前的超時時間是否相等,如果相等則說明當前的客戶端是排隊等待的執行緒裡的第一個嘗試獲取鎖的,讓它獲取成功即可。
public class RedisDistributionLock {
private static final Logger logger = LoggerFactory.getLogger(RedisDistributionLock.class);
//key的TTL,一天
private static final int finalDefaultTTLwithKey = 24 * 3600;
//鎖預設超時時間,20秒
private static final long defaultExpireTime = 20 * 1000;
private static final boolean Success = true;
@Resource( name = "redisTemplate")
private RedisTemplate<String, String> redisTemplateForGeneralize;
/**
* 加鎖,鎖預設超時時間20秒
* @param resource
* @return
*/
public boolean lock(String resource) {
return this.lock(resource, defaultExpireTime);
}
/**
* 加鎖,同時設定鎖超時時間
* @param key 分散式鎖的key
* @param expireTime 單位是ms
* @return
*/
public boolean lock(String key, long expireTime) {
logger.debug("redis lock debug, start. key:[{}], expireTime:[{}]",key,expireTime);
long now = Instant.now().toEpochMilli();
long lockExpireTime = now + expireTime;
//setnx
boolean executeResult = redisTemplateForGeneralize.opsForValue().setIfAbsent(key,String.valueOf(lockExpireTime));
logger.debug("redis lock debug, setnx. key:[{}], expireTime:[{}], executeResult:[{}]", key, expireTime,executeResult);
//取鎖成功,為key設定expire
if (executeResult == Success) {
redisTemplateForGeneralize.expire(key,finalDefaultTTLwithKey, TimeUnit.SECONDS);
return true;
}
//沒有取到鎖,繼續流程
else{
Object valueFromRedis = this.getKeyWithRetry(key, 3);
// 避免獲取鎖失敗,同時對方釋放鎖後,造成NPE
if (valueFromRedis != null) {
//已存在的鎖超時時間
long oldExpireTime = Long.parseLong((String)valueFromRedis);
logger.debug("redis lock debug, key already seted. key:[{}], oldExpireTime:[{}]",key,oldExpireTime);
//鎖過期時間小於當前時間,鎖已經超時,重新取鎖
if (oldExpireTime <= now) {
logger.debug("redis lock debug, lock time expired. key:[{}], oldExpireTime:[{}], now:[{}]", key, oldExpireTime, now);
String valueFromRedis2 = redisTemplateForGeneralize.opsForValue().getAndSet(key, String.valueOf(lockExpireTime));
long currentExpireTime = Long.parseLong(valueFromRedis2);
//判斷currentExpireTime與oldExpireTime是否相等
if(currentExpireTime == oldExpireTime){
//相等,則取鎖成功
logger.debug("redis lock debug, getSet. key:[{}], currentExpireTime:[{}], oldExpireTime:[{}], lockExpireTime:[{}]", key, currentExpireTime, oldExpireTime, lockExpireTime);
redisTemplateForGeneralize.expire(key, finalDefaultTTLwithKey, TimeUnit.SECONDS);
return true;
}else{
//不相等,取鎖失敗
return false;
}
}
}
else {
logger.warn("redis lock,lock have been release. key:[{}]", key);
return false;
}
}
return false;
}
private Object getKeyWithRetry(String key, int retryTimes) {
int failTime = 0;
while (failTime < retryTimes) {
try {
return redisTemplateForGeneralize.opsForValue().get(key);
} catch (Exception e) {
failTime++;
if (failTime >= retryTimes) {
throw e;
}
}
}
return null;
}
/**
* 解鎖
* @param key
* @return
*/
public boolean unlock(String key) {
logger.debug("redis unlock debug, start. resource:[{}].",key);
redisTemplateForGeneralize.delete(key);
return Success;
}
}
自定義註解使用分散式鎖
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface RedisLockAnnoation {
String keyPrefix() default "";
/**
* 要鎖定的key中包含的屬性
*/
String[] keys() default {};
/**
* 是否阻塞鎖;
* 1. true:獲取不到鎖,阻塞一定時間;
* 2. false:獲取不到鎖,立即傳回
*/
boolean isSpin() default true;
/**
* 超時時間
*/
int expireTime() default 10000;
/**
* 等待時間
*/
int waitTime() default 50;
/**
* 獲取不到鎖的等待時間
*/
int retryTimes() default 20;
}
實現分散式鎖的邏輯
@Component
@Aspect
public class RedisLockAdvice {
private static final Logger logger = LoggerFactory.getLogger(RedisLockAdvice.class);
@Resource
private RedisDistributionLock redisDistributionLock;
@Around("@annotation(RedisLockAnnoation)")
public Object processAround(ProceedingJoinPoint pjp) throws Throwable {
//獲取方法上的註解物件
String methodName = pjp.getSignature().getName();
Class> classTarget = pjp.getTarget().getClass();
Class>[] par = ((MethodSignature) pjp.getSignature()).getParameterTypes();
Method objMethod = classTarget.getMethod(methodName, par);
RedisLockAnnoation redisLockAnnoation = objMethod.getDeclaredAnnotation(RedisLockAnnoation.class);
//拼裝分散式鎖的key
String[] keys = redisLockAnnoation.keys();
Object[] args = pjp.getArgs();
Object arg = args[0];
StringBuilder temp = new StringBuilder();
temp.append(redisLockAnnoation.keyPrefix());
for (String key : keys) {
String getMethod = "get" + StringUtils.capitalize(key);
temp.append(MethodUtils.invokeExactMethod(arg, getMethod)).append("_");
}
String redisKey = StringUtils.removeEnd(temp.toString(), "_");
//執行分散式鎖的邏輯
if (redisLockAnnoation.isSpin()) {
//阻塞鎖
int lockRetryTime = 0;
try {
while (!redisDistributionLock.lock(redisKey, redisLockAnnoation.expireTime())) {
if (lockRetryTime++ > redisLockAnnoation.retryTimes()) {
logger.error("lock exception. key:{}, lockRetryTime:{}", redisKey, lockRetryTime);
throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
}
ThreadUtil.holdXms(redisLockAnnoation.waitTime());
}
return pjp.proceed();
} finally {
redisDistributionLock.unlock(redisKey);
}
} else {
//非阻塞鎖
try {
if (!redisDistributionLock.lock(redisKey)) {
logger.error("lock exception. key:{}", redisKey);
throw ExceptionUtil.geneException(CommonExceptionEnum.SYSTEM_ERROR);
}
return pjp.proceed();
} finally {
redisDistributionLock.unlock(redisKey);
}
}
}
}
參考資料
-
Java分散式鎖三種實現方案
-
Java註解的基礎與高階應用
-
基於 AOP 和 Redis 實現的分散式鎖
-
如何高效排查系統故障?一分錢引發的系統設計“踩坑”案例
-
用redis實現分散式鎖