場景:
1.
在日常生活中,我們肯定收到過不少不少這樣的短信,“京東最新優惠卷…”,“天貓送您…”。這種類型的短信是屬於推廣性質的短信。這種短信一般群髮量會到千萬級別。然而,要完成這些短信發送,我們是需要調用服務商的接口來完成的。倘若一次發送的量在200萬條,而我們的服務商接口每秒能處理的短信發送量有限,只能達到200條每秒。那麼這個時候就會產生問題了,我們如何能控制好程序發送短信時的速度暱?於是限流器就得用上了。
2.
提供服務接口的人或多或少遇到這樣的問題,業務負載能力有限,為了防止過多請求湧入造成系統崩潰,如何進行流量控制?
流量控制策略有:分流,降級,限流等。這裡我們討論限流策略,他的作用是限制請求訪問頻率,換取系統高可用,是比較保守方便的策略。
3.常用的限流算法由:漏桶算法 和令牌桶算法。
一、漏桶算法
漏桶作為計量工具(The Leaky Bucket Algorithm as a Meter)時,可以用於流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
- 一個固定容量的漏桶,按照常量固定速率流出水滴;
- 如果桶是空的,則不需流出水滴;
- 可以以任意速率流入水滴到漏桶;
- 如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丟棄),而漏桶容量是不變的。
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然後就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:
可見這裡有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。
因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對於存在突發特性的流量來說缺乏效率.
二、令牌桶算法
令牌桶算法是一個存放固定容量令牌的桶,按照固定速率往桶裡添加令牌。令牌桶算法的描述如下:
- 假設限制2r/s,則按照500毫秒的固定速率往桶中添加令牌;
- 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丟棄或拒絕;
- 當一個n個字節大小的數據包到達,將從桶中刪除n個令牌,接著數據包被髮送到網絡上;
- 如果桶中的令牌不足n個,則不會刪除令牌,且該數據包將被限流(要麼丟棄,要麼緩衝區等待)。
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.
三、測試代碼
測試代碼1
package com.xx;import com.google.common.util.concurrent.RateLimiter;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.Executors;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/** * @author hanliwei * @create 2018-06-21 17:10 */public class Test2 { public static void main(String[] args) { //新建一個每秒限制3個的令牌桶 RateLimiter rateLimiter = RateLimiter.create(3.0); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100); for (int i = 0; i < 10; i++) { executor.execute(new Runnable() { @Override public void run() { //獲取令牌桶中一個令牌,最多等待10秒 if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); } } }); } executor.shutdown(); }}
結果:
2018-06-21 17:53:31
2018-06-21 17:53:31
2018-06-21 17:53:32
2018-06-21 17:53:32
2018-06-21 17:53:32
2018-06-21 17:53:33
2018-06-21 17:53:33
2018-06-21 17:53:33
2018-06-21 17:53:34
2018-06-21 17:53:34
測試代碼2
package com.xx;import com.google.common.util.concurrent.RateLimiter;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;/** * @author hanliwei * @create 2018-06-21 17:57 */public class Test3 { public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //速率是每秒只有5個許可 final RateLimiter rateLimiter = RateLimiter.create(3.0); for (int i = 0; i < 10; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 rateLimiter.acquire(); System.out.println("Accessing: " + no + ",time:" + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date())); } catch (Exception e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); }}
結果:
Accessing: 0,time:18-06-21 17:58:41
Accessing: 1,time:18-06-21 17:58:41
Accessing: 4,time:18-06-21 17:58:41
Accessing: 8,time:18-06-21 17:58:42
Accessing: 5,time:18-06-21 17:58:42
Accessing: 9,time:18-06-21 17:58:42
Accessing: 3,time:18-06-21 17:58:43
Accessing: 7,time:18-06-21 17:58:43
Accessing: 2,time:18-06-21 17:58:43
Accessing: 6,time:18-06-21 17:58:44
測試代碼3
package com.xx;import com.google.common.collect.Maps;import com.google.common.util.concurrent.RateLimiter;import java.util.concurrent.ConcurrentMap;/** * 單機限速demo * * @author hanliwei * @create 2018-06-21 18:51 */public class Test6 { //key-value (service,Qps) ,接口服務的限制速率 private static final ConcurrentMapresourceMap = Maps.newConcurrentMap(); //userkey-service,limiter ,限制用戶對接口的訪問速率 private static final ConcurrentMap userResourceLimiterMap = Maps.newConcurrentMap(); static { //init ,初始化方法A的Qps為50 resourceMap.putIfAbsent("methodA",10.0); } public static void updateResourceQps(String resource,double qps) { resourceMap.put(resource,qps); } public static void removeResource(String resource) { resourceMap.remove(resource); } public static int enter(String resource, String userKey) { long t1 = System.currentTimeMillis(); Double qps = resourceMap.get(resource); //不限流 if (qps == null || qps.doubleValue() == 0.0) { return 0; } String keySer = resource + userKey; RateLimiter rateLimiter = userResourceLimiterMap.get(keySer); //if null , new limit if (rateLimiter == null) { rateLimiter = RateLimiter.create(qps); RateLimiter putByOtherThread = userResourceLimiterMap.putIfAbsent(keySer,rateLimiter); if (putByOtherThread != null) { rateLimiter = putByOtherThread; } rateLimiter.setRate(qps); } //非阻塞 if (!rateLimiter.tryAcquire()) { //限速中,提示用戶 System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " + resource + " visited too frequently by key:" + userKey); return 99; } else { //正常訪問 System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " ); return 0; } } public static void main(String[] args) throws InterruptedException {// testA(); Test6.updateResourceQps("methodB",5.0); testB(); } private static void testA() throws InterruptedException { int i = 0; while (true) { i++; long t2 = System.currentTimeMillis(); System.out.println(" begin:" + t2 + " , hanchao:" + i); int result = Test6.enter("methodA","hanchao"); if (result == 99) { i = 0; Thread.sleep(1000); } } } private static void testB() throws InterruptedException { //測試other int y = 0; while (true) { y++; long t3 = System.currentTimeMillis(); System.out.println(" begin:" + t3 + " , tom:" + y); int result2 = Test6.enter("methodB","tom"); if (result2 == 99) { y = 0; Thread.sleep(1000); } } }}
測試結果:
四、方法摘要
修飾符和類型方法和描述doubleacquire()
從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求doubleacquire(int permits)
從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求static RateLimitercreate(double permitsPerSecond)
根據指定的穩定吞吐率創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)static RateLimitercreate(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根據指定的穩定吞吐率和預熱期來創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)doublegetRate()
返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數voidsetRate(double permitsPerSecond)
更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。StringtoString()
返回對象的字符表現形式booleantryAcquire()
從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits)
從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits, long timeout, TimeUnit unit)
從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那麼立即返回false (無需等待)booleantryAcquire(long timeout, TimeUnit unit)
從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那麼立即返回false(無需等待)五、Semaphore
public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //只能5個線程同時訪問 final Semaphore semp = new Semaphore(3); for (int i = 0; i < 10; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 semp.acquire(); System.out.println("Accessing: " + no + " --- " + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date())); //睡5s Thread.sleep(5000); //訪問完後,釋放許可,如果註釋掉下面的語句,則控制檯只能打印3條記錄,之後線程一直阻塞 semp.release(); } catch (Exception e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); }
結果:
Accessing: 3 --- 18-06-21 18:21:07
Accessing: 1 --- 18-06-21 18:21:07
Accessing: 2 --- 18-06-21 18:21:07
Accessing: 0 --- 18-06-21 18:21:12
Accessing: 4 --- 18-06-21 18:21:12
Accessing: 9 --- 18-06-21 18:21:12
Accessing: 8 --- 18-06-21 18:21:17
Accessing: 5 --- 18-06-21 18:21:17
Accessing: 6 --- 18-06-21 18:21:17
Accessing: 7 --- 18-06-21 18:21:22
六、Semaphore和RateLimiter的區別
Semaphore:信號量,直譯很難理解。作用是限定只有搶到信號的線程才能執行,其他的都得等待!你可以設置N個信號,這樣最多可以有N個線程同時執行。注意,其他的線程也在,只是掛起了。
RateLimiter:這是guava的,直譯是速率限制器。其作用是 限制一秒內只能有N個線程執行,超過了就只能等待下一秒。注意,N是double類型。
========================================
Semaphore:從線程個數限流
RateLimiter:從速率限流 目前常見的算法是漏桶算法和令牌算法
令牌桶算法:相比漏桶算法而言區別在於,令牌桶是會去勻速的生成令牌,拿到令牌才能夠進行處理,類似於勻速往桶裡放令牌
漏桶算法是:生產者消費者模型,生產者往木桶裡生產數據,消費者按照定義的速度去消費數據
應用場景:
漏桶算法:必須讀寫分流的情況下,限制讀取的速度
令牌桶算法:必須讀寫分離的情況下,限制寫的速率或者小米手機飢餓營銷的場景 只賣1分種搶購1000
實現的方法都是一樣。RateLimiter來實現
閱讀更多 IT技術之家 的文章