![Java高級架構——Redisson是如何實現分佈式鎖的?](http://p2.ttnews.xyz/loading.gif)
針對項目中使用的分佈式鎖進行簡單的示例配置以及源碼解析,並列舉源碼中使用到的一些基礎知識點,但是沒有對redisson中使用到的netty知識進行解析。
本篇主要是對以下幾個方面進行了探索
·Maven配置
·RedissonLock簡單示例
·源碼中使用到的Redis命令
·源碼中使用到的lua腳本語義
·源碼分析
Maven配置
![Java高級架構——Redisson是如何實現分佈式鎖的?](http://p2.ttnews.xyz/loading.gif)
RedissonLock簡單示例
redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。
redis服務器不在本地的同學請注意權限問題。
Sentinel配置
Configconfig= new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6479","127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
RedissonClient redisson = Redisson.create(config);`
簡單使用
源碼中使用到的Redis命令
分佈式鎖主要需要以下redis命令,這裡列舉一下。在源碼分析部分可以繼續參照命令的操作含義。
1.EXISTS key :當 key 存在,返回1;若給定的 key 不存在,返回0。
2.GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。
3.GET key:返回 key 所關聯的字符串值,如果 key 不存在那麼返回 nil。
4.DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。
5.HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.
6.HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。
7.HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創建。如果字段field不存在,在進行當前操作前,其將被創建,且對應的值被置為0,返回值是增量之後的值
8.PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。
9.PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。
源碼中使用到的lua腳本語義
Redisson源碼中,執行redis命令的是lua腳本,其中主要用到如下幾個概念。
redis.call() 是執行redis命令.
KEYS[1] 是指腳本中第1個參數
ARGV[1] 是指腳本中第一個參數的值
返回值中nil與false同一個意思。
需要注意的是,在redis執行lua腳本時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。
另外,如果lua腳本執行過程中出現了異常或者redis服務器直接宕掉了,執行redis的根據日誌回覆的命令,會將腳本中已經執行的命令在日誌中刪除。
源碼分析
RLOCK結構
publicinterfaceRLockextendsLock,RExpirable{
voidlockInterruptibly(longleaseTime, TimeUnit unit)throwsInterruptedException;
booleantryLock(longwaitTime,longleaseTime, TimeUnit unit)throwsInterruptedException;
voidlock(longleaseTime, TimeUnit unit);
voidforceUnlock();
booleanisLocked();
booleanisHeldByCurrentThread();
intgetHoldCount();
FutureunlockAsync();
FuturetryLockAsync();
FuturelockAsync();
FuturelockAsync(longleaseTime, TimeUnit unit);
FuturetryLockAsync(longwaitTime, TimeUnit unit);
FuturetryLockAsync(longwaitTime,longleaseTime, TimeUnit unit);
}
該接口主要繼承了Lock接口, 並擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s
RedissonLock獲取鎖 tryLock源碼
FuturetryLockInnerAsync(longleaseTime, TimeUnit unit,longthreadId){
internalLockLeaseTime = unit.toMillis(leaseTime);
returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
"if (redis.call('exists', KEYS[1]) == 0) then "+
"redis.call('hset', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]); "+
"return nil; "+
"end; "+
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+
"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+
"redis.call('pexpire', KEYS[1], ARGV[1]); "+
"return nil; "+
"end; "+
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
其中,
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock
ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s
ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當前訪問線程,用於區分不同服務器上的線程.
逐句分析:
if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖名稱不存在
then redis.call(‘hset’, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,並且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數為1
redis.call(‘pexpire’, KEYS[1], ARGV[1]) 設置set的過期時間,防止當前服務器出問題後導致死鎖,return nil; end;返回nil 結束
if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當前線程持有鎖,如果是當前線程持有鎖
then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數++
redis.call(‘pexpire’, KEYS[1], ARGV[1]) 並且重新設置該鎖的有效時間
return nil; end;返回nil,結束
鎖存在, 但不是當前線程加的鎖,則返回鎖的過期時間。
RedissonLock解鎖 unlock源碼
@Override
publicvoidunlock(){
Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('exists', KEYS[1]) == 0) then "+
"redis.call('publish', KEYS[2], ARGV[1]); "+
"return 1; "+
"end;"+
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "+
"return nil;"+
"end; "+
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "+
"if (counter > 0) then "+
"redis.call('pexpire', KEYS[1], ARGV[2]); "+
"return 0; "+
"else "+
"redis.call('del', KEYS[1]); "+
"redis.call('publish', KEYS[2], ARGV[1]); "+
"return 1; "+
"end; "+
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
if(opStatus ==null) {
thrownewIllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id +" thread-id: "+ Thread.currentThread().getId());
}
if(opStatus) {
cancelExpirationRenewal();
}
}
其中,
KEYS[1] 表是的是getName() 代表鎖名test_lock
KEYS[2] 表示getChanelName() 表示的是發佈訂閱過程中使用的Chanel
ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數字 0,代表解鎖消息
ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s
ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+線程id
語義分析:
if(redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return1;
end;
if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖已經不存在(可能是因為過期導致不存在,也可能是因為已經解鎖)
then redis.call(‘publish’, KEYS[2], ARGV[1]) 則發佈鎖解除的消息
return 1; end 返回1結束
if(redis.call('hexists', KEYS[1], ARGV[3]) ==0)then
returnnil;
end;
if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線
then return nil;end 則直接返回nil 結束
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if(counter >0)then
redis.call('pexpire', KEYS[1], ARGV[2]);
return0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return1;
end;
local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1) 如果是鎖是當前線程所添加,定義變量counter,表示當前線程的重入次數-1,即直接將重入次數-1
if (counter > 0)如果重入次數大於0,表示該線程還有其他任務需要執行
then redis.call(‘pexpire’, KEYS[1], ARGV[2]) 則重新設置該鎖的有效時間
return 0 返回0結束
else redis.call(‘del’, KEYS[1]) 否則表示該線程執行結束,刪除該鎖
redis.call(‘publish’, KEYS[2], ARGV[1]) 並且發佈該鎖解除的消息
return 1; end;返回1結束
returnnil;
其他情況返回nil並結束
if(opStatus ==null) {
thrownewIllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id +" thread-id: "+ Thread.currentThread().getId());
}
腳本執行結束之後,如果返回值不是0或1,即當前線程去解鎖其他線程的加鎖時,拋出異常。
RedissonLock強制解鎖源碼
@Override
publicvoidforceUnlock()
{
get(forceUnlockAsync());
}
FutureforceUnlockAsync(){
cancelExpirationRenewal();
returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then "
+"redis.call('publish', KEYS[2], ARGV[1]); "
+"return 1 "
+"else "
+"return 0 "
+"end",
Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage);
}
以上是強制解鎖的源碼,在源碼中並沒有找到forceUnlock()被調用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則併發布鎖被刪除的消息,返回1結束,否則返回0結束。
總結
這裡只是簡單的一個redisson分佈式鎖的測試用例,並分析了執行lua腳本這部分,如果要繼續分析執行結束之後的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。
閱讀更多 以JAVA架構贏天下 的文章