前言
- 同步I/O模型通常用於實現Reactor模式
- 異步I/O模型則用於實現Proactor模式
- 最後我們會使用同步I/O方式模擬出Proactor模式
一、Reactor模式
- Reactor 釋義“反應堆”,是一種
事件驅動機制
Reactor的回調函數:
和普通函數調用的不同之處在於,應用程序不是主動的調用某個 API 完成處理,而是恰恰 相反,Reactor 逆置了事件處理流程,應用程序需要提供相應的接口並註冊到 Reactor 上, 如果相應的時間發生,Reactor 將主動調用應用程序註冊的接口,這些接口又稱為“回調函數”
- Reactor 模式是處理併發I/O比較常見的一種模式,用於同步 I/O,中心思想是將所有要處理的I/O 事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上; 一旦有 I/O 事件到來或是準備就緒(文件描述符或 socket 可讀、寫),多路複用器返回並將事先註冊的相應 I/O 事件分發到對應的處理器中。
Reactor 模型有三個重要的組件:
多路複用器:由操作系統提供,在 linux 上一般是 select, poll, epoll 等系統調用。事件分發器:將多路複用器中返回的就緒事件分到對應的處理函數中事件處理器:負責處理特定事件的處理函數具體流程如下:
註冊讀就緒事件和相應的事件處理器事件分離器等待事件事件到來,激活分離器,分離器調用事件對應的處理器事件處理器完成實際的讀操作,處理讀到的數據,註冊新的事件,然後返還控制 權
需要C/C++ Linux服務器架構師學習資料後臺私信“1”免費獲取(資料包括C/C++,Linux,golang技術,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK,ffmpeg等),免費分享
多線程Reactor模式多線程Reactor模式特點:它要求主線程(I/O處理單元)只負責監聽文件描述符上是否有事件發生,有的話就立即將時間通知工作線程(邏輯單元)。除此之外,主線程不做任何其他實質性的工作讀寫數據,接受新的連接,以及處理客戶請求均在工作線程中完成工作流程:①主線程往epoll內核事件表中註冊socket上有數據可讀②主線程調用epoll_wait等待socket上有數據可讀③當socket上有數據可讀時,epoll_wait通知主線程。主線程則將socket可讀事件放入請求隊列④睡眠在請求請求隊列上的某個工作線程被喚醒,它從socket讀取數據,並處理客戶請求,然後往epoll內核事件表中註冊該socket上的寫就緒時間⑤主線程調用epoll_wait等到socket可寫⑥當socket可寫時,epoll_wait通知主線程。主線程將socket可寫事件放入請求隊列⑦睡眠在請求隊列上的某個工作線程被喚醒,它向socket上寫入服務器處理客戶請求的結果
單線程Reactor模式單線程Reactor模式與多線程Reactor模式原理相同。但是工作都是在同一個線程中完成的單線程優缺點:優點:Reactor模型開發效率上比起直接使用IO複用要高,它通常是單線程的,設計目標是希望單線程使用一顆 CPU 的全部資源。優點為每個事件處理中很多時候可以 不考慮共享資源的互斥訪問缺點:可是缺點也是明顯的,現在的硬件發展,已經不再遵循摩爾定 律,CPU 的頻率受制於材料的限制不再有大的提升,而改為是從核數的增加上提升能力單線程Reactor使用多核:如果程序業務很簡單,例如只是簡單的訪問一些提供了併發訪問的服務,就可以直接開啟多個反應堆(Reactor),每個反應堆對應一顆CPU核心這些反應堆上跑的請求互不相關,這是完全可以利用多核的。例如Nginx這樣的http靜態服務器下面是單線程Reactor模式的實現代碼,下載下來之後可以直接編譯運行:
<code>typedef
int
NCALLBACK
(
int
,int
,void
*);struct
ntyevent
{int
fd;int
events;void
*arg;int
(*callback)(int
fd,int
events,void
*arg);int
status;char
buffer[MAX_BUFFER_SIZE];int
length;long
last_active; };struct
ntyreactor
{int
epoll_fd;struct
ntyevent
*events
; };
int
init_server
(
char
*ip, short port);int
ntyreactor_addlistener
(struct ntyreactor *reactor,
int
fd, NCALLBACK callback);struct ntyreactor *
ntyreactor_init
()
;int
ntyreactor_destroy
(struct ntyreactor *reactor)
;int
ntyreactor_run
(struct ntyreactor *reactor)
;int
nty_event_set
(struct ntyevent *ev,
int
fd,int
event,int
length,int
status, NCALLBACK callback,void
*arg);int
nty_event_add
(
int
epoll_fd, struct ntyevent* ev);int
nty_event_del
(
int
epoll_fd, struct ntyevent* event);int
accept_callback
(
int
fd,int
events,void
*arg);int
recv_callback
(
int
fd,int
events,void
*arg);int
send_callback
(
int
fd,int
events,void
*arg);int
main
(
int
argc,char
*argv[]) {if
(argc !=3
) {printf
("usage: ./%s [ip] [port]\n"
, basename(argv[0
]));exit
(EXIT_FAILURE); }char
*ip = argv[1
]; short port = atoi(argv[2
]);int
sock_fd; sock_fd = init_server(ip, port);struct
ntyreactor
*reactor
=ntyreactor_init
();if
( reactor ==NULL
) {printf
("Error in %s(), ntyreactor_init: create reactor error\n"
, __func__);exit
(EXIT_FAILURE); } ntyreactor_addlistener(reactor, sock_fd, accept_callback); ntyreactor_run(reactor); ntyreactor_destroy(reactor); close(sock_fd);return
0
; }int
init_server
(
char
*ip, short port) {int
sock_fd = socket(AF_INET, SOCK_STREAM,0
);if
(sock_fd ==-1
) {printf
("Error in %s(), socket: %s\n"
, __func__, strerror(errno));return
-1
; }struct
sockaddr_in
server_addr
;memset
(&server_addr,0
,sizeof
(server_addr)); server_addr.sin_family = AF_INET;if
(inet_pton(AF_INET, ip, (void
*)&server_addr.sin_addr.s_addr) ==-1
) {printf
("Error in %s(), inet_pton: %s\n"
, __func__, strerror(errno));return
-1
; } server_addr.sin_port = htons(port);if
(bind(sock_fd, (const
struct sockaddr*)&server_addr,sizeof
(server_addr)) ==-1
) {printf
("Error in %s(), bind: %s\n"
, __func__, strerror(errno));return
-1
; }if
(listen(sock_fd,20
) ==-1
) {printf
("Error in %s(), listen: %s\n"
, __func__, strerror(errno));return
-1
; }printf
("Listen start [%s:%d]...\n"
, inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));return
sock_fd; }struct ntyreactor *
ntyreactor_init
()
{struct
ntyreactor
*reactor
= (struct
ntyreactor
*)malloc
(sizeof
(struct
ntyreactor
));if
(reactor ==NULL
)return
NULL
;memset
(reactor,0
,sizeof
(struct ntyreactor)); reactor->epoll_fd = epoll_create(1
);if
(reactor->epoll_fd ==-1
) {printf
("Error in %s(), epoll_create: %s\n"
, __func__, strerror(errno));free
(reactor);return
NULL
; } reactor->events = (struct ntyevent*)malloc
(sizeof
(struct ntyevent) * MAX_EPOLL_EVENTS);if
(reactor->events ==NULL
) {printf
("Error in %s(), malloc: %s\n"
, __func__, strerror(errno)); close(reactor->epoll_fd);free
(reactor);return
NULL
; }return
reactor; }int
ntyreactor_destroy
(struct ntyreactor *reactor)
{if
(reactor ==NULL
) {printf
("Error in %s(): %s\n"
, __func__,"reactor arg is NULL"
);return
-1
; } close(reactor->epoll_fd);free
(reactor->events);free
(reactor);return
0
; }int
ntyreactor_run
(struct ntyreactor *reactor)
{if
(reactor ==NULL
|| reactor->epoll_fd0
|| reactor->events ==NULL
) {printf
("Error in %s(): %s\n"
, __func__,"reactor arg is error"
);return
-1
; }
struct
epoll_event
ep_events
[MAX_EPOLL_EVENTS
+ 1];int
nready;while
(1
) { nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS,1000
);if
(nready ==-1
) {if
(errno == EAGAIN || errno == EWOULDBLOCK)continue
;printf
("Error in %s(), epoll_wait: %s\n"
, __func__, strerror(errno));return
-1
; }else
if
(nready ==0
)continue
;else
{int
i;for
(i =0
; i < nready; ++i) {struct
ntyevent
*ev
= (struct
ntyevent
*)ep_events
[i
].data
.ptr
;if
((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) ev->callback(ev->fd, ev->events, ev->arg);if
((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) ev->callback(ev->fd, ev->events, ev->arg); } } }return
0
; }int
ntyreactor_addlistener
(struct ntyreactor *reactor,
int
fd, NCALLBACK callback) {if
(reactor ==NULL
|| fd <0
|| callback ==NULL
) {printf
("Error in %s(): %s\n"
, __func__,"arg error"
);return
-1
; } nty_event_set(&reactor->events[fd], fd, EPOLLIN,0
,0
, callback, reactor); nty_event_add(reactor->epoll_fd, &reactor->events[fd]);return
0
; }int
nty_event_set
(struct ntyevent *ev,
int
fd,int
event,int
length,int
status, NCALLBACK callback,void
*arg) {if
(ev ==NULL
|| fd <0
|| event <0
|| length0
|| callback ==NULL
|| arg ==NULL
|| status0
) {printf
("Error in %s(): %s\n"
, __func__,"arg error"
);return
-1
; } ev->fd = fd; ev->events = event; ev->arg = arg; ev->callback = callback; ev->status = status; ev->length = length; ev->last_active = time(NULL
);return
0
; }int
nty_event_add
(
int
epoll_fd, struct ntyevent* ev) {if
(epoll_fd <0
|| ev ==NULL
) {printf
("Error in %s(): %s\n"
, __func__,"arg error"
);return
-1
; }struct
epoll_event
ep_event
;memset
(&ep_event,0
,sizeof
(ep_event)); ep_event.events = ev->events; ep_event.data.ptr = ev;int
op;if
(ev->status ==0
) { op = EPOLL_CTL_ADD; ev->status =1
; }else
op = EPOLL_CTL_MOD;if
(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) ==-1
) {printf
("Error in %s(), epoll_ctl: %s\n"
, __func__, strerror(errno));return
-1
; }return
0
; }int
nty_event_del
(
int
epoll_fd, struct ntyevent* ev) {if
(epoll_fd0
|| ev ==NULL
|| ev->status !=1
) {printf
("Error in %s(): %s\n"
, __func__,"ev arg is error"
);return
-1
; }struct
epoll_event
ep_event
;memset
(&ep_event,0
,sizeof
(ep_event)); ep_event.data.ptr = ev; ev->status =0
;if
(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) ==-1
) {printf
("Error in %s(), epoll_ctl: %s\n"
, __func__, strerror(errno));return
-1
; }return
0
; }int
accept_callback
(
int
fd,int
events,void
*arg) {struct
ntyreactor
*reactor
= (struct
ntyreactor
*)arg
;struct
ntyevent
*ev
=reactor
->events
+fd
;struct
sockaddr_in
cli_addr
;memset
(&cli_addr,0
,sizeof
(cli_addr));socklen_t
len =sizeof
(cli_addr);int
cli_fd; cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);if
(cli_fd ==-1
) {printf
("Error in %s(), accept: %s\n"
, __func__, strerror(errno));return
-1
; }int
i;do
{for
(i =5
; i< MAX_EPOLL_EVENTS; ++i) {if
(reactor->events[i].status ==0
)break
; }if
(i == MAX_EPOLL_EVENTS) {printf
("Error in %s(): max connect limit[%d]\n"
, __func__, MAX_EPOLL_EVENTS);return
-1
; }int
flag =0
;if
((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK))0
) {printf
("Error in %s(), fcntl: %s\n"
, __func__, strerror(errno));return
-1
; } nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN,0
,0
, recv_callback, reactor); nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]); }while
(0
);printf
("New connect: [%s:%d], [time:%ld], pos[%d]\n"
, \ inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);return
0
; }int
recv_callback
(
int
fd,int
events,void
*arg) {struct
ntyreactor
*reactor
=(struct
ntyreactor
*)arg
;struct
ntyevent
*ev
=reactor
->events
+fd
; nty_event_del(reactor->epoll_fd, ev);int
rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE,0
);if
(rc0
) {printf
("Error in %s(), recv: %s\n"
, __func__, strerror(errno)); close(ev->fd); }else
if
(rc ==0
) {printf
("Client closed the connection, fd = %d\n"
, ev->fd); close(ev->fd); }else
{ ev->buffer[rc] ='\0'
;printf
("Recv[fd = %d]: %s\n"
, ev->fd, ev->buffer); nty_event_set(ev, ev->fd, EPOLLOUT, rc,0
, send_callback, reactor); nty_event_add(reactor->epoll_fd, ev); }return
rc; }int
send_callback
(
int
fd,int
events,void
*arg) {struct
ntyreactor
*reactor
=(struct
ntyreactor
*)arg
;struct
ntyevent
*ev
=reactor
->events
+fd
;int
rc = send(ev->fd, ev->buffer, ev->length,0
);if
(rc >0
) {printf
("Send[fd = %d]: %s\n"
, ev->fd, ev->buffer); nty_event_del(reactor->epoll_fd, ev); nty_event_set(ev, ev->fd, EPOLLIN,0
,0
, recv_callback, reactor); nty_event_add(reactor->epoll_fd, ev); }else
{printf
("Error in %s(), send: %s\n"
, __func__, strerror(errno)); close(ev->fd); nty_event_del(reactor->epoll_fd, ev); }return
rc; }/<code>
二、Proactor模式
Proactor模式特點與Reactor不同,Proactor模式將所有的I/O操作都交給主線程和內核來處理,工作線程僅僅負責業務邏輯
Proactor模式的工作流程①主線程調用aio_read函數向內核註冊socket上讀完成事件,並告訴內核用戶讀緩衝區的位置,以及讀操作完成時如何通知應用程序(這裡以信號為例)②主線程繼續處理其他邏輯③當socket上的數據被讀入用戶緩衝區後,內核將嚮應用程序發送一個信號,以通知應用程序數據已經可用④應用程序預先定義好的信號處理函數選擇一個工作線程來處理客戶請求。工作線程處理完客戶請求之後,調用aio_write函數向內核註冊socket上的寫完成事件,並告訴內核用戶寫緩衝區的位置,以及寫操作完成時如何通知應用程序(這裡以信號為例)⑤主線程繼續處理其他邏輯⑥當用戶緩衝區的數據被寫入socket之後,內核將嚮應用程序發送一個信號,以通知應用程序數據已經發送完畢⑦應用程序預先定義好的信號處理函數選擇一個工作線程來做善後處理,比如決定是否關閉socket在上圖中,連接socket上的讀寫事件是通過aio_read/aio_write向內核註冊的,因此內核將通過信號來嚮應用程序報告連接socket上的讀寫事件。所以,主線程的epoll_wait調用僅能用來檢測監聽socket上的連接請求事件,而不能用來檢測連接socket的讀寫事件
三、使用同步I/O模擬Proactor模式
原理:主線程執行數據讀寫操作,讀寫完成之後,主線程向工作線程通知這一“完成事件”。那麼從工作線程的角度來看,它們就直接獲得了數據讀寫的結果,接下來要做的只是對讀寫的結果進行邏輯處理
工作流程:①主線程往epoll內核事件表中註冊socket上的讀就緒事件②主線程調用epoll_wait等待socket上有數據可讀③當socket上有數據可讀時,epoll_wait通知主線程。主線程從socket循環讀取數據,直到沒有更多數據可讀,然後將讀取到的數據封裝成一個請求對象並插入請求隊列④睡眠在請求隊列上的某個工作線程被喚醒,它獲得請求對象並處理客戶請求,然後往epoll內核事件表中註冊socket上的寫就緒事件⑤主線程調用epoll_wait等到socket可寫⑥當socket可寫時,epoll_wait通知主線程。主線程往socket上寫入服務器處理客戶請求的結果
四、幾種開源庫
下面是幾種使用到上面技術的開源庫:
libevent:
名氣最大,應用最廣泛,歷史悠久的跨平臺事件庫libev:
較 libevent 而言,設計更簡練,性能更好,但對 Windows 支持不夠好;libuv:
開發 node 的過程中需要一個跨平臺的事件庫,他們首選了 libev,但又要支持 Windows,故重新封裝了一套,linux 下用 libev 實現,Windows 下用 IOCP 實現
優先級libevent:激活的事件組織在優先級隊列中,各類事件默認的優先級是相同的,可以通過設置 事件的優先級使其優先被處理libev:也是通過優先級隊列來管理激活的時間,也可以設置事件優先級libuv:也是通過優先級隊列來管理激活的時間,也可以設置事件優先級
事件循環libevent:event_base 用於管理事件libev:激活的事件組織在優先級隊列中,各類事件默認的優先級是相同的,libuv: 可以通 過設置事件的優先級 使其優先被處理
線程安全event_base 和 loop 都不是線程安全的,一個 event_base 或 loop 實例只能在用戶的一個線程 內訪問(一般是主線程),註冊到 event_base 或者 loop 的 event 都是串行訪問的,即每個執 行過程中,會按照優先級順序訪問已經激活的事件,執行其回調函數。所以在僅使用一個 event_base 或 loop 的情況下,回調函數的執行不存在並行關係