限流?Python+Redis就能搞定

最近在瀏覽高併發相關內容並試圖點亮技能樹的時候,總是能看到一句話:

保護高併發系統的三大利器:緩存、降級和限流。 那什麼是 限流 呢?用我沒讀過太多書的話來講, 限流 就是限制流量。我們都知道服務器的處理能力是有上限的,如果超過了上限繼續放任請求進來的話,可能會發生不可控的後果。而通過 限流 ,在請求數量超出閾值的時候就排隊等待甚至拒絕服務,就可以使系統在扛不住過高併發的情況下做到 有損服務

而不是不服務。

舉個栗子:chestnut:,最近新型肺炎肆虐,各地都出現口罩緊缺的情況,廣州政府為了緩解市民買不到口罩的狀況,上線了預約服務,只有預約到的市民才能到指定的藥店購買少量口罩。這就是生活中 限流 的情況,說這個也是希望大家這段時間保護好自己,注意防護 :)

限流?Python+Redis就能搞定

接下來就跟大家分享下接口限流的常見玩法吧,部分算法用 python + redis 粗略實現了一下,關鍵是圖解啊!你品,你細品~

固定窗口法

固定窗口法是限流算法裡面最簡單的,比如我想限制1分鐘以內請求為100個,從現在算起的一分鐘內,請求就最多就是100個,這分鐘過完的那一刻把計數器歸零,重新計算,週而復始。

限流?Python+Redis就能搞定

固定窗口法

偽代碼實現

<code>def can_pass_fixed_window(user, action, time_zone=60, times=30):    """    :param user: 用戶唯一標識    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動作)    :param time_zone: 接口限制的時間段    :param time_zone: 限制的時間段內允許多少請求通過    """    key = '{}:{}'.format(user, action)    # redis_conn 表示redis連接對象    count = redis_conn.get(key)    if not count:        count = 1        redis_conn.setex(key, time_zone, count)    if count < times:        redis_conn.incr(key)        return True    return False複製代碼/<code>

這個方法雖然簡單,但有個大問題是無法應對兩個時間邊界內的突發流量。如上圖所示,如果在計數器清零的前1秒以及清零的後1秒都進來了100個請求,那麼在短時間內服務器就接收到了兩倍的(200個)請求,這樣就有可能壓垮系統。會導致上面的問題是因為我們的統計精度還不夠,為了將臨界問題的影響降低,我們可以使用滑動窗口法。

滑動窗口法

滑動窗口法,簡單來說就是隨著時間的推移,時間窗口也會持續移動,有一個計數器不斷維護著窗口內的請求數量,這樣就可以保證任意時間段內,都不會超過最大允許的請求數。例如當前時間窗口是0s~60s,請求數是40,10s後時間窗口就變成了10s~70s,請求數是60。

時間窗口的滑動和計數器可以使用redis的有序集合(sorted set)來實現。score的值用毫秒時間戳來表示,可以利用 當前時間戳 - 時間窗口的大小 來計算出窗口的邊界,然後根據score的值做一個範圍篩選就可以圈出一個窗口;value的值僅作為用戶行為的唯一標識,也用毫秒時間戳就好。最後統計一下窗口內的請求數再做判斷即可。

限流?Python+Redis就能搞定

滑動窗口法

偽代碼實現

<code>def can_pass_slide_window(user, action, time_zone=60, times=30):    """    :param user: 用戶唯一標識    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動作)    :param time_zone: 接口限制的時間段    :param time_zone: 限制的時間段內允許多少請求通過    """    key = '{}:{}'.format(user, action)    now_ts = time.time() * 1000    # value是什麼在這裡並不重要,只要保證value的唯一性即可,這裡使用毫秒時間戳作為唯一值    value = now_ts     # 時間窗口左邊界    old_ts = now_ts - (time_zone * 1000)    # 記錄行為    redis_conn.zadd(key, value, now_ts)    # 刪除時間窗口之前的數據    redis_conn.zremrangebyscore(key, 0, old_ts)    # 獲取窗口內的行為數量    count = redis_conn.zcard(key)    # 設置一個過期時間免得佔空間    redis_conn.expire(key, time_zone + 1)    if not count or count < times:        return True    return False複製代碼/<code> 

雖然滑動窗口法避免了時間界限的問題,但是依然無法很好解決細時間粒度上面請求過於集中的問題,就例如限制了1分鐘請求不能超過60次,請求都集中在59s時發送過來,這樣滑動窗口的效果就大打折扣。

為了使流量更加平滑,我們可以使用更加高級的令牌桶算法和漏桶算法。

令牌桶法

令牌桶算法的思路不復雜,它先以固定的速率生成令牌,把令牌放到固定容量的桶裡,超過桶容量的令牌則丟棄,每來一個請求則獲取一次令牌,規定只有獲得令牌的請求才能放行,沒有獲得令牌的請求則丟棄。

限流?Python+Redis就能搞定

偽代碼實現

<code># 令牌桶法,具體步驟:# 請求來了就計算生成的令牌數,生成的速率有限制# 如果生成的令牌太多,則丟棄令牌# 有令牌的請求才能通過,否則拒絕def can_pass_token_bucket(user, action, time_zone=60, times=30):    """    :param user: 用戶唯一標識    :param action: 用戶訪問的接口標識(即用戶在客戶端進行的動作)    :param time_zone: 接口限制的時間段    :param time_zone: 限制的時間段內允許多少請求通過    """    # 請求來了就倒水,倒水速率有限制    key = '{}:{}'.format(user, action)    rate = times / time_zone # 令牌生成速度    capacity = times # 桶容量    tokens = redis_conn.hget(key, 'tokens') # 看桶中有多少令牌    last_time = redis_conn.hget(key, 'last_time') # 上次令牌生成時間    now = time.time()    tokens = int(tokens) if tokens else capacity    last_time = int(last_time) if last_time else now    delta_tokens = (now - last_time) * rate # 經過一段時間後生成的令牌    if delta_tokens > 1:        tokens = tokens + tokens # 增加令牌        if tokens > tokens:            tokens = capacity        last_time = time.time() # 記錄令牌生成時間        redis_conn.hset(key, 'last_time', last_time)    if tokens >= 1:        tokens -= 1 # 請求進來了,令牌就減少1        redis_conn.hset(key, 'tokens', tokens)        return True    return False複製代碼/<code>

令牌桶法限制的是請求的平均流入速率,優點是能應對一定程度上的突發請求,也能在一定程度上保持流量的來源特徵,實現難度不高,適用於大多數應用場景。

漏桶算法

漏桶算法的思路與令牌桶算法有點相反。大家可以將請求想象成是水流,水流可以任意速率流入漏桶中,同時漏桶以固定的速率將水流出。如果流入速度太大會導致水滿溢出,溢出的請求被丟棄。

限流?Python+Redis就能搞定

通過上圖可以看出漏桶法的特點是: 不限制請求流入的速率 ,但是 限制了請求流出的速率

。這樣突發流量可以被整形成一個穩定的流量,不會發生超頻。

關於漏桶算法的實現方式有一點值得注意,我在瀏覽相關內容時發現網上大多數對於漏桶算法的偽代碼實現,都只是實現了

根據維基百科,漏桶算法的實現理論有兩種,分別是 基於 meter 的基於 queue 的 ,他們實現的具體思路不同,我大概介紹一下。

基於meter的漏桶

基於 meter 的實現相對來說比較簡單,其實它就有一個計數器,然後有消息要發送的時候,就看計數器夠不夠,如果計數器沒有滿的話,那麼這個消息就可以被處理,如果計數器不足以發送消息的話,那麼這個消息將會被丟棄。

那麼這個計數器是怎麼來的呢,基於 meter 的形式的計數器就是發送的頻率,例如你設置得頻率是不超過 5條/s ,那麼計數器就是 5,在一秒內你每發送一條消息就減少一個,當你發第 6 條的時候計時器就不夠了,那麼這條消息就被丟棄了。

這種實現有點類似最開始介紹的固定窗口法,只不過時間粒度再小一些,偽代碼就不上了。

基於queue的漏桶

基於 queue 的實現起來比較複雜,但是原理卻比較簡單,它也存在一個計數器,這個計數器卻不表示速率限制,而是表示 queue 的大小,這裡就是當有消息要發送的時候看 queue 中是否還有位置,如果有,那麼就將消息放進 queue 中,這個 queue 以 FIFO 的形式提供服務;如果 queue 沒有位置了,消息將被拋棄。

在消息被放進 queue 之後,還需要維護一個定時器,這個定時器的週期就是我們設置的頻率週期,例如我們設置得頻率是 5條/s,那麼定時器的週期就是 200ms,定時器每 200ms 去 queue 裡獲取一次消息,如果有消息,那麼就發送出去,如果沒有就輪空。

這種實現方式比較複雜,限於篇幅這裡就沒有實現了,但是貼心的我還是為大家找來了參考的栗子:chestnut:。

熟悉python的朋友可以參考aiolimiter的實現 :point_right|type_1_2: python傳送門

熟悉go的朋友可以參考uber的ratelimit的實現 :point_right|type_1_2: go傳送門

注意,網上很多關於漏桶法的偽代碼實現只實現了水流入桶的部分,沒有實現關鍵的水從桶中漏出的部分。如果只實現了前半部分,其實跟令牌桶沒有大的區別噢:hushed:

如果覺得上面的都太難,不好實現,那麼我牆裂建議你嘗試一下redis-cell這個模塊!

redis-cell

Redis 4.0 提供了一個限流 Redis 模塊,它叫 redis-cell。該模塊也使用了漏斗算法,並提供了原子的限流指令。有了這個模塊,限流問題就非常簡單了。

這個模塊需要單獨安裝,安裝教程網上很多,它只有一個指令: CL.THROTTLE

<code>CL.THROTTLE user123 15 30 60 1                ▲    ▲  ▲  ▲ ▲                |    |  |  | └───── apply 1 operation (default if omitted) 每次請求消耗的水滴                |    |  └──┴─────── 30 operations / 60 seconds 漏水的速率                |    └───────────── 15 max_burst 漏桶的容量                └─────────────────── key “user123” 用戶行為複製代碼/<code>

執行以上命令之後,redis會返回如下信息:

<code>> cl.throttle laoqian:reply 15 30 601) (integer) 0   # 0 表示允許,1表示拒絕2) (integer) 16  # 漏桶容量3) (integer) 15  # 漏桶剩餘空間left_quota4) (integer) -1  # 如果拒絕了,需要多長時間後再試(漏桶有空間了,單位秒)5) (integer) 2   # 多長時間後,漏桶完全空出來(單位秒)複製代碼/<code>

有了上面的redis模塊,就可以輕鬆對付大多數的限流場景了,簡直太方便了有木有!

後記&引用

限流算法就大概介紹到這裡了,這些算法圖雖然是參照了好幾篇文章所畫,但也是花了精力在上面的,希望能對大家產生幫助吧。然後我還將上面的代碼整理到了github,需要的朋友請戳 :point_right|type_1_2: https://github.com/luengwaiban/rate_limit_collect/blob/master/rate_limit_collect.py

限流?Python+Redis就能搞定


分享到:


相關文章: