注:本文基于dubbo2.6.1
1. protocol.export(invoker)
接着《深度解析dubbo服务远程暴露(一)》这篇的讲,我们讲到exporter = new ExporterChangeableWrapper
,我们来看一下RegistryProtocol类的private Protocol protocol 成员,这个成员的值是由dubbo spi 扩展技术在实例完成后setter注入进来的,实际上这里protocol注入的是ExtensionLoader.getExtensionLoader(Protocol).getAdaptiveExtension();是自适应的实现类。我们可以看下自适应的实现类的export方法:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
然后invokerDelegete 这个委托类getUrl得到的是protocol=dubbo的url,所以
Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
最后获取的extension是经过dubbo spi setter注入与wrapper包装后的。
Listener-----> Filter----->Qos-----> Dubbo
这里的包装其实是没有顺序的,因为在包装的时候是遍历的一个ConcurrentHashSet集合,可以看下包装这块代码
所以我们每次启动可能会看到不一样的包装顺序。
我们先来看下ProtocolListenerWrapper这个类
2.ProtocolListenerWrapper
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {// registry 是否是注册中心
return protocol.export(invoker);
}
return new ListenerExporterWrapper<T>(protocol.export(invoker),// dubbo export dubbo --->暴露服务 生成exporter
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));// exporter listener
}
这里不是REGISTRY_PROTOCOL,然后走了下面使用ListenerExporterWrapper 将服务暴露返回的exporter与自动激活的listener们绑在了一起。我们可以看下这个ListenerExporterWrapper类,
public class ListenerExporterWrapper<T> implements Exporter<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
private final Exporter<T> exporter;
private final List<ExporterListener> listeners;
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {// 判断null
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {//遍历通知
if (listener != null) {
try {
listener.exported(this);//通知
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
...
}
我们可以看到就是遍历通知listener,告诉他们当前服务暴露完了。这些listener们我们后面单独拿出来解析。
3.ProtocolFilterWrapper
接下来我们看下ProtocolFilterWrapper类的export方法
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {/// registry protocol 就到下一个wapper 包装对象中就可以了
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
这里不是registry protocol 然后会走下面,我们看下buildInvokerChain 方法,这里buildInvokerChain的三个参数分别是
invoker, service.filter,provider
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
...
//exception->moniter->timeout->trace->context->generic->classloader->echo
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
...
};
}
}
return last;
}
这个方法实际上是根据dubbo spi 扩展技术自动激活的特性获取到对应的filter们,然后一层一层的包装这个invoker,生成一个过滤调用链,最后到真实的invoker上面。这个Filter我们后期会单独拿出来解析,这里只需要知道它一层层包装了invoker就行。
4.QosProtocolWrapper
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//判断是registry
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
// 启动Qos服务器?
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
这里同理不是registry protocol ,直接到了下一层
5. DubboProtocol
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service. key = com.alibaba.dubbo.demo.DemoService:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
这里首先是根据url生成一个服务的key,创建一个DubboExporter 把invoker,key与缓存exporter的map 绑在一起。将 创建的这个exporter缓存到exporterMap里面。我们可以exporterMap的定义:
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
其中key就是生成的那个serviceKey,然后value是创建的那个exporter。
接着就是调用openServer(url),来打开服务器,我们看下源码:
// 打开server
private void openServer(URL url) {
// find server. 获得地址 ip:port 192.168.1.104:20880
String key = url.getAddress();
//client can export a service which's only for server to invoke 客户端可以暴露仅供服务器调用的服务
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {// 判断是否是服务器
//查找缓存的服务器
ExchangeServer server = serverMap.get(key);
if (server == null) {// 没有找到server 就要创建server
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override server 支持重置,与覆盖一起使用
server.reset(url); //
}
}
}
首先是获取地址,这个地址形式是ip:port,获取isserver参数,默认是true,然后从serverMap这个缓存中查找对应的server,如果之前没有创建过,就调用createServer(url) 来创建server,之后把创建的server缓存在serverMap中,我们先看下serverMap这个成员变量
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); //
其中key是服务器地址,也就是上面url.getAddress();获得的,value就是对应的ExchangeServer。
我们再来看看是怎么创建server的,看下createServer(url)这个方法的源码:
// 创建server
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default 服务器关闭时发送只读事件,默认情况下启用
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default 60 * 1000 设置心跳
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//获取配置的服务器类型, 缺省就是使用netty 默认服务器是netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 获取的使用的server 缺省使用netty
// 不存在Transports 话 , 就抛出 不支持的server type
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 设置 Codec 的类型为dubbo
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);// client
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
这个方法前面部分就是设置一些参数,channel.readonly.sent =true,就是服务器关闭的时候指发送只读属性,heartbeat=60*1000 设置默认的心跳时间,获取server,缺省的情况下使用netty,设置Codec 的类型为dubbo。
然后Exchangers.bind(url, requestHandler),其实这个Exchangers 是个门面类,封装了bind与connect两个方法的调用
6.Exchanger
我们可以看看Exchanger的方法们
我们接着,看下当前调用的bind方法:
// bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
//验证
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 如果没有codec 的话 就设置为exchange
// exchanger.bind()
return getExchanger(url).bind(url, handler);
}
前面的都是参数校验,我们看下getExchanger(url) 方法:
public static Exchanger getExchanger(URL url) {
// 获取exchanger 缺省header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
这里可以看到从url中获取exchanger ,缺省是header,然后使用dubbo spi 获取到HeaderExchanger,我们看下HeaderExchanger源码:
7.HeaderExchanger
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 创建一个通信client
//DecodeHandler => HeaderExchangeHandler => ExchangeHandler( handler ) 。
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建一个通信server DecodeHandler << HeaderExchangeHandler << handler
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
这里有三个点,一是将这handler又包装了两层,二是使用Transporters 这个门面类进行bind,再就是创建HeaderExchangeServer 将server进行增强,其实HeaderExchangeServer 这个是专门发送心跳的。
我们先看下Transporters这个类
8.Transporters
这个Transporters也是门面类,对外统一了bind 与connect。我们只看下与这次有关的部分
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
//验证
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else { // 多个channal 对 Channel分发 ChannelHandlerDispatcher 循环
handler = new ChannelHandlerDispatcher(handlers);
}
// 真正服务器 进行bind
本作品采用 知识共享署名-相同方式共享 4.0
国际许可协议 进行许可