手把手教你實現基於Redis的分佈式鎖

1. 概述

目前,分佈式系統已經是各大公司的標配,它具有高可用、可擴展等特點。在分佈式系統中,由於存在多臺機器上的進程競爭同一份資源的問題,因此需要分佈式鎖來保證同步訪問資源。

一個經典的場景就是淘寶雙11秒殺活動,全國人民的客戶端訪問不同的後端服務器,然後後端服務器再訪問數據庫,此時數據庫就是需要同步訪問的資源。

在介紹基於Redis實現的分佈式鎖之前;以Python語言為例,我們看看根據應用的實現架構,同步鎖可能會有以下幾種類型

  1. 如果處理程序是單進程多線程的,在Python語言中,就可以使用 threading 模塊的 Lock 對象來限制對共享資源的同步訪問,實現多線程安全。
  2. 單機多進程的情況,在Python語言中,可以使用 multiprocessing 的 Lock 對象來保證多進程安全。
  3. 多機多進程部署的情況,需要依賴一個第三方組件(存儲鎖對象)來實現一個分佈式的同步鎖。

2. 分佈式鎖的必要條件

本文主要介紹第三種場景下基於Redis如何實現分佈式鎖。現在我們來看看實現一個分佈式鎖的必要條件

有哪些?

  1. 原子性:加鎖和釋放鎖的操作必須滿足原子性
  2. 無死鎖:不會發生死鎖(PS:例如已獲得鎖的線程/進程在釋放鎖之前突然異常退出,導致其他線程/進程會一直在循環等待鎖被釋放)
  3. 互斥性:同一個時刻只能有一個線程/進程佔有鎖,其他線程/進程必須等待直到鎖被釋放
  4. 可重入性:當前線程/進程獲得鎖之後,還可以繼續調用獲取鎖的操作,第二次以及之後的獲取鎖的操作不會被阻塞等待(PS:釋放鎖的操作也是一樣的,調用多次之後,只有最後一次釋放鎖的時候才會真正地釋放鎖)--- 這個條件根據業務來決定是否需要實現

3. 實現過程

根據分佈式鎖的必要條件,下面將給出幾種實現方式,來觀察任意一個條件不滿足時,會出現什麼樣的問題?在實現的過程中將使用同一份測試用例。測試用例代碼如下:

<code># test.py
'''
啟用多個線程對 redis 中的 test_key 的值進行自增操作,理想情況,test_key 的值應該等於線程的數量,比如開了 10 個線程,test_key的值最終應該是10。

'''
def increase(redis, lock, key):
# 獲得鎖
lock_value = lock.get_lock(key)
value = redis.get(key)
# 模擬實際情況下進行的某些耗時操作
time.sleep(0.1)
value += 1
redis.set(key, value)
thread_name = threading.current_thread().name
# 打印線程名和最新的值
print thread_name, new_value
# 釋放鎖
lock.del_lock(key, lock_value)

# 連接服務端
redis = RedisCli(REDIS_CACHE_HOST_LIST, REDIS_CACHE_MASTER_NAME)
lock = RedisLock(redis)
key = 'test_key'
thread_count = 10
redis.delete(key)
for i in xrange(thread_count):
thread = threading.Thread(target=increase, args=(redis, lock, key))
thread.start()/<code>

Tips:下面的代碼片段中只展示需要修改的部分,其他部分和test.py保持一致。

3.1 原子性

在這個版本中,當線程 A get(lock_key) 的值為空時,set lock_key 的值為 1,並返回,這表示線程 A 獲得了鎖,可以繼續執行後面的操作,否則需要一直循環去獲取鎖,直到 key 的值再次為空,重新獲得鎖,執行任務完成後釋放鎖。

<code>class RedisLock(object):

def __init__(self, rediscli):
self.rediscli = rediscli

def _get_lock_key(self, key):
lock_key = "lock_%s" % key
return lock_key

def get_lock(self, key):
lock_key = self._get_lock_key(key)
while True:
value = self.rediscli.get(lock_key)
if not value:
self.rediscli.set(lock_key, '1')
return True
time.sleep(0.01)

def del_lock(self, key, new_expire_time):
lock_key = self._get_lock_key(key)
return self.rediscli.delete(lock_key)/<code>

執行test.py測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-5 2
Thread-2 2
Thread-6 3
Thread-7 3
Thread-4 3
Thread-9 4
Thread-8 5
Thread-10 5
Thread-3 5/<code>

觀察輸出結果發現,同時有多個線程輸出的結果是一樣的。初看上面加鎖的代碼邏輯似乎沒什麼問題,但是最終的結果卻事與願違,原因是上面的代碼get(lock_key)和set(lock_key, '1')並不是原子性的執行,而是分開執行。A 線程在get(lock_key)的時候發現是空值,於是重新set(lock_key, '1'),但在get操作之後,set操作之前,B 線程恰好執行了get(lock_key),此時B 線程的get操作得到的還是空值,然後也順利獲得鎖,導致數據被兩個或多個線程同時修改,最後出現不一致。

3.2 無死鎖

由於3.1的版本是因為get_lock方法不是原子性操作,造成兩個或多個線程同時獲得鎖的問題,這個版本改成使用 redis 的 setnx 命令來進行鎖的查詢和設置操作,setnx 即 set if not exists,顧名思義就是當key不存在的時候才設置 value,並返回 1,如果 key 已經存在,則不進行任何操作,返回 0。

<code>#只展示需要修改的部分,其他部分還是和3.1的代碼一樣
def get_lock(self, key):
lock_key = self._get_lock_key(key)
thread_name = threading.current_thread().name
while True:
value = self.rediscli.setnx(lock_key, 1)
if value:
return True
time.sleep(0.01)
print "{} waiting...".format(thread_name)/<code>

執行test.py測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-4 2
Thread-2 3
Thread-3 4
Thread-7 5
Thread-6 6
Thread-5 7
Thread-8 8
Thread-9 9
Thread-10 10/<code>

輸出結果是正確的,但是還有潛在的問題。比如假設 A 線程獲得了鎖後,由於某種異常原因導致線程crash了,這個時候鎖將無法被釋放。稍微修改一下測試用例的 increase 函數,模擬某個線程在釋放鎖之前因為異常退出。

<code># test-3-2.py
def increase(redis, lock, key):
thread_name = threading.current_thread().name
lock_value = lock.get_lock(key)
value = redis.get(key)
if not value:
value = 0
# 模擬實際情況下進行的某些耗時操作
time.sleep(0.1)
value = int(value) + 1
redis.set(key, value)
print thread_name, value
# 模擬線程2異常退出
if thread_name == 'Thread-2':
print '{} crash...'.format(thread_name)
import sys
sys.exit(1)
lock.del_lock(key, lock_value)/<code>

執行test-3-2.py測試腳本,得到的結果如下:

<code>Thread-2 3
Thread-2 crash...
Thread-7 waiting...
Thread-3 waiting...
Thread-5 waiting...
Thread-4 waiting...
Thread-9 waiting...
Thread-6 waiting...
Thread-10 waiting.../<code>

此時就會出現問題,當線程2 crash 之後,後續獲取鎖的線程一直獲取不了鎖,一直處於等待鎖的狀態,於是產生了死鎖。如果請求是多線程處理的,比如每來一個請求就開一個線程去處理,那麼堆積的線程會逐漸增多,最終可能會導致系統崩潰。

當獲得鎖的線程異常退出後,無法主動釋放鎖,因此需要找到一種方式即使線程異常退出,線程佔用的鎖也能夠被釋放,顯然我們需要一種被動釋放鎖的機制。從 redis 2.6.12 版本開始,set 命令就已經支持了 nx 和 expire 功能。改進代碼如下:

<code>def get_lock(self, key, timeout=3):
lock_key = self._get_lock_key(key)
while True:
value = self.rediscli.set(lock_key, '1', nx=True, ex=timeout)
if value:
return True
time.sleep(0.01)/<code>

執行test.py測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-9 2
Thread-6 3
Thread-2 4
Thread-4 5
Thread-5 6
Thread-8 7
Thread-3 8
Thread-7 9
Thread-10 10/<code>

執行test-3-2.py測試腳本,模擬 線程2 crash,得到的結果如下:

<code>Thread-1 1
Thread-2 2
Thread-2 crash...
Thread-10 3
Thread-7 4
Thread-4 5
Thread-8 6
Thread-3 7

Thread-9 8
Thread-6 9
Thread-5 10/<code>

從上面的運行結果來看,似乎已經解決了原子性和無死鎖的問題。那第三個條件互斥性是否滿足呢?正常情況下,3.2節的實現方式是滿足互斥性的,但是還有一種場景需要我們考慮:比如假設 A 線程的邏輯還沒處理完,但是鎖由於過期時間到了,導致鎖自動被釋放掉,這時 B 線程獲得了鎖,開始處理 B 的邏輯,然後 A 進程的邏輯處理完了,B 線程還在處理中,就把 B 線程的鎖給刪除了。通過修改一下測試用例,模擬一下這種場景。

<code>def increase(redis, lock, key):
thread_name = threading.current_thread().name
# 設置鎖的過期時間為2s
lock_value = lock.get_lock(key, thread_name, timeout=2)
value = redis.get(key)
if not value:
value = 0
# 模擬實際情況下進行的某些耗時操作, 且執行時間大於鎖過期的時間
time.sleep(2.5)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)/<code>

我們讓線程的執行時間大於鎖的過期時間,導致鎖到期自動釋放。執行上面的測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-3 1
Thread-2 2
Thread-9 2
Thread-5 3
Thread-7 3
Thread-6 4
Thread-4 4
Thread-8 5
Thread-10 5/<code>

既然這種現象是由於鎖過期導致誤刪其他線程的鎖引發的,那我們就順著這個思路,強制線程只能刪除自己設置的鎖。如果是這樣,就需要為每個線程的鎖添加一個唯一標識。在我們的分佈式鎖實現機制中,我們每次添加鎖的時候,都是給 lock_key 設為 1,無論是 key 還是 value,都不具備唯一性,如果把 key 設為唯一的,那麼在分佈式系統中需要產生 N (等於總線程數)個 key 了 ,從直觀性和維護性上來說,這都是不可取的。因此只能將 value 設置為每個線程的唯一標識。這個唯一標識由線程 ID + 進程的 PID + 機器的 IP + 時間戳 + 集群名稱組成,這樣就構成了一個線程鎖的唯一標識。

3.3 互斥性

根據上一節最後的分析,我們設計出了基於Redis實現分佈式鎖的最終版。

<code># 最終版
class RedisLock(object):

def __init__(self, rediscli):

self.rediscli = rediscli.master
# ip 在實例化的時候就獲取,避免過多訪問DNS
self.ip = socket.gethostbyname(socket.gethostname())
self.pid = os.getpid()
self.cluster = "hna"

def _gen_lock_key(self, key):
lock_key = "lock_%s" % key
return lock_key

def _gen_unique_value(self):
thread_name = threading.current_thread().name
time_now = time.time()
unique_value = "{0}-{1}-{2}-{3}-{4}".format(self.ip, self.pid, thread_name, self.cluster, time_now)
return unique_value

def get_lock(self, key, timeout=3):
lock_key = self._gen_lock_key(key)
unique_value = self._gen_unique_value()
logger.info("unique value %s" % unique_value)
while True:
value = self.rediscli.set(lock_key, unique_value, nx=True, ex=timeout)
if value:
# 注意,我們返回了唯一標識,用於後面的delete時檢查是否是當前線程的鎖
return unique_value
# 進入阻塞狀態,避免一直消耗CPU
time.sleep(0.1)

def del_lock(self, key, value):
lock_key = self._gen_lock_key(key)
old_value = self.rediscli.get(lock_key)
# 檢查是否是當前線程持有的鎖
if old_value == value:
return self.rediscli.delete(lock_key)/<code>

執行test.py測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-2 2
Thread-4 3
Thread-5 4
Thread-10 5

Thread-3 6
Thread-9 7
Thread-6 8
Thread-8 9
Thread-7 10/<code>

修改test.py測試腳本,測試一下鎖過期。測試腳本如下:

<code># test-3-3.py
def increase(redis, lock, key):
thread_name = threading.current_thread().name
lock_value = lock.get_lock(key, timeout=1)
value = redis.get(key)
if not value:
value = 0
# 模擬實際情況下進行的某些耗時操作, 且執行時間大於鎖過期的時間
time.sleep(3)
value = int(value) + 1
print thread_name, value
redis.set(key, value)
lock.del_lock(key, lock_value)/<code>

執行test-3-3.py測試腳本,得到的結果如下:

<code>Thread-1 1
Thread-2 1
Thread-5 1
Thread-6 2
Thread-8 2
Thread-10 2
Thread-9 3
Thread-3 3
Thread-4 3
Thread-7 4/<code>

從運行test-3-3.py測試腳本結果來看,問題沒有得到解決。這是為什麼呢?因為我們設置value的唯一性只能確保線程不會誤刪其他線程產生的鎖,不會出現一連串的誤刪鎖的情況,比如 A 刪了 B 的鎖,B 執行完刪了 C 的鎖。使用 redis 的過期機制,只要業務的處理時間大於鎖的過期時間,就沒有一個很好的方式來避免由於鎖過期導致其他線程同時佔有鎖的問題,所以需要熟悉業務的執行時間,來合理地設置鎖的過期時間。(PS:對於這種情況,一般的處理方式是獲得鎖的線程開啟一個守護線程,用來給快要過期的鎖"續航"。比如過去了29秒,線程A還沒執行完,這時候守護線程會執行expire指令,為這把鎖"續航"20秒。守護線程從第29秒開始執行,每20秒執行一次檢查。當線程A執行完任務,會顯式關掉守護線程。線程A的進程或者守護進程異常退出,這把鎖將自動超時釋放,從而不會導致死鎖。)

另外,需要注意的一點是:3.3節的實現方式中,刪除鎖(del_lock)的操作不是原子性的,先是拿到鎖,再判斷鎖的值是否相等,相等的話最後再刪除鎖,既然不是原子性的,就有可能存在這樣一種極端情況:在判斷的那一時刻,鎖正好過期了,被其他線程佔有了鎖,那最後一步的刪除,就可能會造成誤刪其他線程的鎖。因此推薦使用官方提供的 Lua 腳本來確保原子性:

<code>def del_lock(self, key, value):
if redis.call("get",key) == value then
return redis.call("del",key)
else
return 0/<code>
手把手教你實現基於Redis的分佈式鎖

<code>def del_lock(self, key, value):
if redis.call("get",key) == value then
return redis.call("del",key)
else
return 0/<code>

4. 總結

以上就是我們使用 Redis 來實現一個分佈式同步鎖的方式,其特點是:

  1. 加鎖和釋放鎖是原子性的
  2. 滿足互斥性,同一個時刻只能有一個線程可以獲取鎖和釋放鎖
  3. 利用 Redis 的 ttl機制和守護進程的方式來保證不會出現死鎖

以上的方案中,我們是假設 Redis 服務端是單集群且高可用的,忽視了以下的問題:

如果某一時刻 Redis master 節點發生了故障,集群中的某個 slave 節點變成 master 節點,在故障遷移(failover)過程中可能出現原 master 節點上的鎖沒有及時同步到 slave 節點,導致其他線程同時獲得鎖。對於這個問題,可以參考 Redis 官方推出的 redlock 算法,但是比較遺憾的是,該算法也沒有很好地解決鎖過期的問題。(PS:不過這種不安全也僅僅是在主從發生 failover 的情況下才會產生,而且持續時間極短,業務系統多數情況下可以容忍。)

  1. 漫畫:什麼是分佈式鎖?
  2. 基於 redis 的分佈式鎖實現
  3. redis分佈式鎖深度剖析(超時情況)
  4. SET key value
  5. Distributed locks with Redis


分享到:


相關文章: