深度解析dubbo服务远程暴露(二)


注:本文基于dubbo2.6.1

1. protocol.export(invoker)

接着《深度解析dubbo服务远程暴露(一)》这篇的讲,我们讲到exporter = new ExporterChangeableWrapper((Exporter) protocol.export(invokerDelegete), originInvoker);/// dubbo protocol,我们来看一下RegistryProtocol类的private Protocol protocol 成员,这个成员的值是由dubbo spi 扩展技术在实例完成后setter注入进来的,实际上这里protocol注入的是ExtensionLoader.getExtensionLoader(Protocol).getAdaptiveExtension();是自适应的实现类。我们可以看下自适应的实现类的export方法:


    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }

然后invokerDelegete 这个委托类getUrl得到的是protocol=dubbo的url,所以

Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");

最后获取的extension是经过dubbo spi setter注入与wrapper包装后的。
在这里插入图片描述
Listener-----> Filter----->Qos-----> Dubbo
这里的包装其实是没有顺序的,因为在包装的时候是遍历的一个ConcurrentHashSet集合,可以看下包装这块代码
在这里插入图片描述
所以我们每次启动可能会看到不一样的包装顺序。
我们先来看下ProtocolListenerWrapper这个类

2.ProtocolListenerWrapper

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {// registry   是否是注册中心
            return protocol.export(invoker);
        }
        return new ListenerExporterWrapper<T>(protocol.export(invoker),// dubbo export  dubbo --->暴露服务 生成exporter
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));//  exporter listener
    }

这里不是REGISTRY_PROTOCOL,然后走了下面使用ListenerExporterWrapper 将服务暴露返回的exporter与自动激活的listener们绑在了一起。我们可以看下这个ListenerExporterWrapper类,

public class ListenerExporterWrapper<T> implements Exporter<T> {
    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
    private final Exporter<T> exporter;
    private final List<ExporterListener> listeners;
    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {// 判断null
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {//遍历通知
                if (listener != null) {
                    try {
                        listener.exported(this);//通知
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }
   ...
}

我们可以看到就是遍历通知listener,告诉他们当前服务暴露完了。这些listener们我们后面单独拿出来解析。

3.ProtocolFilterWrapper

接下来我们看下ProtocolFilterWrapper类的export方法

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {/// registry protocol  就到下一个wapper 包装对象中就可以了
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

这里不是registry protocol 然后会走下面,我们看下buildInvokerChain 方法,这里buildInvokerChain的三个参数分别是
invoker, service.filter,provider

 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;  
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    ...
                    //exception->moniter->timeout->trace->context->generic->classloader->echo
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                   ...
                };
            }
        }
        return last;
    }

这个方法实际上是根据dubbo spi 扩展技术自动激活的特性获取到对应的filter们,然后一层一层的包装这个invoker,生成一个过滤调用链,最后到真实的invoker上面。这个Filter我们后期会单独拿出来解析,这里只需要知道它一层层包装了invoker就行。

4.QosProtocolWrapper

 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        //判断是registry
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            // 启动Qos服务器?
            startQosServer(invoker.getUrl());
            return protocol.export(invoker);
        }
        return protocol.export(invoker);
    }

这里同理不是registry protocol ,直接到了下一层

5. DubboProtocol

  @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.  key = com.alibaba.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

这里首先是根据url生成一个服务的key,创建一个DubboExporter 把invoker,key与缓存exporter的map 绑在一起。将 创建的这个exporter缓存到exporterMap里面。我们可以exporterMap的定义:

protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

其中key就是生成的那个serviceKey,然后value是创建的那个exporter。
接着就是调用openServer(url),来打开服务器,我们看下源码:

 // 打开server
    private void openServer(URL url) {
        // find server.    获得地址 ip:port   192.168.1.104:20880
        String key = url.getAddress();
        //client can export a service which's only for server to invoke  客户端可以暴露仅供服务器调用的服务
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {// 判断是否是服务器

            //查找缓存的服务器
            ExchangeServer server = serverMap.get(key);
            if (server == null) {// 没有找到server 就要创建server
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override  server 支持重置,与覆盖一起使用
                server.reset(url);   //
            }
        }
    }

首先是获取地址,这个地址形式是ip:port,获取isserver参数,默认是true,然后从serverMap这个缓存中查找对应的server,如果之前没有创建过,就调用createServer(url) 来创建server,之后把创建的server缓存在serverMap中,我们先看下serverMap这个成员变量

private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // 

其中key是服务器地址,也就是上面url.getAddress();获得的,value就是对应的ExchangeServer。
我们再来看看是怎么创建server的,看下createServer(url)这个方法的源码:

// 创建server
    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default  服务器关闭时发送只读事件,默认情况下启用
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default    60 * 1000   设置心跳
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        //获取配置的服务器类型, 缺省就是使用netty 默认服务器是netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  // 获取的使用的server  缺省使用netty
        // 不存在Transports 话  , 就抛出 不支持的server type
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        // 设置  Codec 的类型为dubbo
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);// client
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

这个方法前面部分就是设置一些参数,channel.readonly.sent =true,就是服务器关闭的时候指发送只读属性,heartbeat=60*1000 设置默认的心跳时间,获取server,缺省的情况下使用netty,设置Codec 的类型为dubbo。
然后Exchangers.bind(url, requestHandler),其实这个Exchangers 是个门面类,封装了bind与connect两个方法的调用

6.Exchanger

我们可以看看Exchanger的方法们
在这里插入图片描述
我们接着,看下当前调用的bind方法:

// bind
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
         //验证
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 如果没有codec  的话 就设置为exchange
        // exchanger.bind()
        return getExchanger(url).bind(url, handler);
    }

前面的都是参数校验,我们看下getExchanger(url) 方法:

 public static Exchanger getExchanger(URL url) {
        // 获取exchanger  缺省header
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }
public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}

这里可以看到从url中获取exchanger ,缺省是header,然后使用dubbo spi 获取到HeaderExchanger,我们看下HeaderExchanger源码:

7.HeaderExchanger

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {

        // 创建一个通信client
        //DecodeHandler => HeaderExchangeHandler => ExchangeHandler( handler ) 。
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

        // 创建一个通信server  DecodeHandler  << HeaderExchangeHandler  << handler
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

这里有三个点,一是将这handler又包装了两层,二是使用Transporters 这个门面类进行bind,再就是创建HeaderExchangeServer 将server进行增强,其实HeaderExchangeServer 这个是专门发送心跳的。
我们先看下Transporters这个类

8.Transporters

这个Transporters也是门面类,对外统一了bind 与connect。我们只看下与这次有关的部分

 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        //验证
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {   // 多个channal  对 Channel分发   ChannelHandlerDispatcher 循环
            handler = new ChannelHandlerDispatcher(handlers);
        }
        // 真正服务器 进行bind
        
                                        
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可