02.02 express實現一個RateLimit截流器

RateLimit截流器,其實是一個比較有用的東西。他可以防止某些API被盜刷。舉個例子,我們平常都會需要“發送手機短信驗證碼”的API,這都是需要按次數收費的,而且相對來說價格也挺昂貴。如果不加以限制,那分分鐘被刷爆。有可能會被某些不法分子用於短信炸彈的非法用途,或者有些人只是單純的對網站進行攻擊,增加平臺的支出。當然這些都有很多種方法去防範,最簡單的就是加一個驗證碼了。

不過作為API底層,我們肯定需要做好萬全的準備,不能真的等出問題了才開始防範。我們可以一開始就限制某個API接口等最大請求量。

這有兩個好處。

第一個好處是,可以在前面的防守都失敗以後,保證最後一層“堡壘”不會被攻下,可以把損失降到最低。

第二個好處是,可以保證服務器穩定運行,每個服務器都有一定的承受量,一旦流量過大(可能是CC攻擊,也可能是正常用戶),可能會導致服務器產生OutOfMemory等錯誤,這會讓程序或者系統奔潰。如果發生這種情況,將會影響大批量的用戶。

綜上所述,我們需要有一個截流器來負責流量的過濾。過濾以後可以通過一些控制,把流量轉移到另外一臺空閒的服務器上,當然直接拒絕請求也可以(如果你們老闆沒意見的話)。

用Redis和JavaScript/express實現一個RateLimit截流器


正文

RateLimit,這個是我前幾個月在應聘某家公司時候出的筆試題。今天無意中翻到,所以,拿出來與各位分享分享。

以下是公司的筆試題目:

用 Redis 和 Typescript / JavaScript 實現一個 RateLimit 限制器,可以指定事件、限制時間、限制次數,例如限制 1 分鐘內最多 3 次獲取短信驗證碼,或 10 分鐘內最多一次重置密碼。

可以實現為一個 express 的中間件。

基本思路

截流器的實現,網絡上有很多。以前有了解過一種基於“令牌桶”的截流算法。也確實效果不錯。但是具體情況具體分析,需要另外開一個程序去不斷的刷新令牌桶內的令牌數量,其實也是蠻耗費資源的。當然合理的實現技巧,不會太差。我主要是覺得“令牌桶”不是適合這個題目的情境。我個人是比較喜歡簡潔的東西,如果能更加簡潔方便的實現這種功能,並不一定要搞得那麼麻煩。至少在目前這個需求下也許並不是特別適合。

代碼裡的這個方法,是我腦子一下子就想到的,也許並不很完美,有我考慮不夠周到的地方。

主要思想是,為每一個事件建立一個Redis的list鍵。每一次請求發送過來,去取list的第前N個索引,這裡的N是限制的次數,如果限制2次,N就是2。每一個LIST的值保存的是請求到達的時間,第前N個索引,就是第前N次請求到達的時間。如果這個時間和當前的時間相減的expire小於限制時間,那什麼規定時間內,請求已經到達上限,對其阻止。

可能表述的還是不夠清晰,直觀。可以直接看redis.lua裡的代碼,畢竟‘talk is cheap,show me the code’

代碼結構

./eventBuilder.js 事件構建器,方便生成指定事件。代碼如下

<code>class eventBuilder{
constructor(){
this.event_name = ''; //事件名稱,便於分辨
this.event_id = ""; //事件ID,需要自定義,數字和英文組合
this.limit_count = 60; //限制流量截流數量
this.time_period = 60; //限制流量截流週期,單位為秒
}
/** 事件名稱 */

EventName(data){
this.event_name = data;
return this;
}
/** 事件ID,數字和英文組合 */
EventID(data){
this.event_id = data;
return this;
}
/** 限制流量截流數量 */
LimitCount(data){
this.limit_count = data;
return this;
}
/** 限制流量截流週期,單位為秒 */
TimePeriod(data){
this.time_period = data;
return this;
}

}
module.exports = new eventBuilder();
/<code>

./rateLimit.js 截流器源代碼。代碼如下

<code>const app = require('express');
const redis = require("redis");
const fs = require("fs")
const {promisify} = require('util');
var client = redis.createClient();
/**
* 截流器
*/
class rateLimit{

constructor(){
//初始化
this.hash = null;
this.luaCode = fs.readFileSync('redis.lua');
//因為該代碼是在程序開啟後初始化的時候才執行,所以採用同步讀取,不會影響性能。
this.Event = {};

}

/**
* 啟動前,必須調用init,加載lua腳本。否則無法使用
* 返回一個promise對象
*/
init(){
//構建promise
let client_script = promisify(client.script).bind(client);

return client_script('load', this.luaCode)
.then( (reply)=>{

this.hash = reply;

}).catch(err =>{

console.log('err:'+err);

});
}

/**
* 根據事件ID獲取對應的路由,成功返回路由,失敗返回null
* @param {String} event_id 事件ID
*/
getRouter(event_id){
if(this.Event[event_id] === undefined){
return null;
}
return this.Event[event_id].router;
}

/**
* 添加新的事件到截流器中,建議使用eventBuilder
* @param {Object} pros
pros{
event_name:"", //事件名稱,便於分辨
event_id:"", //事件ID,需要自定義,數字和英文組合
limit_count:1, //限制流量截流數量

time_period:60 //限制流量截流週期,單位為秒
}
* @return index 返回對應的index
*/
addNewEvent(pros){
if(pros['event_id'] === undefined){
throw 'pros ERROR, please fix that!';
}
let event_id = pros.event_id;
this.Event[event_id] = pros
this.Event[event_id]['router'] = app.Router().use('/',this.limitFuncFactory(event_id));
return true;
}

/**
* 截流器工廠函數,專門用於生產帶有截流功能的路由函數
* @param {String} event_id 事件ID
*/
limitFuncFactory(event_id){

let event = this.Event[event_id];

return (req,res,next)=>{
let handle = (err,reply)=>{

if(err != null){
//如果lua腳本執行出現錯誤,也進行截流
//一般不會執行到這裡
res.status(400).send("Limit");
console.log("Something wrong :" + err);
}

//lua腳本返回“OK”和“NO”,ok表示當前流量可以通過。
if(reply == "OK"){
next()
}else{
res.status(400).send("Limit");
}


}

//執行對應的lua腳本
client.evalsha(this.hash, 1, event.event_id, event.limit_count, event.time_period, handle)
}

}

}

module.exports = new rateLimit();
/<code>

./redis.lua 負責截流算法的實現,使用lua是為了能讓redis實現原子操作.。代碼如下

<code>local eventX = 'EVENT_'..KEYS[1] --事件ID--
local count = tonumber(ARGV[1]) --限制總數--
local nowTime = redis.call('TIME') --當前時間--
local t = nowTime[1] --當前時間戳--
local expire = tonumber(ARGV[2]) --過期時間,秒--

-- Pass函數,當條件允許時,處理後事 --
local function Pass()
redis.call('LPUSH',eventX,t)
end

-- 需要清理之前過期的token,保留當前個數 --
local function Trim()
redis.call('ltrim',eventX,0,count-1)
end

-- 總數為0,直接就禁止訪問 --
if(count == nil or count <= 0)
then
return 'NO'
end

-- 獲取當前事件list的個數,小於總數都允許直接通過 --
if(redis.call('LLEN',eventX) < count)
then
Pass()

return 'OK'
end

local index = count - 1

-- 獲取第count個值裡保存的時間 --
local begin_time = redis.call('LINDEX',eventX,index)

-- 時間為空,可能是list不夠大,直接通過 --
if(begin_time == nil)
then
Pass()
return 'OK'
end

-- 當前時間減去第count的時間,如果在過期時間內 --
-- 表示在給定的過期時間內,我們允許的最多count個訪問已經填滿了--
-- 所以不允許訪問 --
if(t - begin_time <= expire)
then
return 'NO'
else
-- 否則,直接pass --
Pass()
-- 順手清理一下之前的數據 --
Trim()
return 'OK'
end
/<code>

./index.js 利用截流器的一個小例子,開啟本地8088端口,並以中間件的形式插入到 /test 路由中

<code>const express = require('express')
const app = express()
const rateLimit = require("./rateLimit")
const eventBuilder = require('./eventBuilder')

rateLimit.init().then(()=>{

//生成一個20秒內,只允許訪問2次的事件
let event = eventBuilder.EventName("測試截流")
.EventID('testLimit')
.LimitCount(40)
.TimePeriod(5);

//添加到截流器中
rateLimit.addNewEvent(event);

//獲得事件的路由
let rout = rateLimit.getRouter('testLimit')
if(rout == null){
throw "路由器為空"
}

//以中間件的形勢插入到 /test 路由中
app.get('/test', [rout, (req,res)=>{
console.log('#');
res.send("

HELLO WORLD

");
}] );


}).catch(err=>{
console.error(err)
})

app.listen('8088', () => console.log(`服務器啟動成功!`))

/<code>

需要依賴的包:"express": "^4.16.4","redis": "^2.8.0"

測試

為了方便測試,在路由內輸出‘#’作為請求的響應(確實比較傻,這個方法。數的累死)

<code>app.get('/test', [rout, (req,res)=>{
console.log('#');
res.send("

HELLO WORLD

");
}] );
/<code>

使用wrk進行測試

電腦配置:MacBook air 2017版本

可用性測試1

wrk配置

<code>wrk -t 5 -c 20 -d 20 http://localhost:8088/test
/<code>

5個線程,同時維持20個請求,持續週期20秒

事件配置:

20秒內,20個響應

<code>let event = eventBuilder.EventName("測試截流")
.EventID('testLimit')
.LimitCount(20)
.TimePeriod(20);
/<code>

測試結果:

wrk結果

<code>Running 20s test @ http://localhost:8088/test
5 threads and 20 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 3.24ms 1.32ms 41.20ms 93.47%
Req/Sec 1.26k 218.45 1.52k 82.20%
125121 requests in 20.01s, 25.89MB read
Non-2xx or 3xx responses: 125101
Requests/sec: 6251.45
Transfer/sec: 1.29MB
/<code>

nodejs輸出:

正常,輸出20個“#”

可用性測試2

wrk配置

<code>wrk -t 5 -c 20 -d 5 http://localhost:8088/test
/<code>

5個線程,同時維持20個請求,持續週期5秒

事件配置:

5秒內,40個響應

<code>let event = eventBuilder.EventName("測試截流")
.EventID('testLimit')
.LimitCount(40)
.TimePeriod(5);
/<code>

測試結果:

wrk結果

<code>Running 5s test @ http://localhost:8088/test
5 threads and 20 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 4.09ms 2.52ms 43.84ms 94.64%
Req/Sec 1.03k 234.50 1.40k 68.00%
25550 requests in 5.01s, 5.29MB read
Non-2xx or 3xx responses: 25510
Requests/sec: 5102.71
Transfer/sec: 1.06MB
/<code>

nodejs輸出:

正常,輸出40個“#”

正確性測試:redis內查看token

代碼:lrange EVENT_testLimit 0 -1

返回:40個請求時間戳

<code> 1) "1554893376"
2) "1554893376"
3) "1554893376"
4) "1554893376"
5) "1554893376"
6) "1554893376"
7) "1554893376"
8) "1554893376"
9) "1554893376"
10) "1554893376"
11) "1554893376"
12) "1554893376"
13) "1554893376"
14) "1554893376"
15) "1554893376"
16) "1554893376"
17) "1554893376"
18) "1554893376"
19) "1554893376"
20) "1554893376"
21) "1554893376"
22) "1554893376"
23) "1554893376"
24) "1554893376"
25) "1554893376"
26) "1554893376"
27) "1554893376"
28) "1554893376"
29) "1554893376"
30) "1554893376"
31) "1554893376"
32) "1554893376"
33) "1554893376"
34) "1554893376"
35) "1554893376"
36) "1554893376"
37) "1554893376"
38) "1554893376"
39) "1554893376"
40) "1554893376"
/<code>

總結

以上測試說明,在短時間內大量的請求同時發送,不會破壞程序的正確性和可用性。最好通過查看redis內的數據,可以確保,不會因為訪問人數太大而,佔用redis大量的內存,影響性能。redis下最多隻會保存N個請求數據,N表示限制的請求個數。


分享到:


相關文章: