06.27 使用Guava的RateLimiter做限流

場景:

1.

在日常生活中,我們肯定收到過不少不少這樣的短信,“京東最新優惠卷…”,“天貓送您…”。這種類型的短信是屬於推廣性質的短信。這種短信一般群髮量會到千萬級別。然而,要完成這些短信發送,我們是需要調用服務商的接口來完成的。倘若一次發送的量在200萬條,而我們的服務商接口每秒能處理的短信發送量有限,只能達到200條每秒。那麼這個時候就會產生問題了,我們如何能控制好程序發送短信時的速度暱?於是限流器就得用上了。

2.

提供服務接口的人或多或少遇到這樣的問題,業務負載能力有限,為了防止過多請求湧入造成系統崩潰,如何進行流量控制?

流量控制策略有:分流,降級,限流等。這裡我們討論限流策略,他的作用是限制請求訪問頻率,換取系統高可用,是比較保守方便的策略。

3.常用的限流算法由:漏桶算法

令牌桶算法

一、漏桶算法

漏桶作為計量工具(The Leaky Bucket Algorithm as a Meter)時,可以用於流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一個固定容量的漏桶,按照常量固定速率流出水滴;
  • 如果桶是空的,則不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,則流入的水滴溢出了(被丟棄),而漏桶容量是不變的。

漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶裡,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然後就拒絕請求,可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:

使用Guava的RateLimiter做限流

可見這裡有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate)。

因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源衝突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對於存在突發特性的流量來說缺乏效率.

二、令牌桶算法

令牌桶算法是一個存放固定容量令牌的桶,按照固定速率往桶裡添加令牌。令牌桶算法的描述如下:

  • 假設限制2r/s,則按照500毫秒的固定速率往桶中添加令牌;
  • 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丟棄或拒絕;
  • 當一個n個字節大小的數據包到達,將從桶中刪除n個令牌,接著數據包被髮送到網絡上;
  • 如果桶中的令牌不足n個,則不會刪除令牌,且該數據包將被限流(要麼丟棄,要麼緩衝區等待)。

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨著時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶裡加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務

使用Guava的RateLimiter做限流

令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.

三、測試代碼

測試代碼1

package com.xx;
import com.google.common.util.concurrent.RateLimiter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author hanliwei
* @create 2018-06-21 17:10
*/
public class Test2 {
public static void main(String[] args) {
//新建一個每秒限制3個的令牌桶
RateLimiter rateLimiter = RateLimiter.create(3.0);
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(100);
for (int i = 0; i < 10; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//獲取令牌桶中一個令牌,最多等待10秒
if (rateLimiter.tryAcquire(1, 10, TimeUnit.SECONDS)) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
}
}
});
}
executor.shutdown();
}
}

結果:

2018-06-21 17:53:31

2018-06-21 17:53:31

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:32

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:33

2018-06-21 17:53:34

2018-06-21 17:53:34

測試代碼2

package com.xx;
import com.google.common.util.concurrent.RateLimiter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author hanliwei
* @create 2018-06-21 17:57
*/
public class Test3 {
public static void main(String[] args) {
//線程池
ExecutorService exec = Executors.newCachedThreadPool();
//速率是每秒只有5個許可
final RateLimiter rateLimiter = RateLimiter.create(3.0);
for (int i = 0; i < 10; i++) {
final int no = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {

//獲取許可
rateLimiter.acquire();
System.out.println("Accessing: " + no + ",time:"
+ new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date()));
} catch (Exception e) {
e.printStackTrace();
}
}
};
//執行線程
exec.execute(runnable);
}
//退出線程池
exec.shutdown();
}
}

結果:

Accessing: 0,time:18-06-21 17:58:41

Accessing: 1,time:18-06-21 17:58:41

Accessing: 4,time:18-06-21 17:58:41

Accessing: 8,time:18-06-21 17:58:42

Accessing: 5,time:18-06-21 17:58:42

Accessing: 9,time:18-06-21 17:58:42

Accessing: 3,time:18-06-21 17:58:43

Accessing: 7,time:18-06-21 17:58:43

Accessing: 2,time:18-06-21 17:58:43

Accessing: 6,time:18-06-21 17:58:44

測試代碼3

package com.xx;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentMap;
/**
* 單機限速demo

*
* @author hanliwei
* @create 2018-06-21 18:51
*/
public class Test6 {
//key-value (service,Qps) ,接口服務的限制速率
private static final ConcurrentMap<string> resourceMap = Maps.newConcurrentMap();
//userkey-service,limiter ,限制用戶對接口的訪問速率
private static final ConcurrentMap<string> userResourceLimiterMap = Maps.newConcurrentMap();
static {
//init ,初始化方法A的Qps為50
resourceMap.putIfAbsent("methodA",10.0);
}
public static void updateResourceQps(String resource,double qps) {
resourceMap.put(resource,qps);
}
public static void removeResource(String resource) {
resourceMap.remove(resource);
}
public static int enter(String resource, String userKey) {
long t1 = System.currentTimeMillis();
Double qps = resourceMap.get(resource);
//不限流
if (qps == null || qps.doubleValue() == 0.0) {
return 0;
}
String keySer = resource + userKey;
RateLimiter rateLimiter = userResourceLimiterMap.get(keySer);
//if null , new limit
if (rateLimiter == null) {
rateLimiter = RateLimiter.create(qps);
RateLimiter putByOtherThread = userResourceLimiterMap.putIfAbsent(keySer,rateLimiter);
if (putByOtherThread != null) {
rateLimiter = putByOtherThread;
}
rateLimiter.setRate(qps);
}
//非阻塞
if (!rateLimiter.tryAcquire()) {
//限速中,提示用戶
System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; "
+ resource + " visited too frequently by key:" + userKey);
return 99;
} else {
//正常訪問
System.out.println("use :" + (System.currentTimeMillis() - t1) + "ms ; " );

return 0;
}
}
public static void main(String[] args) throws InterruptedException {
// testA();
Test6.updateResourceQps("methodB",5.0);
testB();
}
private static void testA() throws InterruptedException {
int i = 0;
while (true) {
i++;
long t2 = System.currentTimeMillis();
System.out.println(" begin:" + t2 + " , hanchao:" + i);
int result = Test6.enter("methodA","hanchao");
if (result == 99) {
i = 0;
Thread.sleep(1000);
}
}
}
private static void testB() throws InterruptedException {
//測試other
int y = 0;
while (true) {
y++;
long t3 = System.currentTimeMillis();
System.out.println(" begin:" + t3 + " , tom:" + y);
int result2 = Test6.enter("methodB","tom");
if (result2 == 99) {
y = 0;
Thread.sleep(1000);
}
}
}
}
/<string>/<string>

測試結果:

使用Guava的RateLimiter做限流

使用Guava的RateLimiter做限流

四、方法摘要

修飾符和類型方法和描述doubleacquire()

從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求doubleacquire(int permits)

從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求static RateLimitercreate(double permitsPerSecond)

根據指定的穩定吞吐率創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)static RateLimitercreate(double permitsPerSecond, long warmupPeriod, TimeUnit unit)

根據指定的穩定吞吐率和預熱期來創建RateLimiter,這裡的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)doublegetRate()

返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數voidsetRate(double permitsPerSecond)

更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。StringtoString()

返回對象的字符表現形式booleantryAcquire()

從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits)

從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話booleantryAcquire(int permits, long timeout, TimeUnit unit)

從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那麼立即返回false (無需等待)booleantryAcquire(long timeout, TimeUnit unit)

從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那麼立即返回false(無需等待)五、Semaphore

 public static void main(String[] args) {
//線程池
ExecutorService exec = Executors.newCachedThreadPool();
//只能5個線程同時訪問
final Semaphore semp = new Semaphore(3);
for (int i = 0; i < 10; i++) {
final int no = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
//獲取許可
semp.acquire();
System.out.println("Accessing: " + no
+ " --- " + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date()));
//睡5s
Thread.sleep(5000);
//訪問完後,釋放許可,如果註釋掉下面的語句,則控制檯只能打印3條記錄,之後線程一直阻塞
semp.release();
} catch (Exception e) {
e.printStackTrace();
}
}
};
//執行線程
exec.execute(runnable);
}
//退出線程池
exec.shutdown();
}

結果:

Accessing: 3 --- 18-06-21 18:21:07

Accessing: 1 --- 18-06-21 18:21:07

Accessing: 2 --- 18-06-21 18:21:07

Accessing: 0 --- 18-06-21 18:21:12

Accessing: 4 --- 18-06-21 18:21:12

Accessing: 9 --- 18-06-21 18:21:12

Accessing: 8 --- 18-06-21 18:21:17

Accessing: 5 --- 18-06-21 18:21:17

Accessing: 6 --- 18-06-21 18:21:17

Accessing: 7 --- 18-06-21 18:21:22

六、Semaphore和RateLimiter的區別

Semaphore:信號量,直譯很難理解。作用是限定只有搶到信號的線程才能執行,其他的都得等待!你可以設置N個信號,這樣最多可以有N個線程同時執行。注意,其他的線程也在,只是掛起了。

RateLimiter:這是guava的,直譯是速率限制器。其作用是 限制一秒內只能有N個線程執行,超過了就只能等待下一秒。注意,N是double類型。

========================================

Semaphore:從線程個數限流

RateLimiter:從速率限流 目前常見的算法是漏桶算法和令牌算法

令牌桶算法:相比漏桶算法而言區別在於,令牌桶是會去勻速的生成令牌,拿到令牌才能夠進行處理,類似於勻速往桶裡放令牌

漏桶算法是:生產者消費者模型,生產者往木桶裡生產數據,消費者按照定義的速度去消費數據

應用場景:

漏桶算法:必須讀寫分流的情況下,限制讀取的速度

令牌桶算法:必須讀寫分離的情況下,限制寫的速率或者小米手機飢餓營銷的場景 只賣1分種搶購1000

實現的方法都是一樣。RateLimiter來實現



分享到:


相關文章: