Java高級架構——Redisson是如何實現分布式鎖的?

Java高級架構——Redisson是如何實現分佈式鎖的?

針對項目中使用的分佈式鎖進行簡單的示例配置以及源碼解析,並列舉源碼中使用到的一些基礎知識點,但是沒有對redisson中使用到的netty知識進行解析。

本篇主要是對以下幾個方面進行了探索

·Maven配置

·RedissonLock簡單示例

·源碼中使用到的Redis命令

·源碼中使用到的lua腳本語義

·源碼分析

Maven配置

Java高級架構——Redisson是如何實現分佈式鎖的?

RedissonLock簡單示例

redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。

redis服務器不在本地的同學請注意權限問題。

Sentinel配置

Configconfig= new Config();

config.useSentinelServers().addSentinelAddress("127.0.0.1:6479","127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);

RedissonClient redisson = Redisson.create(config);`

簡單使用

Java高級架構——Redisson是如何實現分佈式鎖的?

源碼中使用到的Redis命令

分佈式鎖主要需要以下redis命令,這裡列舉一下。在源碼分析部分可以繼續參照命令的操作含義。

1.EXISTS key :當 key 存在,返回1;若給定的 key 不存在,返回0。

2.GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字符串類型時,返回一個錯誤,當key不存在時,返回nil。

3.GET key:返回 key 所關聯的字符串值,如果 key 不存在那麼返回 nil。

4.DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。

5.HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.

6.HEXISTS key field:當key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。

7.HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創建。如果字段field不存在,在進行當前操作前,其將被創建,且對應的值被置為0,返回值是增量之後的值

8.PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。

9.PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。

源碼中使用到的lua腳本語義

Redisson源碼中,執行redis命令的是lua腳本,其中主要用到如下幾個概念。

redis.call() 是執行redis命令.

KEYS[1] 是指腳本中第1個參數

ARGV[1] 是指腳本中第一個參數的值

返回值中nil與false同一個意思。

需要注意的是,在redis執行lua腳本時,相當於一個redis級別的鎖,不能執行其他操作,類似於原子操作,也是redisson實現的一個關鍵點。

另外,如果lua腳本執行過程中出現了異常或者redis服務器直接宕掉了,執行redis的根據日誌回覆的命令,會將腳本中已經執行的命令在日誌中刪除。

源碼分析

RLOCK結構

publicinterfaceRLockextendsLock,RExpirable{

voidlockInterruptibly(longleaseTime, TimeUnit unit)throwsInterruptedException;

booleantryLock(longwaitTime,longleaseTime, TimeUnit unit)throwsInterruptedException;

voidlock(longleaseTime, TimeUnit unit);

voidforceUnlock();

booleanisLocked();

booleanisHeldByCurrentThread();

intgetHoldCount();

FutureunlockAsync();

FuturetryLockAsync();

FuturelockAsync();

FuturelockAsync(longleaseTime, TimeUnit unit);

FuturetryLockAsync(longwaitTime, TimeUnit unit);

FuturetryLockAsync(longwaitTime,longleaseTime, TimeUnit unit);

}

該接口主要繼承了Lock接口, 並擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認時間是30s

RedissonLock獲取鎖 tryLock源碼

FuturetryLockInnerAsync(longleaseTime, TimeUnit unit,longthreadId){

internalLockLeaseTime = unit.toMillis(leaseTime);

returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,

"if (redis.call('exists', KEYS[1]) == 0) then "+

"redis.call('hset', KEYS[1], ARGV[2], 1); "+

"redis.call('pexpire', KEYS[1], ARGV[1]); "+

"return nil; "+

"end; "+

"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "+

"redis.call('hincrby', KEYS[1], ARGV[2], 1); "+

"redis.call('pexpire', KEYS[1], ARGV[1]); "+

"return nil; "+

"end; "+

"return redis.call('pttl', KEYS[1]);",

Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

}

其中,

KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock

ARGV[1] 表示的是 internalLockLeaseTime 默認值是30s

ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當前訪問線程,用於區分不同服務器上的線程.

逐句分析:

Java高級架構——Redisson是如何實現分佈式鎖的?

if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖名稱不存在

then redis.call(‘hset’, KEYS[1], ARGV[2],1) 則向redis中添加一個key為test_lock的set,並且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數為1

redis.call(‘pexpire’, KEYS[1], ARGV[1]) 設置set的過期時間,防止當前服務器出問題後導致死鎖,return nil; end;返回nil 結束

Java高級架構——Redisson是如何實現分佈式鎖的?

if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1) 如果鎖是存在的,檢測是否是當前線程持有鎖,如果是當前線程持有鎖

then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數++

redis.call(‘pexpire’, KEYS[1], ARGV[1]) 並且重新設置該鎖的有效時間

return nil; end;返回nil,結束

Java高級架構——Redisson是如何實現分佈式鎖的?

鎖存在, 但不是當前線程加的鎖,則返回鎖的過期時間。

RedissonLock解鎖 unlock源碼

@Override

publicvoidunlock(){

Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

"if (redis.call('exists', KEYS[1]) == 0) then "+

"redis.call('publish', KEYS[2], ARGV[1]); "+

"return 1; "+

"end;"+

"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "+

"return nil;"+

"end; "+

"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "+

"if (counter > 0) then "+

"redis.call('pexpire', KEYS[1], ARGV[2]); "+

"return 0; "+

"else "+

"redis.call('del', KEYS[1]); "+

"redis.call('publish', KEYS[2], ARGV[1]); "+

"return 1; "+

"end; "+

"return nil;",

Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));

if(opStatus ==null) {

thrownewIllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "

+ id +" thread-id: "+ Thread.currentThread().getId());

}

if(opStatus) {

cancelExpirationRenewal();

}

}

其中,

KEYS[1] 表是的是getName() 代表鎖名test_lock

KEYS[2] 表示getChanelName() 表示的是發佈訂閱過程中使用的Chanel

ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數字 0,代表解鎖消息

ARGV[2] 表示的是internalLockLeaseTime 默認的有效時間 30s

ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+線程id

語義分析:

if(redis.call('exists', KEYS[1]) == 0) then

redis.call('publish', KEYS[2], ARGV[1]);

return1;

end;

if (redis.call(‘exists’, KEYS[1]) == 0) 如果鎖已經不存在(可能是因為過期導致不存在,也可能是因為已經解鎖)

then redis.call(‘publish’, KEYS[2], ARGV[1]) 則發佈鎖解除的消息

return 1; end 返回1結束

if(redis.call('hexists', KEYS[1], ARGV[3]) ==0)then

returnnil;

end;

if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0) 如果鎖存在,但是若果當前線程不是加鎖的線

then return nil;end 則直接返回nil 結束

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);

if(counter >0)then

redis.call('pexpire', KEYS[1], ARGV[2]);

return0;

else

redis.call('del', KEYS[1]);

redis.call('publish', KEYS[2], ARGV[1]);

return1;

end;

local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1) 如果是鎖是當前線程所添加,定義變量counter,表示當前線程的重入次數-1,即直接將重入次數-1

if (counter > 0)如果重入次數大於0,表示該線程還有其他任務需要執行

then redis.call(‘pexpire’, KEYS[1], ARGV[2]) 則重新設置該鎖的有效時間

return 0 返回0結束

else redis.call(‘del’, KEYS[1]) 否則表示該線程執行結束,刪除該鎖

redis.call(‘publish’, KEYS[2], ARGV[1]) 並且發佈該鎖解除的消息

return 1; end;返回1結束

returnnil;

其他情況返回nil並結束

if(opStatus ==null) {

thrownewIllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "

+ id +" thread-id: "+ Thread.currentThread().getId());

}

腳本執行結束之後,如果返回值不是0或1,即當前線程去解鎖其他線程的加鎖時,拋出異常。

RedissonLock強制解鎖源碼

@Override

publicvoidforceUnlock()

{

get(forceUnlockAsync());

}

FutureforceUnlockAsync(){

cancelExpirationRenewal();

returncommandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

"if (redis.call('del', KEYS[1]) == 1) then "

+"redis.call('publish', KEYS[2], ARGV[1]); "

+"return 1 "

+"else "

+"return 0 "

+"end",

Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage);

}

以上是強制解鎖的源碼,在源碼中並沒有找到forceUnlock()被調用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則併發布鎖被刪除的消息,返回1結束,否則返回0結束。

總結

這裡只是簡單的一個redisson分佈式鎖的測試用例,並分析了執行lua腳本這部分,如果要繼續分析執行結束之後的操作,需要進行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。


分享到:


相關文章: