【dubbo】dubbo服务注册(dubbo:registry)

Bean名称生成规则

发布完成以后,注册到spring容器中的BeanName的生成规则如下:

【dubbo】dubbo服务注册(dubbo:registry)

此方法位于DubboBeanDefinitionParser类的parse方法。

如果设置了id属性,则把id属性的值作为此Bean的name,如果没有设置id属性,则获取interface节点的值,然后把interface的值作为bean的name以及id。

在上例中,没有设置,则通过applicationContext.getBean(“com.tofuwang.myrpc.api.HelloService”) 可以获取这个service的bean实例。

service 节点的会被映射到ServiceBean对象,对象实现了InitializingBean类的afterPropertiesSet方法,用于在类的创建之前进行的一些初始化工作,实际上在spring中还有一个类似的方法,init-method,例如:

他们只是执行顺序上的不同,如果进行了同时配置,则会先调用afterPropertiesSet方法,而后才会调用Init-method中配置的方法。

对于service的初始化,dubbo大量的工作都放在了afterPropertiesSet这个方法执行。这个方法比较长,这个方法是为了设置一些初始化的数据,分为以下几个步骤:

  • 设置providerconfig

  • 设置spring的applicationContext上下文

  • 设置注册中心配置(RegistryConfig)

  • 设置监控和协议信息

  • 设置classpath

相关配置信息如下图

【dubbo】dubbo服务注册(dubbo:registry)

服务注册(doExport)

查看ServiceBean这个类可以看到,它实现了ApplicationListener这个接口,这样会在spring启动最后,会调用finishRefresh()方法,通过调用子类的onApplicationEvent方法进行通知,所以在spring启动完成之后,会调用ServiceBean的onApplicationEvent方法,而dubbo把服务注册到注册中心,就是在这个方法中完成的。

程序进行判断是否要延迟注册,如果不是延迟注册,则直接调用ServiceConfig中的doExport(),需要延迟注册的话,就新启动一个线程,然后Thread.sleep相应的延迟时间以后再调用doExport()。

doExport()主要用来各种前置校验,各种校验完成以后才调用doExportUrls()进行发布服务。

首先获取所有注册中心,并且封装到List对象中,获取配置的protocols。从这种写法上可以看出来,dubbo是支持多注册中心,多协议的。

private void doExportUrls() { List registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); }}

然后调用doExportUrlsFor1Protocol进行逐个的注册到注册中心的中。里面有一小段Socket的代码,用来测试一下注册中心的地址的连通性。如果连接不上,程序不不阻断,只是打出来一行warn日志进行提醒。

然后封装method,把服务者的所有method方法封装到config中。

  1. 设置Token

  2. 根据设置是否是inJVM,如果是injvm的话,则不进行异步调用。

  3. 根据service 的scope配置,如果配置为了为none,则也不注册。

  4. 根据配置,范围有两种,一种是local,另外一种是remote,配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务),如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务),默认不配置,则两种都有,本地和远程的URL中设置了两种protocol类型,injvm和在配置文件中配置的节点的protocol,通常的话我们配置的是dubbo协议。然后根据协议分别被InjvmProtocol和DubboProtocol这两个类进行了处理。分别调用各个协议的export方法对服务进行导出。

服务导出(export)

服务导出的代码如下(RegistryProtocol)

public  Exporter export(final Invoker originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter() { public Invoker getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } };}

该方法主要讲述的东西如下:

  1. 首先调用doLocalExport方法实现真正的服务发布,Protocol是通过SPI注册进来的,默认是dubbo,并且进行了缓存,避免了多次发布。

  2. 根据URL,通过RegistryFactory获取Registry对象

  3. 然后调用相关的registry方法进行服务注册。

  4. 监听服务变化,监听到变化了以后,对服务进行更新。

  5. 缓存exporter,服务下线的时候进行销毁等后续操作。

以dubboProtocol的export方法为例,服务导出有两步操作,一是开启netty服务端口进行监听和接收数据(如果使用的是netty的话),另外去注册中心进行注册。

开启服务、等待连接

开启服务端端口,接收并处理RegistryProtocol.doLocalExport

对于localExport,根据相关参数,实际上真正执行的是DubboProtol中的export方法,在这个方法中返回了一个exporter,而比较关键的方法是openServer,在这个server中对外暴露服务。

private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } }}

通过以上可以看出,从URL中取得要暴露服务的地址,这个地址的格式实际上是ip:port,在serverMap中查询服务是否存在,如果存在的就不再重复创建,不存在则调用createServer方法创建服务。

关键是服务如何创建的?

private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set 
supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server;}

以上就是创建服务的代码,通过以上代码可以看出相关逻辑

  1. 默认开启关闭事件接收回调readonly和开启心跳

  2. 可以看出dubbo封装了一个中间层,transporter,用来封装服务,dubbo支持三种服务类型,netty,mina,grizzly,默认使用的是netty,如果想更改的话,可以在 的server节点更改。

  3. 封装一个ExchangeHandlerAdapter,这个也是个中间层,用来统一处理请求服务,作为接收服务请求的一个句柄。

  4. 调用Exchangers的bind方法,把url和requestHandler(也就是ExchangeHandler)作为参数传递进去,进行真正的绑定服务。

具体是如何bind的,在分析dubbo的remoteing-api这一层的时候再进行具体分析。

服务注册到注册中心

调用Registry的registry方法进行服务注册,注册到注册中心,registry.register(registedProviderUrl);

如果是zookeeper的话,则真正的实现是ZookeeperRegistry。如何自动找到具体的实现类的,请查看【dubbo】dubbo SPI机制(ExtensionLoader)

服务注册这一层,被dubbo封装在了dubbo-registry这个模块

【dubbo】dubbo服务注册(dubbo:registry)

可以看出,dubbo支持默认的直连方式,广播方式和用redis以及zookeeper作为注册中心的方式,用zookeeper作为注册中心是dubbo推荐的方式。

继续以zookeeper为例,看如何注册到zookeeper上的。首先zookeeperRegister继承FailbackRegistry,这个类对于快速失败修复做了一个抽象层,记录已经注册的服务,并且开启心跳检测重试机制。把注册失败的服务放到一个CurrentHashSet中,进行无限次的重试。真正注册的时候调用doRegistry方法,这是一个钩子方法,有具体的子类实现,在这里就是有ZookeeperRegistry类来实现。

Dubbo使用的sgroschupf的zkclient(https://github.com/sgroschupf/zkclient)的连接zookeeper,创建的时候分为永久节点和临时节点,duboo创建的服务都是临时节点

zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));

最后一个参数代表的是节点类型,true是临时节点,false是永久节点

public void create(String path, boolean ephemeral) { int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); }}

通过这种方式,dubbo就把自己的服务注册到了注册中心上。通过zkclient客户端连接上zk,查看zk上的dubbo节点数据如下:

【dubbo】dubbo服务注册(dubbo:registry)

附:服务注册流程图

【dubbo】dubbo服务注册(dubbo:registry)

tencent://AddContact/?fromId=50&fromSubId=1&subcmd=all&uin=2284732365


分享到:


相關文章: