dubbo請求處理線程模型實現分析

問題的由來:

如果事件處理的邏輯能迅速完成,並且不會發起新的 IO 請求,比如只是在內存中記個標識,

則直接在 IO 線程上處理更快,因為減少了線程池調度。

但如果事件處理邏輯較慢,或者需要發起新的 IO 請求,比如需要查詢數據庫,則必須派發到

線程池,否則 IO 線程阻塞,將導致不能接收其它請求。

如果用 IO 線程處理事件,又在事件處理過程中發起新的 IO 請求,比如在連接事件中發起登

錄請求,會報“可能引發死鎖”異常,但不會真死鎖。

因此,需要通過不同的派發策略和不同的線程池配置的組合來應對不同的場景。

這裡說的IO線程(以netty為例)是netty啟動服務時指定的boss/worker執行器中的woker線程。

具體配置方式如下兩種:

或者

目前dubbo提供的Dispatcher擴展實現有如下5種實現,默認派發方式是all

all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcherdirect=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatchermessage=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcherexecution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcherconnection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher

在分析源碼之前,這裡再溫習下裝飾模式,因為dubbo從交換層到傳輸層通過裝飾模式完成了多消息的接收處理,心跳,線程派發,消息解碼,請求響應消息的處理邏輯。最外層裝飾總優於裡層的方法的調用。

本文雖說是要分析線程派發模型,但會從連接接處理基本handler開始,層層分析包裹在它外層的裝飾類。

裝飾模式類關係圖如下

dubbo請求處理線程模型實現分析

如圖裝飾模式主要包含以下幾種類:

業務接口類,定義要裝飾的業務操作

業務實現類,也就是要被裝飾的類

裝飾類父類,它同樣實現了被裝飾的業務接口,同時它通過構造函數,內部持有一個裝飾接口類型的對象,一般這個對象提供接口方法默認實現。

具體裝飾類,要繼承裝飾類父類,不同的裝飾類,可以重寫父類方式完成具體的裝飾操作。

有時也可以沒有裝飾類父類,直接有裝飾類實現接口完成裝飾。

接下來就從裝飾模式的角度分析源碼

由於dispatcher配置是服務端的,這裡從服務暴露流程中分析dubbo的實現。

具體可以看下DubboProtocol類裡的私有變量

requestHandler,它就是被裝飾的類。

定義如下:

 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { //調用服務端實現方法,返回結果。 public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //獲取暴露的服務代理 Invoker> invoker = getInvoker(channel, inv); //如果是callback 需要處理高版本調用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //通過代理服務,執行方法 return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @Override //接受消息處理方法 public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { //調用消息,調用replay reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override //客戶端連接後處理方法 public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override //斷開連接後處理方法 public void disconnected(Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } //調用過程 private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } //從url中創建調用對象 private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, new Class>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } }; 

它到是個匿名類,通過實現抽象類ExchangeHandlerAdapter定義來實例化得到,ExchangeHandlerAdapter繼承關係如下

dubbo請求處理線程模型實現分析

可以看到它和它的祖先類,實現了ChannelHandler接口5個關鍵方法,連接,斷開連接,發送消息,接受消息和異常處理方法。也是rpc調用的常用處理方法。

同時也是線程派發處理關注的方法。

所以ChannelHandler就是,裝飾模式裡的業務接口類。

接下來,就是找裝飾類的過程了。

可以找到requestHandler對象第一被使用是在DubboProtocol的createServer方法中

try { //Exchangers是門面類,裡面封裝了具體交換層實現 server = Exchangers.bind(url, requestHandler);} catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);}
//跟到Exchangers.bind方法public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");//通過spi會走HeaderExchanger的bind邏輯 return getExchanger(url).bind(url, handler); }//HeaderExchanger的bind方法 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //可以看到這時原始handler第一被裝飾 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }

HeaderExchangeHandler裝飾類

可以看下類的繼承關係

dubbo請求處理線程模型實現分析

可以看到HeaderExchangeHandler實現了ChannelHandler接口,符合裝飾模式要求。

看下它的構造函數:

public HeaderExchangeHandler(ExchangeHandler handler) { if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; }

這裡的ExchangeHandler是ChannelHandler子接口,符合裝飾模式通過構造函數持有接口類型對象引用。

下面看下它對主要幾個rpc方法的裝飾實現:

//連接處理邏輯 public void connected(Channel channel) throws RemotingException { //添加一些心跳時間參數 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //通過被包裝類對應方法處理 handler.connected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //斷開邏輯, public void disconnected(Channel channel) throws RemotingException { //添加一些心跳時間參數 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //通過被包裝類對應方法處理 handler.disconnected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //發送數據 public void sent(Channel channel, Object message) throws RemotingException { Throwable exception = null; try { channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //調用被包裝類對應方法處理。 handler.sent(exchangeChannel, message); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } catch (Throwable t) { exception = t; } //發送消息,若是請求消息,有個異步發送重試邏輯 if (message instanceof Request) { Request request = (Request) message; DefaultFuture.sent(channel, request); } if (exception != null) { if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else if (exception instanceof RemotingException) { throw (RemotingException) exception; } else { throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(), exception.getMessage(), exception); } } } /*** * 接受請求數據,通過handleRequest方法處理後得到處理結果。 * @param channel channel. * @param message message. * @throws RemotingException */ public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) {//是往返消息,調用私有方法handleRequest處理得到結果 Response response = handleRequest(exchangeChannel, request); channel.send(response); } else {//不需要回復的消息調用 handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { //處理響應消息的邏輯 handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //異常處理 public void caught(Channel channel, Throwable exception) throws RemotingException { if (exception instanceof ExecutionException) { ExecutionException e = (ExecutionException) exception; Object msg = e.getRequest(); if (msg instanceof Request) { Request req = (Request) msg; if (req.isTwoWay() && !req.isHeartbeat()) {//有往返要求,就回復消息 Response res = new Response(req.getId(), req.getVersion()); res.setStatus(Response.SERVER_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); return; } } } ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //調用對應被裝飾類方法 handler.caught(exchangeChannel, exception); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }

HeaderExchangeHandler裝飾類,在Request/Response層面定義了請求響應消息的處理邏輯。

第二個裝飾類DecodeHandler

通過源碼可以看到DecodeHandler它和它的父類AbstractChannelHandlerDelegate共同完成了對ChannelHandler接口方法的裝飾,看下DecodeHandler具體裝飾的received方法:

 public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); }

DecodeHandler類如它的名稱,主要通過對received方法的裝飾處理,完成完成消息解碼的處理。

接著

return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));

這句繼續跟蹤方法

//Transporters.bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); }//根據spi 這裡具體走NettyTransporter.bind方法 return getTransporter().bind(url, handler); } //NettyTransporter的bind方法 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //這裡是創建NettyServer實例 return new NettyServer(url, listener); } //NettyServer構造器 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //這裡看下ChannelHandlers.wrap方法 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } //ChannelHandlers.wrap方法 public static ChannelHandler wrap(ChannelHandler handler, URL url) { //調用內部wrapInternal方法 return ChannelHandlers.getInstance().wrapInternal(handler, url); } //wrapInternal方法 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { //這裡終於看到通過spi獲取Dispatcher實現的代碼 //還能看到通過Dispatcher.dispatch方法返回的handler後又經過了兩層裝飾,HeartbeatHandler然後MultiMessageHandler類 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }

這裡再分析下ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension()的代碼實現:

public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher { public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; //默認是all實現方案 String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all"))); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); //調用接口Dispatcher實現的dispatch方法返回ChannelHandler對象 return extension.dispatch(arg0, arg1); }}

下面就具體對照用戶手冊上關於派發實現的說明,分別對照源碼分析下:

1, all實現,用戶手冊說,所有消息都派發到線程池,包括請求,響應,連接事件,斷開事件,心跳等。

看下實現類AllDispatcher

public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //使用AllChannelHandler類實現 return new AllChannelHandler(handler, url); }}

AllChannelHandler類,通過類結構可以看到它也是ChannelHandler的裝飾類。

dubbo請求處理線程模型實現分析

裝飾類結構清晰。通過代碼可知,其他幾種線程分派模型實現裝飾類,都遵循同樣的繼承機構,都會繼承

WrappedChannelHandler

看下它對裝飾方法的實現

//連接事件放入線程池public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } //斷開事件連接放入線程池 public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } //接受請求(包含回覆消息處理)消息放入線程池 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO 臨時解決線程池滿後異常信息無法發送到對端的問題。待重構 //fix 線程池滿了拒絕調用不返回,導致消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常處理線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } 

通過實現看到,它把所有操作都放入了線程池中執行。但是心跳消息的接受和發送沒有進入線程池。

2,direct 分配實現,文檔上說,所有消息都不派發到線程池,全部在 IO 線程上直接執行。

實現類DirectDispatcher

public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //直接返回原生的handler不進行另外裝飾 return handler; }}

如它文檔所說一樣,所有消息處理不派發線程池。

3,message 手冊上說,只有請求響應消息派發到線程池,其它連接斷開事件,心跳等消息,直接在 IO 線程上執行。

實現類MessageOnlyDispatcher

public class MessageOnlyDispatcher implements Dispatcher { public static final String NAME = "message"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //通過MessageOnlyChannelHandler裝飾類處理 return new MessageOnlyChannelHandler(handler, url); }}

這裡在貼下它的繼承圖:

dubbo請求處理線程模型實現分析

具體裝飾實現

public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //接收請求(包括響應)消息放在線程池。 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }}

如文檔所說,只有請求(響應發送)消息放入線程池執行。

4,execution 手冊上說,只請求消息派發到線程池,不含響應,響應和其它連接斷開事件,心跳等消息,直接在 IO 線程上執行。

實現類ExecutionDispatcher

public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //通過裝飾類ExecutionChannelHandler實現 return new ExecutionChannelHandler(handler, url); }}

實現如下:

public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //連接事件放入線程池 public void connected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } //斷開事件連接放入線程池 public void disconnected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } //消息接受(響應消息發送)放入線程池 public void received(Channel channel, Object message) throws RemotingException { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO 臨時解決線程池滿後異常信息無法發送到對端的問題。待重構 //fix 線程池滿了拒絕調用不返回,導致消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side("+url.getIp()+","+url.getPort()+") threadpool is exhausted ,detail msg:"+t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常消息放入線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); }}

通過實現可以看到,它同all派發實現一樣,並不是只有請求放入線程池。這個手冊上說的不一樣,手冊有誤,還是沒有實現!!

5,connection實現,手冊說,在 IO 線程上,將連接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池。

實現類ConnectionOrderedDispatcher

public class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //裝飾類ConnectionOrderedChannelHandler實現 return new ConnectionOrderedChannelHandler(handler, url); }}

ConnectionOrderedChannelHandler實現:

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); //通過定義只有一個線程的線程池,保證執行的順序 //用LinkedBlockingQueue保存待處理的任務 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME 沒有地方釋放connectExecutor! //這是等待隊列報警大小 queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } //連接事件放入隊列 public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } //斷開事件放入隊列 public void disconnected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } //放入線程池 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //fix 線程池滿了拒絕調用不返回,導致消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //放入線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queuewarninglimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit)); } }}

通過代碼分析,可以看到本實現如文檔說的一樣把連接斷事件處理放入隊列,有序執行,其他放入線程池。

以上就是具體線程派發模型的分析。

最後再看下上面提到的最後兩個裝飾類,

HeartbeatHandler裝飾類

public class HeartbeatHandler extends AbstractChannelHandlerDelegate { private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class); public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP"; public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; public HeartbeatHandler(ChannelHandler handler) { super(handler); } public void connected(Channel channel) throws RemotingException { setReadTimestamp(channel); setWriteTimestamp(channel); handler.connected(channel); } public void disconnected(Channel channel) throws RemotingException { clearReadTimestamp(channel); clearWriteTimestamp(channel); handler.disconnected(channel); } public void sent(Channel channel, Object message) throws RemotingException { setWriteTimestamp(channel); handler.sent(channel, message); } /*** * 心跳消息的接受和發送 * * @param channel * @param message * @throws RemotingException */ public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug( new StringBuilder(32) .append("Receive heartbeat response in thread ") .append(Thread.currentThread().getName()) .toString()); } return; }//非心跳消息的接受,走派發裝飾類 handler.received(channel, message); }}

可以看到HeartbeatHandler對received方法進行了處理,所以消息的接受和發送是不會派發到線程池的。

MultiMessageHandler裝飾類

public class MultiMessageHandler extends AbstractChannelHandlerDelegate { public MultiMessageHandler(ChannelHandler handler) { super(handler); } @SuppressWarnings("unchecked") @Override public void received(Channel channel, Object message) throws RemotingException { //多個消息類型時,循環接受 if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; for (Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } }}

此裝飾類,主要完成多消息類型的循環解析接收。

所以到了NettyServer類,原始的handler已經經過的5層的裝飾。

這裡在其父類AbstractServer的構造方法中加斷點,截圖看下handler對象圖

dubbo請求處理線程模型實現分析

可以印證。


分享到:


相關文章: