Dubbo RPC在consumer端是如何跑起來的

Dubbo RPC在consumer端是如何跑起來的

Dubbo作為一款高性能Java RPC框架,RPC是其最重要的流程之一。Dubbo RPC涉及到consumer端和provider端的流程,本文主要分析consumer端的RPC流程實現,包括集群容錯、dubbo路由、負載均衡、Filter處理鏈、DubboInvoker和RPC結果返回等流程。

在分析dubbo consumer端的RPC實現之前,首先來看下dubbo的整體架構,有個整體概念。

dubbo架構

dubbo架構圖如下:

Dubbo RPC在consumer端是如何跑起來的

  • 註冊中心負責服務地址的註冊與查找,相當於目錄服務,服務提供者和消費者只在啟動時與註冊中心交互,註冊中心不轉發請求,壓力較小;
  • 監控中心負責統計各服務調用次數,調用時間等,統計先在內存彙總後每分鐘一次發送到監控中心服務器,並以報表展示;
  • 服務提供者向註冊中心註冊其提供的服務,並彙報調用時間到監控中心,此時間不包含網絡開銷;
  • 服務消費者向註冊中心獲取服務提供者地址列表,並根據負載算法直接調用提供者,同時彙報調用時間到監控中心,此時間包含網絡開銷;
  • 註冊中心,服務提供者,服務消費者三者之間均為長連接(默認情況下分別只有1個長連接,因為consume和provider網絡連接都使用了IO複用,性能上還是OK的),監控中心除外;
  • 註冊中心通過長連接感知服務提供者的存在,服務提供者宕機,註冊中心將立即推送事件通知消費者(這裡dubbo和spring cloud是不一樣的,(spring cloud) eureka中consumer是有一個刷新線程來定時從eureka註冊中心拉取服務信息,因為eureka沒有通知機制,而dubbo中的zookeeper有Watcher通知機制);
  • 註冊中心和監控中心全部宕機,不影響已運行的提供者和消費者,消費者在本地緩存了提供者列表;
  • 註冊中心和監控中心都是可選的,服務消費者可以直連服務提供者;

注意,dubbo服務調用連接是長連接,dubbo服務調用是小數據量的通信,針對每一次RPC通信,都會生成一個唯一的id來標識,這樣就能區分出一次RPC請求對應的RPC響應了。

RPC流程

由於RPC流程涉及consumer和provider端,先來看一下在二者之間RPC流程的線程模型圖,有個初步認識:

Dubbo RPC在consumer端是如何跑起來的

consumer端的Dubbo業務線程池,可以是cached或者fixed類型的線程池,該線程的業務邏輯主要是讀取返回結果,然後響應對應defaultFuture,默認是cached類型線程池。線程池配置可以通過SPI方式來配置。provider端的Dubbo業務線程池,默認是fixed類型線程池。

RPC流程圖

Dubbo RPC在consumer端是如何跑起來的

以如下consumer端代碼為例開始進行講解:

<code>DemoService demoService = (DemoService) context.getBean(

"demoService"

); 

while

 (

true

) {     

try

 {         String hello = demoService.sayHello(

"world"

);          System.

out

.println(hello);          System.

in

.read();     } 

catch

 (Throwable throwable) {         throwable.printStackTrace();     } }/<code>

當consumer端調用一個@Reference的RPC服務,在consumer端的cluster層首先從Driectory中獲取invocation對應的invokerList,經過Router過濾符合路由策略的invokerList,然後執行LoadBalance,選擇出某個Invoker,最後進行RPC調用操作。

Dubbo RPC在consumer端是如何跑起來的

調用某個Invoker(經過cluter之後)進行RPC時,依次會經過Filter、DubboInvoker、HeaderExchangeClient,將RPC消息類RPCInvocation傳遞到netty channel.eventLoop中。

最後由netty Channel經過Serializer之後將RPC請求發送給provider端。

集群容錯

從上面的RPC執行流程看出,一個重要的流程是集群容錯Cluster,Dubbo提供了多種容錯方案,默認模式為Failover Cluster,也就是失敗重試。目前dubbo支持的集群容錯策略如下:

  • Failover Cluster:失敗重試,當服務消費方調用服務提供者失敗後,會自動切換到其他服務提供者服務器進行重試,這通常用於讀操作或者具有冪等的寫操作。dubbo默認重試2次,可通過配置retries屬性來設置重試次數,retries支持接口和方法級別配置。
  • Failfast Cluster:快速失敗,當服務消費方調用服務提供者失敗後,立即報錯,也就是隻調用一次。通常,這種模式用於非冪等性的寫操作。
  • Failsafe Cluster:安全失敗,當服務消費者調用服務出現異常時,直接忽略異常,異常返回null。這種模式通常用於寫入審計日誌等操作。
  • Failback Cluster:失敗自動恢復,當服務消費端調用服務出現異常後,在後臺記錄失敗的請求,並按照一定的策略後期再進行重試。這種模式通常用於消息通知操作。
  • Forking Cluster:並行調用,當消費方調用一個接口方法後,Dubbo Client會並行調用多個服務提供者的服務,只要其中有一個成功即返回。這種模式通常用於實時性要求較高的讀操作,但需要浪費更多服務資源。可以通過forks設置並行數,注意這種很容易造成寫放大,對服務端性能要求較高。
  • Broadcast Cluster:廣播調用,當消費者調用一個接口方法後,Dubbo Client會逐個調用所有服務提供者,任意一臺服務器調用異常則這次調用就標誌失敗。這種模式通常用於通知所有提供者更新緩存或日誌等本地資源信息。

Directory

Directory是RPC服務類的目錄服務,一個服務接口對應一個Directory實例,比如com.xxx.xx.dubbo.api.HelloService就是一個服務接口。

<code>

public

 

interface

 

Directory

<

T

extends

 

Node

 

{     

Class 

getInterface

()

;     List> list(Invocation invocation) 

throws

 RpcException; }/<code>

Directory有2個實現類,一個是StaticDirectory,一個是RegistryDirectory。前者是靜態類型,其內部的Invocation在初始化時就已確定(public StaticDirectory(URL url, List> invokers, List routers),運行過程中不再變化;後者是動態類型,實現了接口NotifyListener,notify時動態更新invokers。Directory的重點在於list(invocation)和notify更新機制,list(invocation)就是獲取invokerList過程。

Router

Router是RPC的路由策略,通過Directory獲取到invokerList之後,會執行對應的路由策略。Dubbo的默認路由策略是MockInvokersSelector。Dubbo路由策略接口是Router,其有3個實現類,Router的作用就是根據invocation和invokerList,選擇出符合路由策略的invokerList。

Dubbo RPC在consumer端是如何跑起來的

LoadBalance

LoadBalance是RPC的負載均衡策略,通過Directory獲取到invokerList並且執行對應的路由策略之後,就會執行LoadBalance(負載均衡)了。

<code> (RandomLoadBalance.NAME)

public

 

interface

 

LoadBalance

 

{           (

"loadbalance"

)      

Invoker 

select

(List> invokers, URL url, Invocation invocation)

 

throws

 RpcException

; }/<code>
Dubbo RPC在consumer端是如何跑起來的

  • RandomLoadBalance:隨機選擇,Dubbo的默認策略,如果Invoker的weiget都一樣,則就是標準的隨機策略;如果不一樣,那就是按照權重比例的隨機策略。
  • RoundRobinLoadBalance:輪詢策略,如果Invoker的weiget都一樣,則就是標準的輪詢策略;如果不一樣,那就是按照權重比例的輪詢策略,這裡的處理機制和有權重的RandomLoadBalance是類似的。比如有2個Invoker,第一個weight為1,第二個weight為2,則一個輪詢週期內,第一個會輪詢1次,第二個會輪詢2次。
  • LeastActiveLoadBalance:最少活躍數,最少活躍數策略使慢的提供者收到更少請求,因為越慢的提供者的調用前後計數差會越大。初始時針對一次RPC調用(具體是到method)各個Invoker的active都為0,這時隨機選擇。對某個Invoker進行RPC時,其對應的active+1,當RPC結束時其對應的active-1。當各個Invoker的active不一致時,選擇最少的那個Invoker進行調用。當各個Invoker的active一致時,也就是隨機策略,如果weight不一致,則按照權重比例的隨機策略。
  • ConsistentHashLoadBalance:一致性hash,相同參數的請求總是發到同一提供者。當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點(默認虛擬節點160個),平攤到其它提供者,不會引起劇烈變動。算法參見:http://en.wikipedia.org/wiki/Consistent_hashing,缺省只對第一個參數 Hash,如果要修改,可自行配置。

Filter處理

filter處理機制使用的是調用鏈模式,啟動流程中會初始化該filter鏈,對應邏輯是ProtocolFilterWrapper.buildInvokerChain()方法,filter鏈默認包括幾個filter,依次是ConsumerContextFilter(設置上下文信息)、FutureFilter(執行某些hook方法)和MonitorFilter(monitor RPC統計信息)等。

DubboInvoker

DubboInvoker的主要邏輯就是從provider的長連接中選擇某個連接,然後根據不同的策略(同步/異步/單向)來進行操作。

<code> 

protected

 Result 

doInvoke

(

final

 Invocation invocation)

 

throws

 Throwable 

{     RpcInvocation inv = (RpcInvocation) invocation;     

final

 String methodName = RpcUtils.getMethodName(invocation);     inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());     inv.setAttachment(Constants.VERSION_KEY, version);     ExchangeClient currentClient;     

if

 (clients.length == 

1

) {                  currentClient = clients[

0

];     } 

else

 {          currentClient = clients[index.getAndIncrement() % clients.length];     }     

try

 {                  

boolean

 isAsync = RpcUtils.isAsync(getUrl(), invocation);         

boolean

 isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);         

boolean

 isOneway = RpcUtils.isOneway(getUrl(), invocation);         

int

 timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);         

if

 (isOneway) {                          

boolean

 isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, 

false

);             currentClient.send(inv, isSent);             RpcContext.getContext().setFuture(

null

);             

return

 

new

 RpcResult();         } 

else

 

if

 (isAsync) {                                       ResponseFuture future = currentClient.request(inv, timeout);                          FutureAdapter futureAdapter = 

new

 FutureAdapter<>(future);             RpcContext.getContext().setFuture(futureAdapter);             Result result;             

if

 (isAsyncFuture) {                                  result = 

new

 AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), 

false

);             } 

else

 {                 result = 

new

 SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), 

false

);             }             

return

 result;         } 

else

 {                          RpcContext.getContext().setFuture(

null

);             

return

 (Result) currentClient.request(inv, timeout).get();         }     } 

catch

 (TimeoutException e) {     } 

catch

 (RemotingException e) {     } }/<code>

注意,dubbo 2.7版本的DubboInvoker.doInvoke流程已和上述流程不太一樣了,不過實現思路是類似的。

最後會調用channel.writeAndFlush,之後的流程就是netty channel內部的處理流程了,這部分暫可不關注,只需要知道後續流程會走到我們設定的NettyHandler中對應的方法中,比如channel.write就會走到NettyHandler.writeRequested方法中邏輯,也就是針對RPC請求數據進行序列化操作。

數據序列化操作是由netty ChannelHandler來處理的,對應的初始化邏輯如下:

<code>bootstrap.handler(

new

 ChannelInitializer() {          

protected

 

void

 

initChannel

(Channel ch)

 

throws

 Exception 

{         NettyCodecAdapter adapter = 

new

 NettyCodecAdapter(getCodec(), getUrl(), NettyClient.

this

);         ch.pipeline()                 .addLast(

"decoder"

, adapter.getDecoder())                 .addLast(

"encoder"

, adapter.getEncoder())                      .addLast(

"handler"

, nettyClientHandler);     } });/<code>

在consumer與provider建立連接之後,initChannel是會添加對應的encoder、decoder。

RPC結果處理

接收到provider端返回的RPC結果進行反序列化之後,就該將結果數據提交到consuemr端dubbo業務線程池了,如下所示:

<code> 

public

 

void

 

channelRead

(ChannelHandlerContext ctx, Object msg)

 

throws

 Exception 

{     NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);     

try

 {         handler.received(channel, msg);     } 

finally

 {         NettyChannel.removeChannelIfDisconnected(ctx.channel());     } }

public

 

void

 

received

(Channel channel, Object message)

 

throws

 RemotingException 

{               ExecutorService cexecutor = getExecutorService();     cexecutor.execute(

new

 ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); }/<code>

DubboClientHandler線程池裡的邏輯比較簡單,首先根據response.getId()獲取從FUTURES(Map)中獲取該次通信對應的DefaultFuture,將response設置到DefaultFuture中並喚醒等待的線程。

<code> 

public

 

static

 

void

 

received

(

Channel channel, Response response

)

 {     

try

 {         DefaultFuture future = FUTURES.

remove

(response.getId());         

if

 (future != 

null

) {             future.doReceived(response);         }     } 

finally

 {         CHANNELS.

remove

(response.getId());     } }/<code>

當喚醒在DefaultFuture阻塞的線程(也就是業務線程)之後,也就是以下代碼返回了:

<code>

DubboInvoker

.doInvoke

 

return

 (

Result

currentClient

.request

(

inv

timeout

)

.get

();/<code>

獲取到Response之後,就獲取到了provider返回結果,也就是整個RPC的consumer端流程結束了。

小結

dubbo RPC流程基本上可以說是比較完備了,比如集群容錯、請求路由、負載均衡、序列化等等,這些能力其實就是微服務調用的通用能力,比如SpringCloud中也是需要這一套能力的。這種通用能力能否下放,讓業務應用更加純粹的專注於業務呢?

解決方案是有的,那就是近兩年比較流行的service mesh概念,其增加了SideCar代理,將服務調用中涉及到的通用服務治理能力放到SideCar代理中完成,讓開發者更加專注於業務邏輯試下,而非dubbo或者SpringCloud這種框架級實現的服務治理。


推薦閱讀:


分享到:


相關文章: