GRPC-C++源码分析(四)--ServerCompletionQueue续

2.1.2 grpc_determine_iomgr_platform

看下调用逻辑:

<code>//iomgr_internal.cc
void grpc_determine_iomgr_platform() {
if (iomgr_platform_vtable == nullptr) {
grpc_set_default_iomgr_platform();
}
}/<code>

grpc_set_default_iomgr_platform的实现:

<code>//iomgr_posix.cc
void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
grpc_set_tcp_server_impl(&grpc_posix_tcp_server_vtable);
grpc_set_timer_impl(&grpc_generic_timer_vtable);
grpc_set_pollset_vtable(&grpc_posix_pollset_vtable);
grpc_set_pollset_set_vtable(&grpc_posix_pollset_set_vtable);
grpc_set_resolver_impl(&grpc_posix_resolver_vtable);
grpc_set_iomgr_platform_vtable(&vtable);
}/<code>
  • 先关注grpc_set_iomgr_platform_vtable,从名字上就能看出set了一个叫&vtable的东西
  • 关注它的原因是接下来的grpc_iomgr_platform_init方法马上就会用到
  • 其他的grpc_set_方法也同样很重要,会在后面看到它们的用处

2.1.3 grpc_iomgr_platform_init

<code>void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); }/<code>
  • 关注点:iomgr_platform_vtable,而这个iomgr_platform_vtable就是由2.1.2节中的grpc_set_iomgr_platform_vtable方法赋值的
  • 线索转移到了iomgr_platform_vtable->init()

2.1.4 grpc_iomgr_platform_init----iomgr_platform_vtable->init

iomgr_platform_vtable定义在:

<code>static grpc_iomgr_platform_vtable vtable = {
iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
iomgr_platform_shutdown_background_closure,
iomgr_platform_is_any_background_poller_thread};/<code>
  • iomgr_platform_vtable->init调用的是iomgr_platform_init方法,下面给出了调用路径
GRPC-C++源码分析(四)--ServerCompletionQueue续

  • 上图明确了grpc_iomgr_platform_init方法的终极意义在于赋值给全局变量grpc_event_engine_vtable* g_event_engine
<code>//ev_posix.cc
static const grpc_event_engine_vtable* g_event_engine = nullptr;/<code>
  • try_engine调用了g_factoriesi.factory的方法,返回值赋给了grpc_event_engine_vtable* g_event_engine
  • g_factories定义在ev_posix.cc中
<code>//ev_posix.cc
static event_engine_factory g_factories[] = {
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{ENGINE_HEAD_CUSTOM, nullptr}, {ENGINE_HEAD_CUSTOM, nullptr},
{"epollex", grpc_init_epollex_linux}, {"epoll1", grpc_init_epoll1_linux},
{"poll", grpc_init_poll_posix}, {"none", init_non_polling},
{ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
{ENGINE_TAIL_CUSTOM, nullptr}, {ENGINE_TAIL_CUSTOM, nullptr},
};/<code>
  • 具体选取那个factory是GRPC_POLL_STRATEGY环境变量决定的,它是在grpc_event_engine_init中获取的
<code>//ev_posix.cc
void grpc_event_engine_init(void) {
char* s = gpr_getenv("GRPC_POLL_STRATEGY");
if (s == nullptr) {
s = gpr_strdup("all");
}/<code>
  • 在我的centos7.5上,默认选取的g_factories是:"epollex", grpc_init_epollex_linux
  • grpc_init_epollex_linux方法中return了一个vtable
<code>//ev_epollex_linux.cc 

const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
………………
return &vtable;
}/<code>
  • 一起看下这个vtable的定义
<code>//ev_epollex_linux.cc
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
false,

fd_create,
fd_wrapped_fd,
fd_orphan,
fd_shutdown,
fd_notify_on_read,
fd_notify_on_write,
fd_notify_on_error,
fd_become_readable,
fd_become_writable,
fd_has_errors,
fd_is_shutdown,

pollset_init,
pollset_shutdown,
pollset_destroy,
pollset_work,
pollset_kick,
pollset_add_fd,

pollset_set_create,
pollset_set_unref, // destroy ==> unref 1 public ref
pollset_set_add_pollset,
pollset_set_del_pollset,
pollset_set_add_pollset_set,
pollset_set_del_pollset_set,
pollset_set_add_fd,
pollset_set_del_fd,

is_any_background_poller_thread,
shutdown_background_closure,
shutdown_engine,
};/<code>

如果整个2.1节都没理解也没关系,记住我们有了一个grpc_event_engine_vtable* g_event_engine指针就可以了,这个指针的内容就是上面的static const grpc_event_engine_vtable vtable

2.2 grpc::g_core_codegen_interface

把思路回到第二节开头,不仅g_glip在GrpcLibraryInitializer 中完成了初始化,还有一个全局变量grpc::g_core_codegen_interface也在其中完成了初始化,它会在CompletionQueue的构造函数中用到,再贴一次代码:

<code>//grpc_library.h
class GrpcLibraryInitializer final {
public:
GrpcLibraryInitializer() {
if (grpc::g_glip == nullptr) {
static auto* const g_gli = new GrpcLibrary();
grpc::g_glip = g_gli;
}
if (grpc::g_core_codegen_interface == nullptr) {
static auto* const g_core_codegen = new CoreCodegen();
grpc::g_core_codegen_interface = g_core_codegen;
}
}/<code>

3 CompletionQueue

回到第一节的图:

GRPC-C++源码分析(四)--ServerCompletionQueue续

  • 第2节已经分析了GrpcLibraryCodegen
  • 本节开始分析CompletionQueue

再来看看CompletionQueue的构造方法实现:

<code>  CompletionQueue(const grpc_completion_queue_attributes& attributes) {
cq_ = g_core_codegen_interface->grpc_completion_queue_create(
g_core_codegen_interface->grpc_completion_queue_factory_lookup(
&attributes),
&attributes, NULL);
InitialAvalanching(); // reserve this for the future shutdown
}/<code>
  • 终极目的是返回一个cq_
  • 先调用g_core_codegen_interface->grpc_completion_queue_factory_lookup返回一个grpc_completion_queue_factory* factory
  • 再调用g_core_codegen_interface->grpc_completion_queue_create返回cq_

3.1 grpc_completion_queue_factory_lookup

2.2节中已经看到g_core_codegen_interface的初始化,grpc_completion_queue_factory_lookup在父类CoreCodegenInterface中是个纯虚函数,具体实现在CoreCodegen类中

<code>//core_codegen.h
class CoreCodegen final : public CoreCodegenInterface {
private:
virtual const grpc_completion_queue_factory*
grpc_completion_queue_factory_lookup(
const grpc_completion_queue_attributes* attributes) override;
virtual grpc_completion_queue* grpc_completion_queue_create(
const grpc_completion_queue_factory* factory,
const grpc_completion_queue_attributes* attributes,
void* reserved) override;/<code>
GRPC-C++源码分析(四)--ServerCompletionQueue续

  • lookup方法最终返回一个grpc_completion_queue_factory g_default_cq_factory变量

3.2 grpc_completion_queue_create

GRPC-C++源码分析(四)--ServerCompletionQueue续

  • grpc_completion_queue_create中调用了factory->vtable->create,其中factory就是3.1节中grpc_completion_queue_factory_lookup返回的变量g_default_cq_factory
  • 图中看到grpc_completion_queue_create最终调用的是grpc_completion_queue_create_internal方法
<code>//completion_queue.cc
grpc_completion_queue* grpc_completion_queue_create_internal(
grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type,
grpc_experimental_completion_queue_functor* shutdown_callback) {
GPR_TIMER_SCOPE("grpc_completion_queue_create_internal", 0);

//最终返回的的那个cq
grpc_completion_queue* cq;

GRPC_API_TRACE(
"grpc_completion_queue_create_internal(completion_type=%d, "
"polling_type=%d)",
2, (completion_type, polling_type));

//这两个vtable是cq的核心内容
const cq_vtable* vtable = &g_cq_vtable[completion_type];
const cq_poller_vtable* poller_vtable =
&g_poller_vtable_by_poller_type[polling_type];

grpc_core::ExecCtx exec_ctx;
GRPC_STATS_INC_CQS_CREATED();

//cq的初始化也很魔性,除了本身的大小,还额外预留了vtable->data_size + poller_vtable->size()
cq = static_cast<grpc>(
gpr_zalloc(sizeof(grpc_completion_queue) + vtable->data_size +
poller_vtable->size()));

//赋值给了cq里的变量
cq->vtable = vtable;
cq->poller_vtable = poller_vtable;

/* One for destroy(), one for pollset_shutdown */
gpr_ref_init(&cq->owning_refs, 2);

//vtable的初始化

poller_vtable->init(POLLSET_FROM_CQ(cq), &cq->mu);
vtable->init(DATA_FROM_CQ(cq), shutdown_callback);

GRPC_CLOSURE_INIT(&cq->pollset_shutdown_done, on_pollset_shutdown_done, cq,
grpc_schedule_on_exec_ctx);
return cq;
}/<grpc>/<code>
  • completion_type和polling_type都是0,,是在1.1 节初始化ServerCompletionQueue时候定义的
  • g_cq_vtable和g_poller_vtable_by_poller_type都定义在completion_queue.cc文件中,很容易定位到
  • 这里需要解释的是cq的初始化和两个vtable的init调用(poller_vtable->init和vtable->init)


分享到:


相關文章: