P8大牛帶你細談架構中的限流與計數器的實現方式

1. 先談談Nginx分流

對,要考慮限流先得假設訪問量達到了一定的程度,在高併發的前提下,請求過多很有可能導致某天服務器承受不了導致死機。

P8大牛帶你細談架構中的限流與計數器的實現方式

在這個前提下,我相信你最先想到的一定是nginx,使用nginx分流讓所有的請求不要直接到達某一個服務器,當併發量繼續上升的時候,我提供更多的服務器似乎就能解決問題了,這想法看上去很對,而且很多行業就是這麼做的,比如下面的nginx配置文件,很簡單,我相信你一看就能懂

<code>#user  nobody;

worker_processes 1;

events {
worker_connections 1024;
}

http {

upstream enjoy{
server 127.0.0.1:8080;
server 127.0.0.1:8081;

}
server {
listen 80;
location / {
proxy_pass http://enjoy;
}
}

}

/<code>

在上面案例中我配置了個代理,當高併發的請求過來的時候,會把請求分發給8080與8081兩個端口的服務器。

但現在我們假設一個極端的情況,這個時候由於業務有個秒殺要求,請求過大一下就讓這兩個服務器爆了,這個時候在我繼續增加服務器之前這整個秒殺業務幾乎都處在癱瘓狀態,而這突然的訪問量是你始料未及的,也就是說你根本就沒法事先準備好足夠的服務器來解決這種情況。

這個時候我相信你已經看出瞭如果僅僅做分流依然會出現很嚴重的問題,怎麼辦呢?這個時候你就需要限流了。

2. 再說下限流

限流依然是在高併發的前提下,如果某個服務器承受不了數目過多的請求量的一種限制機制,不同的是分流是讓請求分發的其他服務器,而限流是達到某個閾值後直接不讓你訪問了

如果想完成限流的功能其實是有一些解決方案的(算法),比如來說,基於令牌桶的,程序計數器以及漏桶算法;

今天挑一個最簡單的計數器方式來講講限流

3. 解決方案

計數器的解決方式是最簡單最容易實現的一種解決方案,假設有一個接口,要求1分鐘的訪問量不能超過10次

這樣當有任何請求過來,我可以讓計數器+1;如果這個計數器的值大於10,而且和第一次的請求相比,時間間隔在1分鐘以內,那麼就能說明該請求訪問過多。

如果這個請求與第一次請求的訪問時間之間的間隔超過了1分鐘,那麼該計數器的值久還是限流範圍之內,接下來久只要重置計數器就好;

<code>import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.atomic.AtomicInteger;



public class EnjoyCountLimit {



private int limtCount = 60;// 限制最大訪問的容量

AtomicInteger atomicInteger = new AtomicInteger(0); // 每秒鐘 實際請求的數量

private long start = System.currentTimeMillis();// 獲取當前系統時間

private int interval = 60*1000;// 間隔時間60秒



public boolean acquire() {

long newTime = System.currentTimeMillis();

if (newTime > (start + interval)) {

// 判斷是否是一個週期

start = newTime;

atomicInteger.set(0); // 清理為0

return true;

}


atomicInteger.incrementAndGet();// i++;

return atomicInteger.get() <= limtCount;

}



static EnjoyCountLimit limitService = new EnjoyCountLimit();



public static void main(String[] args) {



ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

for (int i = 1; i < 100; i++) {

final int tempI = i;

newCachedThreadPool.execute(new Runnable() {



public void run() {

if (limitService.acquire()) {

System.out.println("你沒有被限流,可以正常訪問邏輯 i:" + tempI);

} else {

System.out.println("你已經被限流呢 i:" + tempI);

}

}

});

}

}



}

/<code>

這個計數器的限流方式很簡單吧,但這樣問題嗎?好好想想……

還是以60運行訪問10次請求為例,在第一次0-58秒之內,沒有訪問請求,在59秒之內突然來了10次請求,這個時候會做什麼,由於已經到了1分鐘計數器會重置。

這個時候第二次的1秒內(1分0秒)又了10請求,這個時候是不是就在2秒之內有20個請求被放行了呢?(59秒,1分0秒),如果某個服務器的訪問量只能是10次請求,那這種限流方式已經導致服務器掛了;

4. 滑動窗口計數器

前面已經知道簡單的計數器的實現方式,也知道他會出現的一些問題,雖然這些問題舉得有些極端,但還是有更好得解決方案,這方案就是使用滑動窗口計數器

滑動窗口計數器得原理是在沒錯請求過來得時候,先判斷前面N個單位內得總訪問量是否操過得閾值,並且在當前得時間單位得請求數上+1

舉例來說,要求1分鐘的訪問量不能超過10次

可以把1分鐘看成是6個10秒鐘的時間,0-9秒的訪問數記錄到第一個格子,10-19秒的訪問數記錄數記錄到第二個格子以此內推,每次統計將6個格子裡面的數據求和,如果超過了10次就不允許訪問。

<code>import java.util.concurrent.atomic.AtomicInteger;



public class EnjoySlidingWindow {



private AtomicInteger[] timeSlices;

/* 隊列的總長度 */

private final int timeSliceSize;

/* 每個時間片的時長 */

private final long timeMillisPerSlice;

/* 窗口長度 */

private final int windowSize;



/* 當前所使用的時間片位置 */

private AtomicInteger cursor = new AtomicInteger(0);



public static enum Time {

MILLISECONDS(1),

SECONDS(1000),

MINUTES(SECONDS.getMillis() * 60),


HOURS(MINUTES.getMillis() * 60),

DAYS(HOURS.getMillis() * 24),

WEEKS(DAYS.getMillis() * 7);



private long millis;



Time(long millis) {

this.millis = millis;

}



public long getMillis() {

return millis;

}

}



public EnjoySlidingWindow(int windowSize, Time timeSlice) {

this.timeMillisPerSlice = timeSlice.millis;

this.windowSize = windowSize;

// 保證存儲在至少兩個window

this.timeSliceSize = windowSize * 2 + 1;



init();

}



/**


* 初始化

*/

private void init() {

AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];

for (int i = 0; i < timeSliceSize; i++) {

localTimeSlices[i] = new AtomicInteger(0);

}

timeSlices = localTimeSlices;

}



private int locationIndex() {

long time = System.currentTimeMillis();

return (int) ((time / timeMillisPerSlice) % timeSliceSize);

}



/**

*

對時間片計數+1,並返回窗口中所有的計數總和

*

該方法只要調用就一定會對某個時間片進行+1

* @return

*/

public int incrementAndSum() {


int index = locationIndex();

int sum = 0;

// cursor等於index,返回true

// cursor不等於index,返回false,並會將cursor設置為index

int oldCursor = cursor.getAndSet(index);

if (oldCursor == index) {

// 在當前時間片裡繼續+1

sum += timeSlices[index].incrementAndGet();

} else {

//輪到新的時間片,置0,可能有其它線程也置了該值,容許

timeSlices[index].set(0);

// 清零,訪問量不大時會有時間片跳躍的情況

clearBetween(oldCursor, index);



sum += timeSlices[index].incrementAndGet();

}



for (int i = 1; i < windowSize; i++) {

sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();

}

return sum;

}




/**

* 判斷是否允許進行訪問,未超過閾值的話才會對某個時間片+1

* @param threshold

* @return

*/

public boolean allow(int threshold) {

int index = locationIndex();

int sum = 0;

int oldCursor = cursor.getAndSet(index);

if (oldCursor != index) {

timeSlices[index].set(0);

clearBetween(oldCursor, index);

}

for (int i = 0; i < windowSize; i++) {

sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();

}



// 閾值判斷

if (sum < threshold) {

// 未超過閾值才+1

timeSlices[index].incrementAndGet();

return true;

}

return false;

}



/**

*

將fromIndex~toIndex之間的時間片計數都清零

*

極端情況下,當循環隊列已經走了超過1個timeSliceSize以上,這裡的清零並不能如期望的進行

* @param fromIndex 不包含

* @param toIndex 不包含

*/

private void clearBetween(int fromIndex, int toIndex) {

for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {

timeSlices[index].set(0);

}

}



public static void main(String[] args) {

EnjoySlidingWindow window = new EnjoySlidingWindow(5, Time.MILLISECONDS);

for (int i = 0; i < 10; i++) {

System.out.println(window.allow(7));

}



}

}

/<code>


最後,歡迎大家在下方評論區留言發表你的看法!!!

如果覺得本文不錯,別忘了給小編點個關注哦!



分享到:


相關文章: