Nginx 基於epoll模型事件驅動流程詳解

epoll是一種基於事件驅動的模型,其是nginx能夠高效處理客戶端請求的重要原因之一。從流程上來講,epoll模型的使用主要分為三步:epoll句柄的創建,監聽文件描述符的添加和等待事件的觸發,本文將介紹nginx是如何基於這三個步驟實現客戶端請求的高效處理的。

1. epoll模型介紹

在介紹nginx的實現原理之前,我們首先需要介紹一下epoll模型的基本使用方式。epoll在使用的時候主要有三個方法:

<code>// 創建epoll句柄
int epoll_create(int size);/<code>
<code>// 往epoll句柄中添加需要進行監聽的文件描述符
int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event);/<code>
<code>// 等待需要監聽的文件描述符上對應的事件的發生
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);/<code>

首先,我們會調用epoll_create()方法創建一個epoll實例的句柄,可以將這裡的句柄理解為一個eventpoll結構體實例,而這個結構體中有一個紅黑樹和一個隊列,紅黑樹中主要存儲需要監聽的文件描述符,而隊列則是在所監聽的文件描述符中有指定的事件發生時就會將這些事件添加到隊列中,如下圖所示為eventpoll的示意圖:

Nginx 基於epoll模型事件驅動流程詳解

一般來說,這個epoll句柄在程序的整個運行週期中只會有一個,比如nginx每個worker進程就都只維護了一個epoll句柄。在創建完句柄之後,對於我們的程序監聽的每一個端口,其實本質上也都是一個文件描述符,這個文件描述符上是可以發生Accept事件,也即接收到客戶端請求的。因而,初始時,我們會將需要監聽的端口對應的文件描述符通過epoll_ctl()方法添加到epoll句柄中。添加成功之後,這每一個監聽的文件描述符就對應了eventpoll的紅黑樹中的一個節點。另外,在調用epoll_ctl()方法添加了文件描述符之後,會將其與相應的設備(網卡)進行關聯,當設備驅動發生某個事件時,就會回調當前文件描述符的回調方法ep_poll_callback(),從而生成一個事件,並且將該事件添加到eventpoll的事件隊列中。最後,當我們調用epoll_wait()方法時,就會從epoll句柄中獲取對應的事件,本質上就是檢查eventpoll的事件隊列是否為空,如果有事件則將其返回,否則就會等待事件的發生。另外,對於epoll的使用,這裡獲取的事件一般都是Accept事件,而在處理這個事件的時候,會獲取客戶端的連接的句柄,這個句柄本質上也是一個文件描述符,此時我們則會將其繼續通過epoll_ctl()方法添加到當前的epoll句柄中,以繼續通過epoll_wait()方法等待其數據的讀取和寫入事件。

通過這裡我們可以看出,在epoll使用的過程中,會有兩類文件描述符,一類是我們所監聽的端口所對應的文件描述符,這類描述符我們一般監聽其Accept事件,以等待客戶端連接,另一類則是每個客戶端連接所對應的一個文件描述符,而這裡描述符我們一般監聽其讀寫事件以接收和發送數據給客戶端。

2. nginx中epoll實現方式

在前面的文章中,我們講解了nginx是如何初始化事件驅動框架的,其中講到事件框架的一個核心模塊的定義如下:

<code>ngx_module_t ngx_event_core_module = {
NGX_MODULE_V1,
&ngx_event_core_module_ctx, /* module context */
ngx_event_core_commands, /* module directives */
NGX_EVENT_MODULE, /* module type */
NULL, /* init master */
// 該方法主要是在master進程啟動的過程中調用的,用於初始化時間模塊
ngx_event_module_init, /* init module */
// 該方法是在各個worker進程啟動之後調用的
ngx_event_process_init, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};/<code>

這裡我們需要特別注意一下ngx_event_process_init()方法,我們講到,這個方法是在每個worker創建的時候進行初始化調用的,這裡面就涉及到兩個非常重要的調用:a. 進行對應的事件模型的初始化;b. 監聽配置文件中指定的各個端口。如下是這兩個步驟的主要代碼:

<code>static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) {
// 省略部分代碼....

// 在nginx.conf配置文件的events{}配置塊中需要使用use指令指定當前使用的事件模型,

// 此時就會將所使用的事件模型的索引號存儲在ecf->use中,下面的代碼就是通過這種方式獲取當前
// 所指定的事件模型所對應的模塊的,然後調用該模塊的actions.init()方法初始化該事件模型
for (m = 0; cycle->modules[m]; m++) {
if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
continue;
}

// ecf->use存儲了所選用的事件模型的模塊序號,這裡是找到該模塊
if (cycle->modules[m]->ctx_index != ecf->use) {
continue;
}

// module即為所選用的事件模型對應的模塊
module = cycle->modules[m]->ctx;

// 調用指定事件模型的初始化方法
if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
exit(2);
}

break;
}

// 省略部分代碼...

ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
if (ls[i].reuseport && ls[i].worker != ngx_worker) {
continue;
}
#endif

// 這裡是為當前所監聽的每一個端口都綁定一個ngx_connection_t結構體

c = ngx_get_connection(ls[i].fd, cycle->log);

if (c == NULL) {
return NGX_ERROR;
}

rev = c->read;

// SOCK_STREAM表示TCP,一般都是TCP,也就是說在接收到客戶端的accept事件之後,
// 就會調用ngx_event_accept()方法處理該事件
rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg;

if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) {
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) {
return NGX_ERROR;
}

continue;
}
}

return NGX_OK;
}/<code>

對這裡的代碼主要完成了如下幾部分的工作:

首先找到所使用的事件模型模塊,然後調用其init()方法初始化該模型,這個方法裡主要做了兩件事,一個是通過epoll_create()方法創建一個epoll句柄,該句柄是當前worker進程運行的一個基礎;另一個是為全局變量ngx_event_actions進行了賦值,即:

<code>// 這裡將epoll相關的事件操作方法賦值給ngx_event_actions,
// 也就是說後續有相關的事件發生則都會使用epoll相關的方法
ngx_event_actions = ngx_epoll_module_ctx.actions;/<code>

這個賦值的調用是非常重要的,在賦值之後,nginx所定義的幾個方法宏就都是使用的epoll模塊中所指定的方法,這裡的幾個宏定義如下:

<code>#define ngx_process_events   ngx_event_actions.process_events
#define ngx_done_events ngx_event_actions.done

#define ngx_add_event ngx_event_actions.add
#define ngx_del_event ngx_event_actions.del
#define ngx_add_conn ngx_event_actions.add_conn
#define ngx_del_conn ngx_event_actions.del_conn/<code>

而這裡的ngx_epoll_module_ctx.actions結構體的定義如下:

<code>{
// 對應於ngx_event_actions_t中的add方法
ngx_epoll_add_event, /* add an event */
// 對應於ngx_event_actions_t中的del方法
ngx_epoll_del_event, /* delete an event */
// 對應於ngx_event_actions_t中的enable方法,與add方法一致
ngx_epoll_add_event, /* enable an event */
// 對應於ngx_event_actions_t中的disable方法,與del方法一致
ngx_epoll_del_event, /* disable an event */
// 對應於ngx_event_actions_t中的add_conn方法
ngx_epoll_add_connection, /* add an connection */
// 對應於ngx_event_actions_t中的del_conn方法
ngx_epoll_del_connection, /* delete an connection */
#if (NGX_HAVE_EVENTFD)
ngx_epoll_notify, /* trigger a notify */
#else
NULL, /* trigger a notify */
#endif
// 對應於ngx_event_actions_t中的process_events方法
ngx_epoll_process_events, /* process the events */
// 對應於ngx_event_actions_t中的init方法
ngx_epoll_init, /* init the events */
// 對應於ngx_event_actions_t中的done方法
ngx_epoll_done, /* done the events */

}/<code>

由此,就可以看出nginx出色的設計方式了,通過我們所選用的事件模型,就可以動態的為ngx_add_event()等宏指定所實現的子模塊了。

  • 上面的方法完成的第二個主要的工作就是遍歷所有監聽的端口,獲取其描述符,然後通過ngx_add_event()方法將其添加到epoll句柄中以監聽其客戶端連接事件。從這裡就可以感覺到比較巧妙了,因為上面一步中正好對epoll模塊進行了初始化,並且設置了ngx_add_event()宏的實現方法,而這裡就使用到了這裡設置的方法,該方法本質上就是通過epoll_ctl()方法將當前監聽的socket描述符添加到epoll句柄中;
  • 最後就是上面的方法在遍歷所有監聽的端口的時候,為每個連接的accept事件添加的回調方法是ngx_event_accept(),通過前面我們對epoll模型的使用方式的介紹,我們大概可以理解,這裡的ngx_event_accept()方法的主要作用是將當前accept到的客戶端連接的句柄通過epoll_ctl()方法添加到當前epoll句柄中,以繼續監聽其讀寫事件;

這裡我們首先看一下上面第一點中介紹的module->actions.init(cycle, ngx_timer_resolution)方法調用時是如何初始化epoll模塊的。由於是epoll模塊,這裡的init()方法指向的就是ngx_epoll_init()方法,如下是該方法的源碼:

<code>static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { 

ngx_epoll_conf_t *epcf;

// 獲取解析得到的ngx_epoll_conf_t結構體
epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

if (ep == -1) {
// 創建eventpoll結構體,將創建得到的文件描述符返回
ep = epoll_create(cycle->connection_n / 2);

// ep==-1表示創建失敗
if (ep == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"epoll_create() failed");
return NGX_ERROR;
}
}

// 如果nevents小於epcf->events,說明event_list數組的長度不夠,因而需要重新申請內存空間
if (nevents < epcf->events) {
if (event_list) {
ngx_free(event_list);
}

// 為event_list重新申請內存空間
event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log);
if (event_list == NULL) {
return NGX_ERROR;
}
}

// 將nevents更新為配置文件中指定的大小
nevents = epcf->events;

ngx_io = ngx_os_io;

// 這裡將epoll相關的事件操作方法賦值給ngx_event_actions,也就是說後續有相關的事件發生則
// 都會使用epoll相關的方法
ngx_event_actions = ngx_epoll_module_ctx.actions;


// 這裡NGX_USE_CLEAR_EVENT指的是使用ET模式來使用epoll,默認使用ET模式,
// 而NGX_USE_LEVEL_EVENT表示使用LE模式來使用epoll
#if (NGX_HAVE_CLEAR_EVENT)
ngx_event_flags = NGX_USE_CLEAR_EVENT
#else
ngx_event_flags = NGX_USE_LEVEL_EVENT
#endif
// NGX_USE_GREEDY_EVENT表示每次拉取事件是都嘗試拉取最多的事件
| NGX_USE_GREEDY_EVENT
| NGX_USE_EPOLL_EVENT;

return NGX_OK;
}/<code>

可以看到,這裡的ngx_epoll_init()方法主要的作用有兩個:a. 通過epoll_create()方法創建一個epoll句柄;b. 設置ngx_event_actions屬性所指向的方法的實現,從而確定ngx_add_event()等宏的實現方法。下面我們來看一下ngx_add_event()是如何將需要監聽的文件描述符添加到epoll句柄中的:

<code>static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) {
int op;
uint32_t events, prev;
ngx_event_t *e;
ngx_connection_t *c;
struct epoll_event ee;

// ev->data在使用的過程中存儲的是當前對應的ngx_connection_t,如果是free_connection,
// 則存儲的是下一個節點的指針
c = ev->data;

// 事件類型
events = (uint32_t) event;

// 如果是讀事件
if (event == NGX_READ_EVENT) {

e = c->write;
prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP)
events = EPOLLIN | EPOLLRDHUP; // 設置讀事件類型
#endif

} else {
e = c->read;
prev = EPOLLIN | EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
events = EPOLLOUT; // 設置寫事件類型
#endif
}

// 根據active標誌位確定是否為活躍事件,以決定到底是修改還是添加事件
if (e->active) {
op = EPOLL_CTL_MOD; // 類型為修改事件
events |= prev;

} else {
op = EPOLL_CTL_ADD; // 類型為添加事件
}

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
if (flags & NGX_EXCLUSIVE_EVENT) {
events &= ~EPOLLRDHUP;
}
#endif

// 將flags參數指定的事件添加到監聽列表中
ee.events = events | (uint32_t) flags;
// 這裡是將connection指針的最後一位賦值為ev->instance,然後將其賦值給事件的ptr屬性,通過這種方式檢測事件是否過期
ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"epoll add event: fd:%d op:%d ev:%08XD",
c->fd, op, ee.events);

// 將事件添加到epoll句柄中

if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
"epoll_ctl(%d, %d) failed", op, c->fd);
return NGX_ERROR;
}

// 將事件標記為活躍狀態
ev->active = 1;
#if 0
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

return NGX_OK;
}/<code>

這裡的ngx_add_event()方法本質上是比較簡單的,就是將當前的ngx_event_t轉換為一個epoll_event結構體,並且會設置該結構體中需要監聽的事件類型,然後通過epoll_ctl()方法將當前epoll_event添加到epoll句柄中。

在前面的ngx_event_process_init()方法中,nginx通過ngx_add_event()方法將各個監聽的端口的描述符添加到epoll句柄中之後,就會開始監聽這些描述符上的accept連接事件,如果有客戶端連接請求,此時就會回調ngx_event_accept()方法處理該請求,我們來看一下該方法是如何處理客戶端建立連接的請求的:

<code>/**
* 當客戶端有accept事件到達時,將調用此方法處理該事件
*/
void ngx_event_accept(ngx_event_t *ev) {
socklen_t socklen;
ngx_err_t err;
ngx_log_t *log;
ngx_uint_t level;
ngx_socket_t s;
ngx_event_t *rev, *wev;
ngx_sockaddr_t sa;

ngx_listening_t *ls;
ngx_connection_t *c, *lc;
ngx_event_conf_t *ecf;
#if (NGX_HAVE_ACCEPT4)
static ngx_uint_t use_accept4 = 1;
#endif

if (ev->timedout) {
// 如果當前事件超時了,則繼續將其添加到epoll句柄中以監聽accept事件
if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
return;
}

ev->timedout = 0;
}

// 獲取解析event核心配置結構體
ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
ev->available = ecf->multi_accept;
}

lc = ev->data;
ls = lc->listening;
ev->ready = 0;

do {
socklen = sizeof(ngx_sockaddr_t);

#if (NGX_HAVE_ACCEPT4)
if (use_accept4) {
s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
} else {
s = accept(lc->fd, &sa.sockaddr, &socklen);
}
#else
// 這裡lc->fd指向的是監聽的文件句柄,調用accept()獲取客戶端的連接,並且將其存儲到sa.sockaddr中
s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif

// 檢查當前進程獲取的連接個數是否超過了最大可用連接數的7/8,是則不再繼續接收連接

ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

// 獲取新的連接
c = ngx_get_connection(s, ev->log);

// 獲取連接失敗則直接返回
if (c == NULL) {
if (ngx_close_socket(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_close_socket_n
" failed");
}

return;
}

// 標記當前為TCP連接
c->type = SOCK_STREAM;

// 為當前連接創建連接池
c->pool = ngx_create_pool(ls->pool_size, ev->log);
if (c->pool == NULL) {
ngx_close_accepted_connection(c);
return;
}

// 更新socklen的長度
if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
socklen = sizeof(ngx_sockaddr_t);
}

// 為sockaddr申請內存空間,並且將客戶端連接地址複製到c->sockaddr中
c->sockaddr = ngx_palloc(c->pool, socklen);
if (c->sockaddr == NULL) {
ngx_close_accepted_connection(c);
return;
}

ngx_memcpy(c->sockaddr, &sa, socklen);

// 申請ngx_log_t結構體的內存空間
log = ngx_palloc(c->pool, sizeof(ngx_log_t));
if (log == NULL) {

ngx_close_accepted_connection(c);
return;
}

/* set a blocking mode for iocp and non-blocking mode for others */

if (ngx_inherited_nonblocking) {
if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
// 將連接設置為阻塞模式
if (ngx_blocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_blocking_n
" failed");
ngx_close_accepted_connection(c);
return;
}
}

} else {
if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
// 將連接設置為非阻塞模式
if (ngx_nonblocking(s) == -1) {
ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
ngx_nonblocking_n
" failed");
ngx_close_accepted_connection(c);
return;
}
}
}

*log = ls->log;

// 設置連接的基本屬性
c->recv = ngx_recv;
c->send = ngx_send;
c->recv_chain = ngx_recv_chain;
c->send_chain = ngx_send_chain;

c->log = log;
c->pool->log = log;

c->socklen = socklen;
c->listening = ls;
c->local_sockaddr = ls->sockaddr;
c->local_socklen = ls->socklen;

#if (NGX_HAVE_UNIX_DOMAIN)
if (c->sockaddr->sa_family == AF_UNIX) {
c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
/* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
c->sendfile = 0;
#endif
}
#endif

rev = c->read;
wev = c->write;

wev->ready = 1;

if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
rev->ready = 1;
}

if (ev->deferred_accept) {
rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
rev->available = 1;
#endif
}

rev->log = log;
wev->log = log;

// 更新連接使用次數
c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

// 將網絡地址更新為字符串形式的地址
if (ls->addr_ntop) {
c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
if (c->addr_text.data == NULL) {
ngx_close_accepted_connection(c);
return;
}

c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
c->addr_text.data,
ls->addr_text_max_len, 0);
if (c->addr_text.len == 0) {
ngx_close_accepted_connection(c);
return;
}

}

#if (NGX_DEBUG)
{
ngx_str_t addr;
u_char text[NGX_SOCKADDR_STRLEN];

ngx_debug_accepted_connection(ecf, c);

if (log->log_level & NGX_LOG_DEBUG_EVENT) {
addr.data = text;
addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
NGX_SOCKADDR_STRLEN, 1);

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
"*%uA accept: %V fd:%d", c->number, &addr, s);
}

}
#endif

// 將當前連接添加到epoll句柄中進行監控
if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
if (ngx_add_conn(c) == NGX_ERROR) {
ngx_close_accepted_connection(c);
return;
}
}

log->data = NULL;
log->handler = NULL;

// 建立新連接之後的回調方法
ls->handler(c);

if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
ev->available--;
}

} while (ev->available);
}/<code>

這裡客戶端連接的建立過程主要可以分為如下幾個步驟:

  • 首先調用accept()方法獲取當前客戶端建立的連接,並且將其地址信息保存到結構體sa中;
  • 接著通過調用ngx_get_connection()方法獲取一個ngx_connection_t結構體以對應當前獲取到的客戶端連接,並且會初始化該結構體的各個屬性;
  • 調用ngx_add_conn()方法將當前方法添加到epoll句柄中,這裡的添加過程本質上就是通過epoll_ctl()方法將當前客戶端的連接的文件描述符添加到epoll句柄中,以監聽其讀寫事件;

如此我們就講解了從epoll句柄的創建,到指定的端口的監聽,接著處理客戶端連接,並且將客戶端連接對應的文件描述符繼續添加到epoll句柄中以監聽讀寫事件的流程。下面我們繼續來看一下nginx是如何等待所監聽的這些句柄上的事件的發生的,也即整個事件框架的驅動程序。worker進程對於事件的處理,主要在ngx_process_events_and_timers()方法中,如下是該方法的源碼:

<code>void ngx_process_events_and_timers(ngx_cycle_t *cycle) {
\t// 嘗試獲取共享鎖
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}

// 這裡開始處理事件,對於kqueue模型,其指向的是ngx_kqueue_process_events()方法,
// 而對於epoll模型,其指向的是ngx_epoll_process_events()方法
// 這個方法的主要作用是,在對應的事件模型中獲取事件列表,然後將事件添加到ngx_posted_accept_events

// 隊列或者ngx_posted_events隊列中
(void) ngx_process_events(cycle, timer, flags);

// 這裡開始處理accept事件,將其交由ngx_event_accept.c的ngx_event_accept()方法處理;
ngx_event_process_posted(cycle, &ngx_posted_accept_events);

// 開始釋放鎖
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}

// 如果不需要在事件隊列中進行處理,則直接處理該事件
// 對於事件的處理,如果是accept事件,則將其交由ngx_event_accept.c的ngx_event_accept()方法處理;
// 如果是讀事件,則將其交由ngx_http_request.c的ngx_http_wait_request_handler()方法處理;
// 對於處理完成的事件,最後會交由ngx_http_request.c的ngx_http_keepalive_handler()方法處理。

// 這裡開始處理除accept事件外的其他事件
ngx_event_process_posted(cycle, &ngx_posted_events);
}
/<code>

這裡的ngx_process_events_and_timers()方法我們省略了大部分代碼,只留下了主要的流程。簡而言之,其主要實現瞭如下幾個步驟的工作:

  • 獲取共享鎖,以得到獲取客戶端連接的權限;
  • 調用ngx_process_events()方法監聽epoll句柄中各個文件描述符的事件,並且處理這些事件。在前面我們講到,nginx在調用epoll模塊的init()方法時,初始化了ngx_event_actions屬性的值,將其指向了epoll模塊所實現的方法,這裡就包括ngx_process_events方法宏所對應的方法,也即ngx_epoll_process_events()方法,因而這裡其實就可以理解,ngx_epoll_process_events()方法本質上就是調用epoll_wait()方法等待epoll句柄上所監聽的事件的發生;
  • 處理ngx_posted_accept_events隊列中的事件,這些事件其實就是前面講到的客戶端建立連接的事件,在ngx_epoll_process_events()方法中獲取到事件之後,會判斷其是accept事件還是讀寫事件,如果是accept事件,就會將其添加到ngx_posted_accept_events隊列中,如果是讀寫事件,就會將其添加到ngx_posted_events隊列中;
  • 釋放共享鎖,以讓其他的worker進程可以獲取鎖,從而接收客戶端連接;
  • 處理ngx_posted_events隊列中的事件,也即客戶端連接的讀寫事件。從這裡就可以看出nginx高性能的一個原因,其將accept事件和讀寫事件放到了兩個不同的隊列中,accept事件是必須在鎖內部處理的,而讀寫事件則可以異步於accept事件,這提高了nginx處理客戶端請求的能力。

下面我們來看一下ngx_epoll_process_events()方法是如何處理epoll句柄中的事件的:

<code>static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) {
int events;
uint32_t revents;
ngx_int_t instance, i;
ngx_uint_t level;
ngx_err_t err;
ngx_event_t *rev, *wev;
ngx_queue_t *queue;
ngx_connection_t *c;

/* NGX_TIMER_INFINITE == INFTIM */

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll timer: %M", timer);

// 通過epoll_wait()方法進行事件的獲取,獲取到的事件將存放在event_list中,並且會將獲取的事件個數返回

events = epoll_wait(ep, event_list, (int) nevents, timer);

err = (events == -1) ? ngx_errno : 0;

// 這裡的ngx_event_timer_alarm是通過一個定時器任務來觸發的,在定時器中會將其置為1,
// 從而實現定期更新nginx緩存的時間的目的
if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
ngx_time_update();
}

if (err) {
if (err == NGX_EINTR) {

if (ngx_event_timer_alarm) {
ngx_event_timer_alarm = 0;
return NGX_OK;
}

level = NGX_LOG_INFO;

} else {
level = NGX_LOG_ALERT;
}

ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
return NGX_ERROR;
}

// 獲取的事件個數為0
if (events == 0) {
// 如果當前時間類型不為NGX_TIMER_INFINITE,說明獲取事件超時了,則直接返回
if (timer != NGX_TIMER_INFINITE) {
return NGX_OK;
}

// 這裡說明時間類型為NGX_TIMER_INFINITE,但是卻返回了0個事件,說明epoll_wait()調用出現了問題
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"epoll_wait() returned no events without timeout");
return NGX_ERROR;
}


// 遍歷各個事件
for (i = 0; i < events; i++) {
// 每個事件的data.ptr中存儲了當前事件對應的connection對象
c = event_list[i].data.ptr;

// 獲取事件中存儲的instance的值
instance = (uintptr_t) c & 1;
// 獲取connection指針地址值
c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

// 獲取讀事件結構體
rev = c->read;

// 如果當前連接的文件描述符為-1,獲取其instance不等於當前事件的instance,
// 說明該連接已經過期了,則不對該事件進行處理
if (c->fd == -1 || rev->instance != instance) {

/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}

// 獲取當前事件監聽的類型
revents = event_list[i].events;

ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: fd:%d ev:%04XD d:%p",
c->fd, revents, event_list[i].data.ptr);

// 如果事件發生錯誤,則打印相應的日誌
if (revents & (EPOLLERR | EPOLLHUP)) {
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,

"epoll_wait() error on fd:%d ev:%04XD",
c->fd, revents);

/*
* if the error events were returned, add EPOLLIN and EPOLLOUT
* to handle the events at least in one active handler
*/

revents |= EPOLLIN | EPOLLOUT;
}

#if 0
if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"strange epoll_wait() events fd:%d ev:%04XD",
c->fd, revents);
}
#endif

// 如果當前是讀事件,並且事件是活躍的
if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
if (revents & EPOLLRDHUP) {
rev->pending_eof = 1;
}

rev->available = 1;
#endif

// 將事件標記為就緒狀態
rev->ready = 1;

// 默認是開啟了NGX_POST_EVENTS開關的
if (flags & NGX_POST_EVENTS) {
// 如果當前是accept事件,則將其添加到ngx_posted_accept_events隊列中,
// 如果是讀寫事件,則將其添加到ngx_posted_events隊列中
queue = rev->accept ? &ngx_posted_accept_events
: &ngx_posted_events;

ngx_post_event(rev, queue);

} else {

// 如果不需要分離accept和讀寫事件,則直接處理該事件
rev->handler(rev);
}
}

// 獲取寫事件結構體
wev = c->write;

if ((revents & EPOLLOUT) && wev->active) {

// 如果當前連接的文件描述符為-1,獲取其instance不等於當前事件的instance,
// 說明該連接已經過期了,則不對該事件進行處理
if (c->fd == -1 || wev->instance != instance) {

/*
* the stale event from a file descriptor
* that was just closed in this iteration
*/

ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"epoll: stale event %p", c);
continue;
}

// 將當前事件標記為就緒狀態
wev->ready = 1;
#if (NGX_THREADS)
wev->complete = 1;
#endif

// 由於是寫事件,並且需要標記為了NGX_POST_EVENTS狀態,
// 因而將其直接添加到ngx_posted_events隊列中,否則直接處理該事件
if (flags & NGX_POST_EVENTS) {
ngx_post_event(wev, &ngx_posted_events);

} else {
wev->handler(wev);
}

}
}

return NGX_OK;
}/<code>

這裡ngx_epoll_process_events()方法首先就是調用epoll_wait()方法獲取所監聽的句柄的事件,然後遍歷獲取的事件,根據事件的類型,如果是accept事件,則添加到ngx_posted_accept_events隊列中,如果是讀寫事件,則添加到ngx_posted_events隊列中,而隊列中事件的處理,則在上面介紹的ngx_process_events_and_timers()方法中進行。

4. 小結

本文首先對epoll模型的實現原理進行了講解,然後從源碼的層面對nginx是如何基於epoll模型實現事件驅動模式的原理進行了講解。

轉載自:https://my.oschina.net/zhangxufeng/blog/3158026


分享到:


相關文章: