首先还是Spring碰到dubbo的标签之后,会使用parseCustomElement解析dubbo标签,使用的解析器是dubbo的DubboBeanDefinitionParser,解析完成之后返回BeanDefinition给Spring管理。
服务消费者端对应的是ReferenceBean,实现了ApplicationContextAware接口,Spring会在Bean的实例化那一步回调setApplicationContext方法。也实现了InitializingBean接口,接着会回调afterPropertySet方法。还实现了FactoryBean接口,实现FactoryBean可以在后期获取bean的时候做一些操作,dubbo在这个时候做初始化。另外ReferenceBean还实现了DisposableBean,会在bean销毁的时候调用destory方法。
消费者的初始化是在ReferenceBean的init方法中执行,分为两种情况:
- reference标签中没有配置init属性,此时是延迟初始化的,也就是只有等到bean引用被注入到其他Bean中,或者调用getBean获取这个Bean的时候,才会初始化。比如在这里的例子里reference没有配置init属性,只有等到
HelloService helloService = (HelloService) applicationContext.getBean("helloService");
这句getBean的时候,才会开始调用init方法进行初始化。 - 另外一种情况是立即初始化,即是如果reference标签中init属性配置为true,会立即进行初始化(也就是上面说到的实现了FactoryBean接口)。
初始化开始
这里以没有配置init的reference为例,只要不注入bean或者不调用getBean获取bean的时候,就不会被初始化。HelloService helloService = (HelloService) applicationContext.getBean("helloService");
另外在ReferenceBean这个类在Spring中初始化的时候,有几个静态变量会被初始化:
1 | private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); |
这几个变量的初始化是根据dubbo的SPI扩展机制动态生成的代码:
refprotocol:
1 | import com.alibaba.dubbo.common.extension.ExtensionLoader; |
cluster:
1 | import com.alibaba.dubbo.common.extension.ExtensionLoader; |
proxyFactory:
1 | import com.alibaba.dubbo.common.extension.ExtensionLoader; |
初始化入口
初始化的入口在ReferenceConfig的get()方法:
1 | public synchronized T get() { |
init()方法会先检查初始化所有的配置信息,然后调用ref = createProxy(map);
创建代理,消费者最终得到的是服务的代理。初始化主要做的事情就是引用对应的远程服务,大概的步骤:
- 监听注册中心
- 连接服务提供者端进行服务引用
- 创建服务代理并返回
文档上关于Zookeeper作为注册中心时,服务消费者启动时要做的事情有:
订阅/dubbo/com.foo.BarService/providers目录下的提供者URL地址。
并向/dubbo/com.foo.BarService/consumers目录下写入自己的URL地址。
创建代理
- 引用远程服务
- 创建代理
init()中createProxy方法:
1 | private T createProxy(Map<String, String> map) { |
这里refprotocol是上面生成的代码,会根据协议不同选择不同的Protocol协议。
引用远程服务
对于服务引用refprotocol.refer(interfaceClass, url)
会首先进入ProtocolListenerWrapper的refer方法,然后在进入ProtocolFilterWrapper的refer方法,然后再进入RegistryProtocol的refer方法,这里的url协议是registry,所以上面两个Wrapper中不做处理,直接进入了RegistryProtocol,看下RegistryProtocol中:
1 | public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { |
获取注册中心
连接注册中心Registry registry = registryFactory.getRegistry(url);
首先会到AbstractRegistryFactory的getRegistry方法:
1 | public Registry getRegistry(URL url) { |
ZookeeperRegistryFactory的createRegistry方法:
1 | public Registry createRegistry(URL url) { |
zookeeperTransporter代码:
1 | package com.alibaba.dubbo.remoting.zookeeper; |
上面代码中可以看到,如果我们没有指定Zookeeper的client属性,默认使用zkClient,所以上面的zookeeperTransporter是ZkclientZookeeperTransporter。
继续看new ZookeeperRegistry(url, zookeeperTransporter);
:
1 | public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { |
AbstractRegistry的处理:
1 | public AbstractRegistry(URL url) { |
获取注册中心时的通知方法
notify方法:
1 | protected void notify(List<URL> urls) { |
notify(url, listener, filterEmpty(url, urls));
代码:
1 | protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
AbstractRegistry构造完,接着是FailbackRegistry的处理:
1 | public FailbackRegistry(URL url) { |
这里会启动一个新的定时线程,主要是有连接失败的话,会进行重试连接retry(),启动完之后返回ZookeeperRegistry中继续处理。接下来下一步是服务的引用。
引用远程服务
继续看ref方法中最后一步,服务的引用,返回的是一个Invoker,return doRefer(cluster, registry, type, url);
1 | private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { |
注册中心接收到消费者发送的订阅请求后,会根据提供者注册服务的列表,推送服务消息给消费者。消费者端接收到注册中心发来的提供者列表后,进行服务的引用。触发Directory监听器的可以是订阅请求,覆盖策略消息,路由策略消息。
注册到注册中心
AbstractRegistry的register方法:
1 | public void register(URL url) { |
上面只是把url添加到registered这个set中。
接着看FailbackRegistry的register方法:
1 | public void register(URL url) { |
接着看下doRegister(url);方法,向服务器端发送注册请求,在ZookeeperRegistry中:
1 | protected void doRegister(URL url) { |
zkClient.create()方法:
1 | //path为 |
经过上面create之后,Zookeeper中就存在了消费者需要订阅的服务的节点:
1 | /dubbo |
订阅服务提供者
消费者自己注册到注册中心之后,接着是订阅服务提供者,directory.subscribe():
1 | public void subscribe(URL url) { |
看下registry.subscribe(url, this);,这里registry是ZookeeperRegistry,会先经过AbstractRegistry的处理,然后是FailbackRegistry的处理。
在AbstractRegistry中:
1 | //此时url为consumer://192.168.1.100/dubbo.common.hello.service.HelloService?application=dubbo-consumer& |
然后是FailbackRegistry中:
1 | public void subscribe(URL url, NotifyListener listener) { |
继续看doSubscribe(url, listener);向服务端发送订阅请求,在ZookeeperRegistry中:
1 | protected void doSubscribe(final URL url, final NotifyListener listener) { |
服务订阅完之后的通知
服务订阅完成之后,接着就是notify(url, listener, urls);:
会先经过FailbackRegistry将失败的通知请求记录到失败列表,定时重试。
1 | protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
doNotify(url, listener, urls);:
1 | protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { |
AbstractRegistry中的doNotify实现:
1 | protected void notify(URL url, NotifyListener listener, List<URL> urls) { |
到RegistryDirectory中查看notify方法:
1 | public synchronized void notify(List<URL> urls) { |
重建invoker实例
refreshInvoker(invokerUrls);:
1 | /** |
toInvokers(invokerUrls) 方法:
1 | private Map<String, Invoker<T>> toInvokers(List<URL> urls) { |
创建invoker invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
:
- 先使用DubboProtocol的refer方法,这一步会依次调用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper,DubboProtocol中的refer方法。经过两个Wrapper中,会添加对应的InvokerListener并构建Invoker Filter链,在DubboProtocol中会创建一个DubboInvoker对象,该Invoker对象持有服务Class,providerUrl,负责和服务提供端通信的ExchangeClient。
- 接着使用得到的Invoker创建一个InvokerDelegete
创建invoker
在DubboProtocol中创建DubboInvoker的时候代码如下:
1 | public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { |
查看getClients方法:
1 | private ExchangeClient[] getClients(URL url){ |
直接看initClient方法:
1 | //创建新连接 |
和服务端建立连接,Exchangers.connect(url ,requestHandler);,其实最后使用的是HeaderExchanger,Exchanger目前只有这一个实现:
1 | public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { |
Transporters.connect中也是根据SPI扩展获取Transport的具体实现,这里默认使用NettyTransporter.connect(),在NettyTransporter的connect方法中直接返回一个NettyClient(url, listener);,下面看下具体的NettyClient初始化细节,会先初始化AbstractPeer这里只是吧url和handler赋值;然后是AbstractEndpoint初始化:
1 | public AbstractEndpoint(URL url, ChannelHandler handler) { |
接着是AbstractClient的初始化:
1 | public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { |
看下在NettyClient中doOpen()的实现:
1 | protected void doOpen() throws Throwable { |
这里是Netty3中的客户端连接的一些常规步骤,暂不做具体解析。open之后,就是真正连接服务端的操作了,connect():
1 | protected void connect() throws RemotingException { |
NettyClient中的doConnect方法:
1 | protected void doConnect() throws Throwable { |
这里连接的细节都交给了netty。
NettyClient初始化完成之后,返回给Transporters,再返回给HeaderExchanger,HeaderExchanger中将NettyClient包装成HeaderExchangeClient返回给DubboProtocol的initClient方法中,到此在getSharedClient中就获取到了一个ExchangeClient,然后包装一下返回client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
。
到这里在DubboProtocol的refer方法中这句DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
创建DubboInvoker就已经解析完成,创建过程中连接了服务端,包含一个ExchangeClient等:
1 | public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { |
接着返回ProtocolFilterWrapper的refer方法,在这里会构建invoker链:
1 | public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { |
接着再返回到ProtocolListenerWrapper的refer方法,这里会初始化监听器,包装:
1 | public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { |
接着在返回到toInvokers方法,然后返回refreshInvoker方法的Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;
这就获得了Invoker,接着就是方法名映射Invoker列表:Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
这里将invokers列表转成与方法的映射关系。到这里refreshInvoker方法就完成了,在往上就返回到AbstractRegistry的notify方法,到这里也完成了。
创建服务代理
到这里有关消费者端注册到注册中心和订阅注册中心就完事儿了,这部分是在RegistryProtocol.doRefer方法中,这个方法最后一句是return cluster.join(directory);
,这里由Cluster组件创建一个Invoker并返回,这里的cluster默认是用FailoverCluster,最后返回的是经过MockClusterInvoker包装过的FailoverCluster。继续返回到ReferenceConfig中createProxy方法,这时候我们已经完成了消费者端引用服务的Invoker。然后最后返回的是根据我们得到的invoker创建的服务代理return (T) proxyFactory.getProxy(invoker);
。这里proxyFactory是我们在最上面列出的动态生成的代码。
首先经过AbstractProxyFactory的处理:
1 | public <T> T getProxy(Invoker<T> invoker) throws RpcException { |
然后经过StubProxyFactoryWrapper的处理:
1 | public <T> T getProxy(Invoker<T> invoker) throws RpcException { |
返回代理。到此HelloService helloService = (HelloService) applicationContext.getBean("helloService");
就解析完成了,得到了服务的代理,代理会被注册到Spring容器中,可以调用服务方法了。接下来的方法调用过程,是消费者发送请求,提供者处理,然后消费者接受处理结果的请求。
初始化的过程:主要做了注册到注册中心,监听注册中心,连接到服务提供者端,创建代理。这些都是为了下面消费者和提供者之间的通信做准备。