03.07 談談架構中的限流與計數器的實現方式

往期推薦

談談架構中的限流與計數器的實現方式

先談談Nginx分流

要考慮限流先得假設訪問量達到了一定的程度。在高併發的前提下,請求過多很有可能導致某天服務器承受不了導致死機。

談談架構中的限流與計數器的實現方式

<code>#user  nobody;
worker_processes 1;
events {
worker_connections 1024;
}
http {
\tupstream enjoy{
\t\tserver 127.0.0.1:8080;
\t\tserver 127.0.0.1:8081;
\t}
\tserver {
\t\t\tlisten 80;
\t\t\tlocation / {
\t\t\tproxy_pass http://enjoy;
\t}
\t}
}

/<code>

​​​

談談架構中的限流與計數器的實現方式

但現在我們假設一個極端的情況,這個時候由於業務有個秒殺要求,請求過大一下就讓這兩個服務器爆了,這個時候在我繼續增加服務器之前這整個秒殺業務幾乎都處在癱瘓狀態,而這突然的訪問量是你始料未及的,也就是說你根本就沒法事先準備好足夠的服務器來解決這種情況。

談談架構中的限流與計數器的實現方式

再說下限流

限流依然是再高併發的前提下,如果某個服務器承受不了數目過多的請求量的一種限制機制。與分流不同的是分流是讓請求分發到其他服務器,而限流是達到某個閾值後直接不讓你訪問了。如果想完成限流的功能其實是有一些解決方案的(算法),比如說,基於令牌桶的程序計數器以及漏桶算法。​​

談談架構中的限流與計數器的實現方式

解決方案

計數器的解決方式是最簡單最容易實現的一種解決方案,假設有一個接口,要求1分鐘的訪問量不能超過10次。

這樣當有任何請求過來,我可以讓計數器+1;如果這個計數器的值大於10,而且和第一次的請求相比,時間間隔在1分鐘以內,那麼就能說明該請求訪問過多。

如果這個請求與第一次請求的訪問時間之間的間隔超過了1分鐘,那麼該計數器的值就還是限流範圍之內,接下來就只要重置計數器就好。

<code>import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoyCountLimit {
\tprivate int limtCount = 60;// 限制最大訪問的容量
\tAtomicInteger atomicInteger = new AtomicInteger(0); // 每秒鐘 實際請求的數量
\tprivate long start = System.currentTimeMillis();// 獲取當前系統時間
\tprivate int interval = 60*1000;// 間隔時間60秒
\tpublic boolean acquire() {
\t\tlong newTime = System.currentTimeMillis();
\t\tif (newTime > (start + interval)) {
\t\t\t// 判斷是否是一個週期
\t\t\tstart = newTime;
\t\t\tatomicInteger.set(0); // 清理為0
\t\t\treturn true;
\t\t}
\t\tatomicInteger.incrementAndGet();// i++;
\t\treturn atomicInteger.get() <= limtCount;
\t}
\tstatic EnjoyCountLimit limitService = new EnjoyCountLimit();
\tpublic static void main(String[] args) {
\t\tExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

\t\tfor (int i = 1; i < 100; i++) {
\t \tfinal int tempI = i;
\t\t\tnewCachedThreadPool.execute(new Runnable() {
\t\t\t\tpublic void run() {
\t\t\t\t\tif (limitService.acquire()) {
\t\t\t\t\t\tSystem.out.println("你沒有被限流,可以正常訪問邏輯 i:" + tempI);
\t\t \t\t} else {
\t\t\t\t\t\tSystem.out.println("你已經被限流呢 i:" + tempI);
\t\t\t\t\t}
\t\t\t\t}
\t\t\t});
\t\t}
\t}
}
/<code>
談談架構中的限流與計數器的實現方式

還是以60運行訪問10次請求為例,在第一次0-58秒之內,沒有訪問請求,在59秒之內突然來了10次請求,這個時候會做什麼,由於已經到了1分鐘計數器會重置。

這個時候第二次的1秒內(1分0秒)又有了10請求,這個時候是不是就在2秒之內有20個請求被放行了呢?(59秒,1分0秒),如果某個服務器的訪問量只能是10次請求,那這種限流方式已經導致服務器掛了。

滑動窗口計數器

前面已經知道簡單的計數器的實現方式,也知道他會出現的一些問題,雖然這些問題舉得有些極端,但還是有更好的解決方案,這方案就是使用滑動窗口計數器。

滑動窗口計數器的原理是在請求還沒過來的時候,先判斷前面N個單位內的總訪問量是否超過閾值,並且在當前的時間單位的請求數上+1。

談談架構中的限流與計數器的實現方式

談談架構中的限流與計數器的實現方式

<code>import java.util.concurrent.atomic.AtomicInteger;
public class EnjoySlidingWindow {
private AtomicInteger[] timeSlices;
/* 隊列的總長度 */
private final int timeSliceSize;
/* 每個時間片的時長 */
private final long timeMillisPerSlice;
/* 窗口長度 */
private final int windowSize;
/* 當前所使用的時間片位置 */
private AtomicInteger cursor = new AtomicInteger(0);
public static enum Time {
MILLISECONDS(1),
SECONDS(1000),
MINUTES(SECONDS.getMillis() * 60),

HOURS(MINUTES.getMillis() * 60),
DAYS(HOURS.getMillis() * 24),
WEEKS(DAYS.getMillis() * 7);
private long millis;
Time(long millis) {
this.millis = millis;
}
public long getMillis() {
return millis;
}
}
public EnjoySlidingWindow(int windowSize, Time timeSlice) {
this.timeMillisPerSlice = timeSlice.millis;
this.windowSize = windowSize;
// 保證存儲在至少兩個window
this.timeSliceSize = windowSize * 2 + 1;
init();
}
/**
* 初始化
*/
private void init() {
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}
private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}
/**
*

對時間片計數+1,並返回窗口中所有的計數總和
*

該方法只要調用就一定會對某個時間片進行+1
* @return
*/
public int incrementAndSum() {
int index = locationIndex();
int sum = 0;
// cursor等於index,返回true
// cursor不等於index,返回false,並會將cursor設置為index


int oldCursor = cursor.getAndSet(index);
if (oldCursor == index) {
// 在當前時間片裡繼續+1
sum += timeSlices[index].incrementAndGet();
} else {
//輪到新的時間片,置0,可能有其它線程也置了該值,容許
timeSlices[index].set(0);
// 清零,訪問量不大時會有時間片跳躍的情況
clearBetween(oldCursor, index);
sum += timeSlices[index].incrementAndGet();
}
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
return sum;
}
/**
* 判斷是否允許進行訪問,未超過閾值的話才會對某個時間片+1
* @param threshold
* @return
*/
public boolean allow(int threshold) {
int index = locationIndex();
int sum = 0;
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
timeSlices[index].set(0);
clearBetween(oldCursor, index);
}
for (int i = 0; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
// 閾值判斷
if (sum < threshold) {
// 未超過閾值才+1
timeSlices[index].incrementAndGet();
return true;
}
return false;
}
/**
*

將fromIndex~toIndex之間的時間片計數都清零
*

極端情況下,當循環隊列已經走了超過1個timeSliceSize以上,這裡的清零並不能如期望的進行
* @param fromIndex 不包含
* @param toIndex 不包含
*/
private void clearBetween(int fromIndex, int toIndex) {
for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
timeSlices[index].set(0);
}
}
public static void main(String[] args) {
EnjoySlidingWindow window = new EnjoySlidingWindow(5, Time.MILLISECONDS);
for (int i = 0; i < 10; i++) {
System.out.println(window.allow(7));
}
}
}

/<code>
談談架構中的限流與計數器的實現方式


分享到:


相關文章: