使用Guava的RateLimiter做限流

場景:

1.

在日常生活中,我們肯定收到過不少不少這樣的短信,“京東最新優惠卷…”,“天貓送您…”。這種類型的短信是屬於推廣性質的短信。這種短信一般群髮量會到千萬級別。然而,要完成這些短信發送,我們是需要調用服務商的接口來完成的。倘若一次發送的量在200萬條,而我們的服務商接口每秒能處理的短信發送量有限,只能達到200條每秒。那麼這個時候就會產生問題了,我們如何能控制好程序發送短信時的速度暱?於是限流器就得用上了。

2.

提供服務接口的人或多或少遇到這樣的問題,業務負載能力有限,為了防止過多請求湧入造成系統崩潰,如何進行流量控制?

流量控制策略有:分流,降級,限流等。這裡我們討論限流策略,他的作用是限制請求訪問頻率,換取系統高可用,是比較保守方便的策略。

3.常用的限流算法由:漏桶算法

令牌桶算法

一、漏桶算法

漏桶作為計量工具(The Leaky Bucket Algorithm as a Meter)時,可以用於流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一個固定容量的漏桶,按照常量固定速率流出水滴;
  • 如果桶是空的,則不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丟棄),而漏桶容量是不變的。

漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然後就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:

使用Guava的RateLimiter做限流

可見這裡有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。

因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對於存在突發特性的流量來說缺乏效率.

二、令牌桶算法

令牌桶算法是一個存放固定容量令牌的桶,按照固定速率往桶裡添加令牌。令牌桶算法的描述如下:

  • 假設限制2r/s,則按照500毫秒的固定速率往桶中添加令牌;
  • 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丟棄或拒絕;
  • 當一個n個字節大小的數據包到達,將從桶中刪除n個令牌,接著數據包被髮送到網絡上;
  • 如果桶中的令牌不足n個,則不會刪除令牌,且該數據包將被限流(要麼丟棄,要麼緩衝區等待)。

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務

使用Guava的RateLimiter做限流

令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.

三、測試代碼

測試代碼1

package com.xx;import com.google.common.util.concurrent.RateLimiter;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @author hanliwei * @create 2018-06-21 17:10 */public class Test2 { public static void main(String[] args) { //新建一個每秒限制3個的令牌桶 RateLimiter rateLimiter = RateLimiter.create(3.0); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100); for (int i = 0; i < 10; i++) { executor.execute(new Runnable() { @Override public void run() { //獲取令牌桶中一個令牌,最多等待10秒 if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } } }); } executor.shutdown(); }}

結果:

2018-06-21 17:53:31

2018-06-21 17:53:31

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:34

2018-06-21 17:53:34

測試代碼2

package com.xx;import com.google.common.util.concurrent.RateLimiter;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author hanliwei * @create 2018-06-21 17:57 */public class Test3 { public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //速率是每秒只有5個許可 final RateLimiter rateLimiter = RateLimiter.create(3.0); for (int i = 0; i < 10; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 rateLimiter.acquire(); System.out.println("Accessing: " + no + ",time:" + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date())); } catch (Exception e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); }} 

結果:

Accessing: 0,time:18-06-21 17:58:41

Accessing: 1,time:18-06-21 17:58:41

Accessing: 4,time:18-06-21 17:58:41

Accessing: 8,time:18-06-21 17:58:42

Accessing: 5,time:18-06-21 17:58:42

Accessing: 9,time:18-06-21 17:58:42

Accessing: 3,time:18-06-21 17:58:43

Accessing: 7,time:18-06-21 17:58:43

Accessing: 2,time:18-06-21 17:58:43

Accessing: 6,time:18-06-21 17:58:44

測試代碼3

package com.xx;import com.google.common.collect.Maps;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentMap;/** * 單機限速demo * * @author hanliwei * @create 2018-06-21 18:51 */public class Test6 { //key-value (service,Qps) ,接口服務的限制速率 private static final ConcurrentMap resourceMap = Maps.newConcurrentMap(); //userkey-service,limiter ,限制用戶對接口的訪問速率 private static final ConcurrentMap userResourceLimiterMap = Maps.newConcurrentMap(); static { //init ,初始化方法A的Qps為50 resourceMap.putIfAbsent("methodA",10.0); } public static void updateResourceQps(String resource,double qps) { resourceMap.put(resource,qps); } public static void removeResource(String resource) { resourceMap.remove(resource); } public static int enter(String resource, String userKey) { long t1 = System.currentTimeMillis(); Double qps = resourceMap.get(resource); //不限流 if (qps == null || qps.doubleValue() == 0.0) { return 0; } String keySer = resource + userKey; RateLimiter rateLimiter = userResourceLimiterMap.get(keySer); //if null , new limit if (rateLimiter == null) { rateLimiter = RateLimiter.create(qps); RateLimiter putByOtherThread = userResourceLimiterMap.putIfAbsent(keySer,rateLimiter); if (putByOtherThread != null) { rateLimiter = putByOtherThread; } rateLimiter.setRate(qps); } //非阻塞 if (!rateLimiter.tryAcquire()) { //限速中,提示用戶 System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " + resource + " visited too frequently by key:" + userKey); return 99; } else { //正常訪問 System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " ); return 0; } } public static void main(String[] args) throws InterruptedException {// testA(); Test6.updateResourceQps("methodB",5.0); testB(); } private static void testA() throws InterruptedException { int i = 0; while (true) { i++; long t2 = System.currentTimeMillis(); System.out.println(" begin:" + t2 + " , hanchao:" + i); int result = Test6.enter("methodA","hanchao"); if (result == 99) { i = 0; Thread.sleep(1000); } } } private static void testB() throws InterruptedException { //測試other int y = 0; while (true) { y++; long t3 = System.currentTimeMillis(); System.out.println(" begin:" + t3 + " , tom:" + y); int result2 = Test6.enter("methodB","tom"); if (result2 == 99) { y = 0; Thread.sleep(1000); } } }}

測試結果:

使用Guava的RateLimiter做限流

使用Guava的RateLimiter做限流

四、方法摘要

修飾符和類型方法和描述doubleacquire()

從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求doubleacquire(int permits)

從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求static RateLimitercreate(double permitsPerSecond)

根據指定的穩定吞吐率創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)static RateLimitercreate(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

根據指定的穩定吞吐率和預熱期來創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)doublegetRate()

返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數voidsetRate(double permitsPerSecond)

更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。StringtoString()

返回對象的字符表現形式booleantryAcquire()

從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits)

從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits, long timeout, TimeUnit unit)

從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那麼立即返回false (無需等待)booleantryAcquire(long timeout, TimeUnit unit)

從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那麼立即返回false(無需等待)五、Semaphore

 public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //只能5個線程同時訪問 final Semaphore semp = new Semaphore(3); for (int i = 0; i < 10; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 semp.acquire(); System.out.println("Accessing: " + no + " --- " + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date())); //睡5s Thread.sleep(5000); //訪問完後,釋放許可,如果註釋掉下面的語句,則控制檯只能打印3條記錄,之後線程一直阻塞 semp.release(); } catch (Exception e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); }

結果:

Accessing: 3 --- 18-06-21 18:21:07

Accessing: 1 --- 18-06-21 18:21:07

Accessing: 2 --- 18-06-21 18:21:07

Accessing: 0 --- 18-06-21 18:21:12

Accessing: 4 --- 18-06-21 18:21:12

Accessing: 9 --- 18-06-21 18:21:12

Accessing: 8 --- 18-06-21 18:21:17

Accessing: 5 --- 18-06-21 18:21:17

Accessing: 6 --- 18-06-21 18:21:17

Accessing: 7 --- 18-06-21 18:21:22

六、Semaphore和RateLimiter的區別

Semaphore:信號量,直譯很難理解。作用是限定只有搶到信號的線程才能執行,其他的都得等待!你可以設置N個信號,這樣最多可以有N個線程同時執行。注意,其他的線程也在,只是掛起了。

RateLimiter:這是guava的,直譯是速率限制器。其作用是 限制一秒內只能有N個線程執行,超過了就只能等待下一秒。注意,N是double類型。

========================================

Semaphore:從線程個數限流

RateLimiter:從速率限流 目前常見的算法是漏桶算法和令牌算法

令牌桶算法:相比漏桶算法而言區別在於,令牌桶是會去勻速的生成令牌,拿到令牌才能夠進行處理,類似於勻速往桶裡放令牌

漏桶算法是:生產者消費者模型,生產者往木桶裡生產數據,消費者按照定義的速度去消費數據

應用場景:

漏桶算法:必須讀寫分流的情況下,限制讀取的速度

令牌桶算法:必須讀寫分離的情況下,限制寫的速率或者小米手機飢餓營銷的場景 只賣1分種搶購1000

實現的方法都是一樣。RateLimiter來實現



分享到:


相關文章: