Dubbo作為一款高性能Java RPC框架,RPC是其最重要的流程之一。Dubbo RPC涉及到consumer端和provider端的流程,本文主要分析consumer端的RPC流程實現,包括集群容錯、dubbo路由、負載均衡、Filter處理鏈、DubboInvoker和RPC結果返回等流程。
在分析dubbo consumer端的RPC實現之前,首先來看下dubbo的整體架構,有個整體概念。
dubbo架構
dubbo架構圖如下:
- 註冊中心負責服務地址的註冊與查找,相當於目錄服務,服務提供者和消費者只在啟動時與註冊中心交互,註冊中心不轉發請求,壓力較小;
- 監控中心負責統計各服務調用次數,調用時間等,統計先在內存彙總後每分鐘一次發送到監控中心服務器,並以報表展示;
- 服務提供者向註冊中心註冊其提供的服務,並彙報調用時間到監控中心,此時間不包含網絡開銷;
- 服務消費者向註冊中心獲取服務提供者地址列表,並根據負載算法直接調用提供者,同時彙報調用時間到監控中心,此時間包含網絡開銷;
- 註冊中心,服務提供者,服務消費者三者之間均為長連接(默認情況下分別只有1個長連接,因為consume和provider網絡連接都使用了IO複用,性能上還是OK的),監控中心除外;
- 註冊中心通過長連接感知服務提供者的存在,服務提供者宕機,註冊中心將立即推送事件通知消費者(這裡dubbo和spring cloud是不一樣的,(spring cloud) eureka中consumer是有一個刷新線程來定時從eureka註冊中心拉取服務信息,因為eureka沒有通知機制,而dubbo中的zookeeper有Watcher通知機制);
- 註冊中心和監控中心全部宕機,不影響已運行的提供者和消費者,消費者在本地緩存了提供者列表;
- 註冊中心和監控中心都是可選的,服務消費者可以直連服務提供者;
注意,dubbo服務調用連接是長連接,dubbo服務調用是小數據量的通信,針對每一次RPC通信,都會生成一個唯一的id來標識,這樣就能區分出一次RPC請求對應的RPC響應了。
RPC流程
由於RPC流程涉及consumer和provider端,先來看一下在二者之間RPC流程的線程模型圖,有個初步認識:
consumer端的Dubbo業務線程池,可以是cached或者fixed類型的線程池,該線程的業務邏輯主要是讀取返回結果,然後響應對應defaultFuture,默認是cached類型線程池。線程池配置可以通過SPI方式來配置。provider端的Dubbo業務線程池,默認是fixed類型線程池。
RPC流程圖
以如下consumer端代碼為例開始進行講解:
<code>DemoService demoService = (DemoService) context.getBean("demoService"
);while
(true
) {try
{ String hello = demoService.sayHello("world"
); System.out
.println(hello); System.in
.read(); }catch
(Throwable throwable) { throwable.printStackTrace(); } }/<code>
當consumer端調用一個@Reference的RPC服務,在consumer端的cluster層首先從Driectory中獲取invocation對應的invokerList,經過Router過濾符合路由策略的invokerList,然後執行LoadBalance,選擇出某個Invoker,最後進行RPC調用操作。
調用某個Invoker(經過cluter之後)進行RPC時,依次會經過Filter、DubboInvoker、HeaderExchangeClient,將RPC消息類RPCInvocation傳遞到netty channel.eventLoop中。
最後由netty Channel經過Serializer之後將RPC請求發送給provider端。
集群容錯
從上面的RPC執行流程看出,一個重要的流程是集群容錯Cluster,Dubbo提供了多種容錯方案,默認模式為Failover Cluster,也就是失敗重試。目前dubbo支持的集群容錯策略如下:
- Failover Cluster:失敗重試,當服務消費方調用服務提供者失敗後,會自動切換到其他服務提供者服務器進行重試,這通常用於讀操作或者具有冪等的寫操作。dubbo默認重試2次,可通過配置retries屬性來設置重試次數,retries支持接口和方法級別配置。
- Failfast Cluster:快速失敗,當服務消費方調用服務提供者失敗後,立即報錯,也就是隻調用一次。通常,這種模式用於非冪等性的寫操作。
- Failsafe Cluster:安全失敗,當服務消費者調用服務出現異常時,直接忽略異常,異常返回null。這種模式通常用於寫入審計日誌等操作。
- Failback Cluster:失敗自動恢復,當服務消費端調用服務出現異常後,在後臺記錄失敗的請求,並按照一定的策略後期再進行重試。這種模式通常用於消息通知操作。
- Forking Cluster:並行調用,當消費方調用一個接口方法後,Dubbo Client會並行調用多個服務提供者的服務,只要其中有一個成功即返回。這種模式通常用於實時性要求較高的讀操作,但需要浪費更多服務資源。可以通過forks設置並行數,注意這種很容易造成寫放大,對服務端性能要求較高。
- Broadcast Cluster:廣播調用,當消費者調用一個接口方法後,Dubbo Client會逐個調用所有服務提供者,任意一臺服務器調用異常則這次調用就標誌失敗。這種模式通常用於通知所有提供者更新緩存或日誌等本地資源信息。
Directory
Directory是RPC服務類的目錄服務,一個服務接口對應一個Directory實例,比如com.xxx.xx.dubbo.api.HelloService就是一個服務接口。
<code>public
interface
Directory
<T
>extends
Node
{Class
getInterface
()
; List> list(Invocation invocation)throws
RpcException; }/<code>
Directory有2個實現類,一個是StaticDirectory,一個是RegistryDirectory。前者是靜態類型,其內部的Invocation在初始化時就已確定(public StaticDirectory(URL url, List> invokers, List routers),運行過程中不再變化;後者是動態類型,實現了接口NotifyListener,notify時動態更新invokers。Directory的重點在於list(invocation)和notify更新機制,list(invocation)就是獲取invokerList過程。
Router
Router是RPC的路由策略,通過Directory獲取到invokerList之後,會執行對應的路由策略。Dubbo的默認路由策略是MockInvokersSelector。Dubbo路由策略接口是Router,其有3個實現類,Router的作用就是根據invocation和invokerList,選擇出符合路由策略的invokerList。
LoadBalance
LoadBalance是RPC的負載均衡策略,通過Directory獲取到invokerList並且執行對應的路由策略之後,就會執行LoadBalance(負載均衡)了。
<code> (RandomLoadBalance.NAME)public
interface
LoadBalance
{ ("loadbalance"
)Invoker
select
(List> invokers, URL url, Invocation invocation)
throws
RpcException; }/<code>
- RandomLoadBalance:隨機選擇,Dubbo的默認策略,如果Invoker的weiget都一樣,則就是標準的隨機策略;如果不一樣,那就是按照權重比例的隨機策略。
- RoundRobinLoadBalance:輪詢策略,如果Invoker的weiget都一樣,則就是標準的輪詢策略;如果不一樣,那就是按照權重比例的輪詢策略,這裡的處理機制和有權重的RandomLoadBalance是類似的。比如有2個Invoker,第一個weight為1,第二個weight為2,則一個輪詢週期內,第一個會輪詢1次,第二個會輪詢2次。
- LeastActiveLoadBalance:最少活躍數,最少活躍數策略使慢的提供者收到更少請求,因為越慢的提供者的調用前後計數差會越大。初始時針對一次RPC調用(具體是到method)各個Invoker的active都為0,這時隨機選擇。對某個Invoker進行RPC時,其對應的active+1,當RPC結束時其對應的active-1。當各個Invoker的active不一致時,選擇最少的那個Invoker進行調用。當各個Invoker的active一致時,也就是隨機策略,如果weight不一致,則按照權重比例的隨機策略。
- ConsistentHashLoadBalance:一致性hash,相同參數的請求總是發到同一提供者。當某一臺提供者掛時,原本發往該提供者的請求,基於虛擬節點(默認虛擬節點160個),平攤到其它提供者,不會引起劇烈變動。算法參見:http://en.wikipedia.org/wiki/Consistent_hashing,缺省只對第一個參數 Hash,如果要修改,可自行配置。
Filter處理
filter處理機制使用的是調用鏈模式,啟動流程中會初始化該filter鏈,對應邏輯是ProtocolFilterWrapper.buildInvokerChain()方法,filter鏈默認包括幾個filter,依次是ConsumerContextFilter(設置上下文信息)、FutureFilter(執行某些hook方法)和MonitorFilter(monitor RPC統計信息)等。
DubboInvoker
DubboInvoker的主要邏輯就是從provider的長連接中選擇某個連接,然後根據不同的策略(同步/異步/單向)來進行操作。
<code>protected
ResultdoInvoke
(
final
Invocation invocation)throws
Throwable { RpcInvocation inv = (RpcInvocation) invocation;final
String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient;if
(clients.length ==1
) { currentClient = clients[0
]; }else
{ currentClient = clients[index.getAndIncrement() % clients.length]; }try
{boolean
isAsync = RpcUtils.isAsync(getUrl(), invocation);boolean
isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);boolean
isOneway = RpcUtils.isOneway(getUrl(), invocation);int
timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if
(isOneway) {boolean
isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY,false
); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null
);return
new
RpcResult(); }else
if
(isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter futureAdapter =new
FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result;if
(isAsyncFuture) { result =new
AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(),false
); }else
{ result =new
SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(),false
); }return
result; }else
{ RpcContext.getContext().setFuture(null
);return
(Result) currentClient.request(inv, timeout).get(); } }catch
(TimeoutException e) { }catch
(RemotingException e) { } }/<code>
注意,dubbo 2.7版本的DubboInvoker.doInvoke流程已和上述流程不太一樣了,不過實現思路是類似的。
最後會調用channel.writeAndFlush,之後的流程就是netty channel內部的處理流程了,這部分暫可不關注,只需要知道後續流程會走到我們設定的NettyHandler中對應的方法中,比如channel.write就會走到NettyHandler.writeRequested方法中邏輯,也就是針對RPC請求數據進行序列化操作。
數據序列化操作是由netty ChannelHandler來處理的,對應的初始化邏輯如下:
<code>bootstrap.handler(new
ChannelInitializer() {protected
void
initChannel
(Channel ch)
throws
Exception { NettyCodecAdapter adapter =new
NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this
); ch.pipeline() .addLast("decoder"
, adapter.getDecoder()) .addLast("encoder"
, adapter.getEncoder()) .addLast("handler"
, nettyClientHandler); } });/<code>
在consumer與provider建立連接之後,initChannel是會添加對應的encoder、decoder。
RPC結果處理
接收到provider端返回的RPC結果進行反序列化之後,就該將結果數據提交到consuemr端dubbo業務線程池了,如下所示:
<code>public
void
channelRead
(ChannelHandlerContext ctx, Object msg)
throws
Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);try
{ handler.received(channel, msg); }finally
{ NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }public
void
received
(Channel channel, Object message)
throws
RemotingException { ExecutorService cexecutor = getExecutorService(); cexecutor.execute(new
ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); }/<code>
DubboClientHandler線程池裡的邏輯比較簡單,首先根據response.getId()獲取從FUTURES(Map)中獲取該次通信對應的DefaultFuture,將response設置到DefaultFuture中並喚醒等待的線程。
<code>public
static
void
received
(Channel channel, Response response
) {try
{ DefaultFuture future = FUTURES.remove
(response.getId());if
(future !=null
) { future.doReceived(response); } }finally
{ CHANNELS.remove
(response.getId()); } }/<code>
當喚醒在DefaultFuture阻塞的線程(也就是業務線程)之後,也就是以下代碼返回了:
<code>DubboInvoker
.doInvoke
return
(Result
)currentClient
.request
(inv
,timeout
).get
();/<code>
獲取到Response之後,就獲取到了provider返回結果,也就是整個RPC的consumer端流程結束了。
小結
dubbo RPC流程基本上可以說是比較完備了,比如集群容錯、請求路由、負載均衡、序列化等等,這些能力其實就是微服務調用的通用能力,比如SpringCloud中也是需要這一套能力的。這種通用能力能否下放,讓業務應用更加純粹的專注於業務呢?
解決方案是有的,那就是近兩年比較流行的service mesh概念,其增加了SideCar代理,將服務調用中涉及到的通用服務治理能力放到SideCar代理中完成,讓開發者更加專注於業務邏輯試下,而非dubbo或者SpringCloud這種框架級實現的服務治理。
推薦閱讀: