併發Redis的控制方法

背景

開發活動報名業務,涉及到活動人數限制的問題,當併發量上來的時候,多人同時提交報名信息,將會導致活動已報名人數的不準確,對業務造成影響,如下圖:

併發Redis的控制方法

分析出現問題的原因是,設置操作發生的時候,並沒有確保當前人數的準確性,即沒有確保當前查詢出來的已報名人數與數據庫的一致性,導致客戶端併發的兩次操作有被覆蓋的情況發生

傳統數據庫 VS NoSql

mysql

針對如上場景,若報名人數字段保存在mysql數據庫中,可以使用一種常見的降低讀寫鎖衝突,保證數據一致性的樂觀鎖機制(Compare and Set CAS),實現方案如下

將原來的操作sql代碼

update act set num=#{numNew} where actId=#{actId}

改為

update act set num=#{numNew} where actId=#{actId} and num=#{numOld}

即只有當查詢出來的數據與當前數據庫的數據一致時,才可以進行賦值操作,否則失敗

redis

若使用redis,則活動報名人數以鍵值對的形式存在內存中,業務代碼將會對內存中的人數進行操作,相比mysql,redis的效率更高,不會造成很大的延遲(若當併發量很大時,使用mysql進行報名人數記錄,CAS的方案將會導致很多客戶端操作失敗,用戶體驗不好),但使用redis,其沒有很好的事務支持,以上mysql的解決方案不能很好的運用在redis上,因此如何設計redis鎖,進行共享資源(已報名活動人數)的操作,是需要解決的問題

使用到的命令說明

設計Redis鎖之前,需要介紹下即將用到的幾個命令

SETNX

將key設置值為value,如果key不存在,這種情況下等同SET命令,返回值1。 當key存在時,什麼也不做,返回值0。

watch && MULTI

watch:標記所有指定的key 被監視起來,在事務中有條件的執行(樂觀鎖)

MULTI:標記一個事務塊的開始。 隨後的指令將在執行EXEC時作為一個原子執行

當兩者一起使用的時候,首先key被watch監視,若在調用 EXEC 命令執行事務時, 如果任意一個被監視的鍵被其他客戶端修改了, 那麼整個事務不再執行, 直接返回失敗。如下表:

併發Redis的控制方法

在時間 T4 ,客戶端 B 修改了 name 鍵的值, 當客戶端 A 在 T5 執行 EXEC 時,Redis 會發現 name 這個被監視的鍵已經被修改, 因此客戶端 A 的事務不會被執行,而是直接返回失敗。

GETSET

GETSET key value 返回之前的舊值value,之後設置key的新值

Redis基本解決思路以及遇到的問題

以下列舉使用redis鎖的基本思路

注:例子使用spring-data-redis庫,setnx命令變為setIfAbsent,並且返回true or false

private StringRedisTemplate stringRedisTemplate;public Boolean setConcurrentLock(String key) throws InterruptedException { ValueOperations ops = stringRedisTemplate.opsForValue(); while (!ops.setIfAbsent(key, "lock"))) { TimeUnit.MILLISECONDS.sleep(3); } return true;}public void deleteConcurrentLock(String key) { stringRedisTemplate.delete(key);}

如上獲取redis鎖使用了setnx命令,若lock被佔用,則返回false,一直循環,直到lock被刪除後可以賦值成功,才能獲得鎖,實現對共享資源加鎖。

但是,很明顯,while存在死循環死鎖的可能,當如下場景:

線程1獲取到lock,線程2,線程3在執行while循環等待lock刪除,若線程1突然掛掉,沒能刪除lock,則導致線程2,線程3死循環,死鎖

想到解決方案為對鎖設置超時,防止無限制循環,代碼如下:

private StringRedisTemplate stringRedisTemplate;public Boolean setConcurrentLock(String key, long expireTime) throws InterruptedException { ValueOperations ops = stringRedisTemplate.opsForValue(); //expireTime 為鎖超時時間 while (!ops.setIfAbsent(key, String.valueOf(System.currentTimeMillis() + expireTime))) { Long expire = Long.parseLong(ops.get(key)); //判斷是否超時 if (expire != null && expire < System.currentTimeMillis()) { //getset獲取舊的時間,並且設置新的超時時間 Long oldExpire = Long.parseLong(ops.getAndSet(key, String.valueOf(System.currentTimeMillis() + expireTime))); if (oldExpire != null && oldExpire < System.currentTimeMillis()) { break; } } TimeUnit.MILLISECONDS.sleep(3); } return true;}public void deleteConcurrentLock(String key) { stringRedisTemplate.delete(key);}

若獲取鎖失敗,進入while循環,判斷超時時間是否已到,if判斷為真,證明lock已經超時。所以執行getset命令,獲取舊的時間,並設置新的超時時間,若獲取的舊的時間超時了,則證明獲取lock成功,跳出循環

但此處添加超時控制仍然存在問題,如下場景

場景一:

線程1獲取lock並且掛掉,線程2,線程3 進入while循環後,同時判斷出lock已經超時,線程2首先執行getset命令,返回了線程1設置的超時時間,確實超時,線程2獲取鎖;線程3執行getset命令,返回了線程2設置的超時時間,並未超時,但是線程3重新設置了超時時間

場景二:

有關刪除鎖的方法,若線程2持有鎖期間超時,但是操作沒有執行完,鎖被線程3重新設置,變為線程3的鎖,線程2執行完畢後,直接執行del,則會把線程3的鎖刪除,出現問題

Redis最終實踐方案

針對上面列舉的兩個問題,修改代碼的最終實踐版如下:

private StringRedisTemplate stringRedisTemplate;public static ThreadLocal holder = new ThreadLocal<>();public Boolean setConcurrentLock(String key, long expireTime) throws InterruptedException { ValueOperations ops = stringRedisTemplate.opsForValue(); while (!ops.setIfAbsent(key, String.valueOf(System.currentTimeMillis() + expireTime))) { stringRedisTemplate.watch(key); Long expire = Long.parseLong(ops.get(key)); if (expire != null && expire < System.currentTimeMillis()) { stringRedisTemplate.multi(); Long oldExpire = Long.parseLong(ops.getAndSet(key, String.valueOf(System.currentTimeMillis() + expireTime))); if (stringRedisTemplate.exec() != null && oldExpire != null && oldExpire < System.currentTimeMillis()) { break; } } else { stringRedisTemplate.unwatch(); } TimeUnit.MILLISECONDS.sleep(3); } holder.set(ops.get(key)); return true;}public void deleteConcurrentLock(String key) { ValueOperations ops = stringRedisTemplate.opsForValue(); Long expire = Long.valueOf(ops.get(key)); if(exprie.equals(holder.get())){ stringRedisTemplate.delete(key); } holder.remove();}

如上面的場景一,首先線程2,線程3同時判斷出lock超時後,對lock進行watch監視,然後將getset操作放到事務中執行,若線程2執行完事務,修改了lock的時間後,線程3由於執行事務命令lock被修改而失敗,不會覆蓋設置線程2的超時時間,解決場景一問題

對於場景二,為了防止已經超時的線程誤刪其他正在執行的線程lock,引入ThreadLock變量,將本線程設置的超時時間放入ThreadLock中,若刪除的時候,從Redis取出的時間變化了,證明該線程超時,時間被其他線程重新設置過,就不需要刪除lock。最後需要注意的是使用ThreadLocal需要在判斷是夠刪除lock鎖時手動刪除,防止web服務器中的線程池對線程複用,造成ThreadLocal重複使用。

總結

本篇實踐是基於單點redis服務器情況下的鎖(若工程在多機器下部署,可以裝逼的叫redis分佈式鎖)。但在redis集群架構下,如果master節點down機,由於redis主從複製是異步的,會有明顯的race-condition。Redis文檔中提供了一種解決方案:RedLock,後續有機會再去實踐學習吧。。。


分享到:


相關文章: