微信phxrpc源碼分析(六)--收發流管理

收發流管理在網絡庫中處於一個非常重要的位置,與其他rpc框架不同,phxrpc在這方面可謂獨闢蹊徑,將socket與iostream和streambuf結合起來,完成了緩衝區的設計。

1 整體流程介紹

再來複習下phxrpc的收發流邏輯在哪兒

微信phxrpc源碼分析(六)--收發流管理

  • 主要邏輯在HshaServerIO::IOFunc中
  • IOFunc裡的msg_handler是在HshaServer中註冊的,這兒的結構會在下面第2節具體說明
  • msg_handler->RecvRequest(stream, req)開始接收請求內容,注意這裡的req出來的時候已經變成了HttpRequest,第3節有說明
  • 請求接收完後會放到請求隊列,Worker::WorkerLogic會拉取請求進行業務邏輯處理,生成結果
  • IOFunc拿到結果後,表面看是使用BaseResponse的Send方法發送結果,實際使用的是HttpResponse,第4節有說明

2 msg_handler

先看下代碼中的msg_handler是怎麼初始化的

<code>void HshaServerIO::IOFunc(int accepted_fd) {
………………
auto msg_handler(msg_handler_factory_->Create());
int ret{msg_handler->RecvRequest(stream, req)};/<code>

msg_handler_factory_來源於:

<code>class HshaServer {
public:
HshaServer(const HshaServerConfig &config, const Dispatch_t &dispatch,void *args,
phxrpc::BaseMessageHandlerFactoryCreateFunc msg_handler_factory_create_func =
[]()->std::unique_ptr<:httpmessagehandlerfactory> {
return std::unique_ptr<:httpmessagehandlerfactory>(new phxrpc::HttpMessageHandlerFactory);
});/<code>

所以msg_handler是HttpMessageHandlerFactory使用create方法返回的,下面的示意圖有利於理解他們之間的關係:

微信phxrpc源碼分析(六)--收發流管理

當代碼調用:msg_handler->RecvRequest(stream, req)時候,其實調用的是HttpMessageHandler.RecvRequest

3 輸入流說明:HttpMessageHandler.RecvRequest

在說這個函數前,先看下面這段代碼,否則下面的流程不好理解了

<code>void HshaServerIO::IOFunc(int accepted_fd) {
cout< UThreadSocket_t *socket{scheduler_->CreateSocket(accepted_fd)};
UThreadTcpStream stream;
stream.Attach(socket);//關鍵點在這裡,Attach裡將streambuf與iostream進行綁定/<code>

stream.Attach裡調用了NewRdbuf方法

<code>void BaseTcpStream::NewRdbuf(BaseTcpStreamBuf * buf) {
std::streambuf * old = rdbuf(buf);
delete old;
}/<code>

BaseTcpStream繼承了std::iosream,BaseTcpStreamBuf繼承了std::streambuf,上面的rdbuf方法將對iostream的操作轉到了streambuf上

進入正題,來一起看下phxrpc怎樣處理輸入流。

微信phxrpc源碼分析(六)--收發流管理

  • 先來看HttpMessageHandler::RecvRequest方法中是怎樣將BaseRequest變為HttpRequest的
<code>int HttpMessageHandler::RecvRequest(BaseTcpStream &socket, BaseRequest *&req) {
HttpRequest *http_req{new HttpRequest};//這裡新建了HttpRequest

int ret{HttpProtocol::RecvReq(socket, http_req)};
if (0 == ret) {
req_ = req = http_req;//這裡將http_req指針指向了BaseRequest指針
version_ = (http_req->version() != nullptr ? http_req->version() :"");
keep_alive_ = http_req->keep_alive();
} else {
delete http_req;
http_req = nullptr;
}

return ret;
}/<code>
  • 調用順序依照箭頭數字所示,關鍵點在兩個地方:一是箭頭2的getline是怎樣調到箭頭3的underflow方法;二是箭頭3的underflow方法是怎樣調用到箭頭4的precv方法。
  • precv方法調用了UThreadRecv方法,完成了文件描述的符的讀取工作

4 輸出流說明:HttpResponse.Send(socket)

微信phxrpc源碼分析(六)--收發流管理

與上圖對應的代碼如下:

<code>int HttpResponse::Send(BaseTcpStream &socket) const {
socket << version() << " " << status_code() << " " << reason_phrase() << "\\r\\n";

for (size_t i{0}; GetHeaderCount() > i; ++i) {
socket << GetHeaderName(i) << ": " << GetHeaderValue(i) << "\\r\\n";
}

if (content().size() > 0) {
if (nullptr == GetHeaderValue(HttpMessage::HEADER_CONTENT_LENGTH)) {
socket << HttpMessage::HEADER_CONTENT_LENGTH << ": " << content().size() << "\\r\\n";
}
}

socket << "\\r\\n";

if (content().size() > 0)
socket << content();

if (socket.flush().good()) {//這裡flush中調用了紅色箭頭1指向的sync方法,後續又調用了紅色箭頭2的psend方法
return 0;
} else {
return static_cast(socket.LastError());
}
}
/<code>

psend方法裡調用了UThreadSend,完成了對文件描述符的寫操作

這裡額外說明下,為什麼HshaServerIO::IOFuncBaseResponse.Send是HttpResponse.Send方法實現的

<code>BaseResponse *resp{(BaseResponse *)UThreadGetArgs(*socket)};//IOFunc中,resp的來源是UThreadGetArgs
if(!resp->fake()){
ret = resp->Send(stream);
…………/<code>

很明顯,有UThreadGetArgs,就有UThreadSetArgs

<code>UThreadSocket_t *HshaServerIO::ActiveSocketFunc() {
while (data_flow_->CanPluckResponse()) {
void *args{nullptr};
BaseResponse *resp{nullptr};
//(3)在這裡resp來源
int queue_wait_time_ms{data_flow_->PluckResponse(args, resp)};
if (!resp) {
// break out
return nullptr;
}
hsha_server_stat_->outqueue_wait_time_costs_ += queue_wait_time_ms
;
hsha_server_stat_->outqueue_wait_time_costs_count_++;

UThreadSocket_t *socket{(UThreadSocket_t *)args};
if (socket != nullptr && IsUThreadDestory(*socket)) {
// socket aready timeout
//log(LOG_ERR, "%s socket aready timeout", __func__);
UThreadClose(*socket);
free(socket);
delete resp;

continue;
}
//(1)UThreadSetArgs在這裡,可以判斷,這裡的resp已經是HttpResponse類型了
//(2)這裡resp的來源在上面(3)中:data_flow_->PluckResponse
UThreadSetArgs(*socket, (void *)resp);

return socket;
}

return nullptr;
} /<code>

所以繼續追查誰把resp放到了data_flow_裡

<code>void Worker::WorkerLogic(void *args, BaseRequest *req, int queue_wait_time_ms) {
………………
//(3)resp來源於req->GenResponse()
BaseResponse *resp{req->GenResponse()};
………………
//(1)這裡把resp放到了data_flow_中
//(2)這裡resp的來源看上面(3)

pool_->data_flow_->PushResponse(args, resp);/<code>

最後看看req->GenResponse()方法就一切真相大白了!

<code>BaseResponse *HttpRequest::GenResponse() const {
return new HttpResponse;//看到了吧,這裡返回的是HttpResponse類型
}/<code>


分享到:


相關文章: