dubbo provider是如何啟動的

dubbo provider是如何啟動的

Netty支持多種服務端的server實例,包括mina、netty等,如下所示:

dubbo provider是如何啟動的

由於開發者目前使用dubbo幾乎都是基於Netty4的,因此下面的分析就以netty4的NettyServer為例,dubbo啟動過程中會調用 NettyServer#doOpen 初始化和啟動netty server。這裡主要操作就是初始化 bossGroup 和 workerGroup,然後進行bind、設置channelHandler,一個標準的netty初始化啟動流程,具體代碼如下:

<code>

protected

void

doOpen

()

throws

Throwable

{ bootstrap =

new

ServerBootstrap(); bossGroup =

new

NioEventLoopGroup(

1

,

new

DefaultThreadFactory(

"NettyServerBoss"

,

true

)); workerGroup =

new

NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),

new

DefaultThreadFactory(

"NettyServerWorker"

,

true

));

final

NettyServerHandler nettyServerHandler =

new

NettyServerHandler(getUrl(),

this

); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel

.

class

) .

childOption

(

ChannelOption

.

TCP_NODELAY

,

Boolean

.

TRUE

) .

childOption

(

ChannelOption

.

SO_REUSEADDR

,

Boolean

.

TRUE

) .

childOption

(

ChannelOption

.

ALLOCATOR

,

PooledByteBufAllocator

.

DEFAULT

) .

childHandler

(

new

ChannelInitializer

<

NioSocketChannel

>()

{

protected

void

initChannel

(NioSocketChannel ch)

throws

Exception

{

int

idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter =

new

NettyCodecAdapter(getCodec(), getUrl(), NettyServer.

this

);

if

(getUrl().getParameter(SSL_ENABLED_KEY,

false

)) { ch.pipeline().addLast(

"negotiation"

, SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler)); } ch.pipeline() .addLast(

"decoder"

, adapter.getDecoder()) .addLast(

"encoder"

, adapter.getEncoder()) .addLast(

"server-idle-handler"

,

new

IdleStateHandler(

0

,

0

, idleTimeout, MILLISECONDS)) .addLast(

"handler"

, nettyServerHandler); } }); ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }/<code>

dubbo啟動netty server時,會創建心跳檢查的ChannelHandler,從IdleStateHandler的實現來看,它提供針對了 讀空閒檢測readerIdleTime、寫空閒檢測writerIdleTime和讀寫空閒檢測allIdleTime的能力,當readerIdleTime、writerIdleTime或者allIdleTime大於0時,會在channelActive時初始化對應的netty的延時任務。

<code>

public

IdleStateHandler

(

long

readerIdleTime,

long

writerIdleTime,

long

allIdleTime, TimeUnit unit)

{

this

(

false

, readerIdleTime, writerIdleTime, allIdleTime, unit); }/<code>

當任務到期執行時,會檢查上次的讀寫時間戳是否大於設定的最大空閒時間,如果大於則發送 IdleStateEvent 事件,這時就會觸發用戶設定的ChannelHandler的fireUserEventTriggered回調,針對上述代碼來說,就會走到dubbo中org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#userEventTriggered方法中:

<code>

public

void

userEventTriggered

(ChannelHandlerContext ctx, Object evt)

throws

Exception

{

if

(evt

instanceof

IdleStateEvent) { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

try

{ logger.info(

"IdleStateEvent triggered, close channel "

+ channel); channel.close(); }

finally

{ NettyChannel.removeChannelIfDisconnected(ctx.channel()); } }

super

.userEventTriggered(ctx, evt); }/<code>

默認的心跳超時時間是心跳間隔的3倍,從實現來看,如果心跳超時了,dubbo provider端會主動斷開連接,這說明comsumer端可能已經掛了或者重啟了。

從上述dubbo啟動netty的初始化代碼來看,當consumer發出的請求達到provider時,首先要經過解碼器InternalDecoder,注意這個解碼器只是簡單的轉發作用,實際上解碼工作是靠具體協議對應的解碼器的,比如針對dubbo協議來說就是DubboCountCodec。

dubbo provider是如何啟動的

注意:dubbo provider端的解碼流程不是本文的關注重點,因此大家只需知道其流程即可,關於編解碼這塊後續我會寫專門的文章來分析。

consumer的請求數據經過解碼之後就到達了dubbo業務處理的ChannelHandler — NettyServerHandler。

<code>

public

void

channelRead

(ChannelHandlerContext ctx, Object msg)

throws

Exception

{ NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); handler.received(channel, msg); }/<code>

關於dubbo中處理各種IO事件,和netty中處理類似也定義了一套處理回調接口,定義如下:

<code>

public

interface

ChannelHandler

{

void

connected

(Channel channel)

throws

RemotingException

;

void

disconnected

(Channel channel)

throws

RemotingException

;

void

sent

(Channel channel, Object message)

throws

RemotingException

;

void

received

(Channel channel, Object message)

throws

RemotingException

;

void

caught

(Channel channel, Throwable exception)

throws

RemotingException

; }/<code>

傳遞給dubbo處理器,會走到MultiMessageHandler處理器,由於dubbo定義的各種處理器實際上就是責任鏈的體現,為了方便看流程,先看下大致的處理涉及的類圖:

dubbo provider是如何啟動的

  • MultiMessageHandler:提供了針對多請求的處理能力;
  • HeartbeatHandler:是針對心跳請求的處理邏輯,如果是心跳請求,則更新心跳時間戳,然後直接返回,這時是不會傳遞給接下來的處理器的;
  • AllChannelHandler:all線程模型的實現,這是dubbo provider端默認的線程模型,這種線程模型把所有事件都直接交給業務線程池進行處理了。

注意:dubbo的provider線程池模型不是本文關注的重點,因此大家理解節課,後續dubbo provider線程池模型這塊後續我會寫專門的文章來分析。

將請求數據傳遞給dubbo provider端的線程池來處理之後,接下來就是dubbo真正的業務處理流程了。也到了本文該結束的時刻了,關於dubbo provider後續的處理流程解析,歡迎大家看接下來的文章哈。


分享到:


相關文章: