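The Dubbo RPC call process is fairly complex. This post explains it by walking through the call stack of one typical RPC method call. It is typical in the sense that the scenario analyzed here is simple and representative.
First, define an interface: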
public interface DemoService {
public String sayHello(String name);
}
Next, a service implementation class:
public class DemoServiceImpl implements DemoService {
public String sayHello(String name) {
Random random=new Random();
try {
Thread.sleep(800* random.nextInt(6));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
}
The client references this service through a registry, and the registry uses the zookeeper protocol.
<dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181" />
<!-- The filter and the load-balancing strategy are configured here -->
<dubbo:reference id="demoService" interface="demo.dubbo.api.DemoService" loadbalance="random" timeout="6000" filter="monitor"/>
The server publishes the service through the registry; by default it is published over the dubbo protocol.
<dubbo:registry address="zookeeper://192.168.64.128:2181"/>
<dubbo:service interface="demo.dubbo.api.DemoService" ref="demoService" />
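Once the service has been started and published, a main method calls the service method sayHello and prints the result. The code is as follows: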
public class DemoApplicationCustomer {
public static void main(String[] args) {
ApplicationContext context= new ClassPathXmlApplicationContext(new String[]{"/spring/dubbo-demo-consumer.xml"});
final demo.dubbo.api.DemoService demoService= (DemoService) context.getBean("demoService");
System.err.println(demoService.sayHello("mydubbodebug"));
}
}
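This main method starts a Spring container and then triggers one simple RPC method call.
The next step is to set a breakpoint in the Dubbo source, just before the RPC request message is sent over the network. Holding the request there makes it easy to inspect Dubbo's client-side call stack.
As for where to put the breakpoint: an earlier post, https://cloud.tencent.com/developer/article/1109547, covered how Dubbo parses its communication messages, and from it we know that Dubbo's default underlying transport framework is Netty.
Take a look at the doOpen method of com.alibaba.dubbo.remoting.transport.netty.NettyClient: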
/***
* Opens the connection to the remote service machine
* @throws Throwable
*/
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
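// Set the handlers for the message flow. An outgoing message passes through handler first and then through encoder,
// so the breakpoint can be set in the NettyHandler class.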
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
NettyHandler extends Netty's SimpleChannelHandler and overrides the writeRequested method:
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.writeRequested(ctx, e); // breakpoint here
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
We can set a breakpoint on the statement super.writeRequested(ctx, e);.
After starting DemoApplicationCustomer, we get the following thread stack:
java.lang.Thread.State: RUNNABLE
at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)
at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
at org.jboss.netty.channel.Channels.write(Channels.java:611)
at org.jboss.netty.channel.Channels.write(Channels.java:578)
at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)
at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:258)
at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:54)
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:112)
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:88)
at com.alibaba.dubbo.rpc.protocol.dubbo.ReferenceCountExchangeClient.request(ReferenceCountExchangeClient.java:78)
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:97)
at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:144)
at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:74)
at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:65)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:48)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:229)
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
at com.example.DemoApplicationCustomer.main(DemoApplicationCustomer.java:6)
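The stack has 29 frames. Apart from the 6 frames inside jboss netty, the other 23 are the ones we need to study. Reading from the bottom up shows directly which classes and methods an RPC client call passes through.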
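As a small illustration of reading a stack from the bottom up, here is a minimal sketch that captures the current thread's frames without a debugger. The class and method names (StackTraceDemo, proxy, invoker, transport, capture) are hypothetical stand-ins and are not Dubbo classes.

```java
import java.util.ArrayList;
import java.util.List;

class StackTraceDemo {
    // Innermost call: record the frames of this class, similar to pausing at a breakpoint.
    static List<String> capture() {
        List<String> frames = new ArrayList<>();
        for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
            if (e.getClassName().equals(StackTraceDemo.class.getName())) {
                frames.add(e.getMethodName());
            }
        }
        return frames; // index 0 is the innermost frame, like the top line of a thread dump
    }

    // Hypothetical layers standing in for proxy -> invoker -> transport.
    static List<String> transport() { return capture(); }
    static List<String> invoker() { return transport(); }
    static List<String> proxy() { return invoker(); }

    public static void main(String[] args) {
        List<String> frames = proxy();
        // Print from the bottom (caller) to the top (callee), the order used in this post.
        for (int i = frames.size() - 1; i >= 0; i--) {
            System.out.println(frames.get(i));
        }
    }
}
```

The list returned by capture() is ordered like the thread dump above, so walking it backwards follows the call from the caller down to the point where the request would be written.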
The following sections explain, for each class, how it is created and when it is called.
First stack line
1,at com.example.DemoApplicationCustomer.main(DemoApplicationCustomer.java:6)
DemoApplicationCustomer is the launcher class and can be ignored.
Second stack line
2,at com.alibaba.dubbo.common.bytecode.proxy0.sayHello(proxy0.java:-1)
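com.alibaba.dubbo.common.bytecode.proxy0 is a proxy class. It proxies all method calls on the RPC service interface.
When is an instance of this class created, and what does its code look like?
An earlier post, an analysis of how Dubbo is built on Spring, showed that proxy creation is triggered from the following method of the ReferenceBean class: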
public Object getObject() throws Exception {
return get();
}
The actual implementation is in the createProxy method of the ReferenceConfig class:
/***
* Creates the client-side RPC call proxy
* @param map
* @return
*/
@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
private T createProxy(Map<String, String> map){
// ... logic that builds the invoker; it is skipped here and covered later
// create the service proxy
return (T) proxyFactory.getProxy(invoker);
}
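The proxyFactory variable is assigned as follows:
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
From the post on Dubbo SPI, the source of the getProxy method of the Adaptive class for the ProxyFactory interface is as follows: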
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
// By default the getProxy method of the javassist ProxyFactory extension creates the proxy
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
}
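The adaptive class above boils down to "read an extension name from the url, fall back to a default, look the extension up by name". A minimal sketch of that idea follows, assuming a plain Map in place of ExtensionLoader; ProxyFactoryLike, EXTENSIONS and select are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class AdaptiveLookupDemo {
    // Hypothetical stand-in for the ProxyFactory extension point.
    interface ProxyFactoryLike { String name(); }

    // Stand-in for the extension registry; the real lookup goes through ExtensionLoader.
    static final Map<String, ProxyFactoryLike> EXTENSIONS = new HashMap<>();
    static {
        EXTENSIONS.put("javassist", () -> "javassist");
        EXTENSIONS.put("jdk", () -> "jdk");
    }

    // Mirrors url.getParameter("proxy", "javassist") followed by getExtension(extName).
    static ProxyFactoryLike select(Map<String, String> urlParams) {
        String extName = urlParams.getOrDefault("proxy", "javassist");
        ProxyFactoryLike extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension;
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        System.out.println(select(params).name()); // javassist (the default)
        params.put("proxy", "jdk");
        System.out.println(select(params).name()); // jdk
    }
}
```

The point of the pattern is that the caller holds one adaptive object while the concrete implementation is picked per call from the url parameters.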
The getProxy method of JavassistProxyFactory, the javassist extension of the ProxyFactory interface:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
// The proxy instance is created with new InvokerInvocationHandler(invoker) as its argument
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
Next, the Proxy class that generates the dynamic proxy:
/**
* Get proxy.
*
* @param ics interface class array.
* @return Proxy instance.
*/
public static Proxy getProxy(Class<?>... ics) {
return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}
/**
* Get proxy.
*
* @param cl class loader.
* @param ics interface class array; more than one interface can be implemented.
* @return Proxy instance.
*/
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
if (ics.length > 65535)
throw new IllegalArgumentException("interface limit exceeded");
StringBuilder sb = new StringBuilder();
for (int i = 0; i < ics.length; i++) {
String itf = ics[i].getName();
if (!ics[i].isInterface())
throw new RuntimeException(itf + " is not a interface.");
Class<?> tmp = null;
try {
tmp = Class.forName(itf, false, cl);
} catch (ClassNotFoundException e) {
}
if (tmp != ics[i])
throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
sb.append(itf).append(';');
}
// use interface class name list as key.
// Multiple interface names are joined with semicolons.
String key = sb.toString();
// get cache by class loader.
// one cache map per class loader
Map<String, Object> cache;
synchronized (ProxyCacheMap) {
cache = ProxyCacheMap.get(cl);
if (cache == null) {
cache = new HashMap<String, Object>();
ProxyCacheMap.put(cl, cache);
}
}
Proxy proxy = null;
synchronized (cache) {
do {
Object value = cache.get(key);
if (value instanceof Reference<?>) {
// If the reference still holds an object, return the cached proxy.
proxy = (Proxy) ((Reference<?>) value).get();
if (proxy != null)
return proxy;
}
// The proxy is being generated by another thread: suspend and wait
if (value == PendingGenerationMarker) {
try {
cache.wait();
} catch (InterruptedException e) {
}
} else { // put the generation-in-progress marker
cache.put(key, PendingGenerationMarker);
break;
}
}
while (true);
}
// A sequence number 0, 1, 2, 3... is appended to the class name
long id = PROXY_CLASS_COUNTER.getAndIncrement();
String pkg = null;
// ClassGenerator is Dubbo's utility class built on javassist
ClassGenerator ccp = null, ccm = null;
try {
ccp = ClassGenerator.newInstance(cl);
Set<String> worked = new HashSet<String>();
List<Method> methods = new ArrayList<Method>();
for (int i = 0; i < ics.length; i++) {
// Check the package: non-public interfaces must all come from the same package
if (!Modifier.isPublic(ics[i].getModifiers())) {
String npkg = ics[i].getPackage().getName();
if (pkg == null) {
pkg = npkg;
} else {
if (!pkg.equals(npkg))
throw new IllegalArgumentException("non-public interfaces from different packages");
}
}
// Add the interface Class object that the proxy class must implement
ccp.addInterface(ics[i]);
for (Method method : ics[i].getMethods()) {
// Get the method descriptor; the same method declared in different interfaces is implemented only once.
String desc = ReflectUtils.getDesc(method);
if (worked.contains(desc))
continue;
worked.add(desc);
int ix = methods.size();
// method return type
Class<?> rt = method.getReturnType();
// method parameter types
Class<?>[] pts = method.getParameterTypes();
// Generate the implementation code; it has the same shape for every method
StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
for (int j = 0; j < pts.length; j++)
code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
if (!Void.TYPE.equals(rt))
code.append(" return ").append(asArgument(rt, "ret")).append(";");
methods.add(method);
ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
}
}
if (pkg == null)
pkg = PACKAGE_NAME;
// create ProxyInstance class.
// The concrete proxy class name; this is the fully qualified name
String pcn = pkg + ".proxy" + id;
ccp.setClassName(pcn);
ccp.addField("public static java.lang.reflect.Method[] methods;");
ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
// create the constructor
ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
ccp.addDefaultConstructor();
Class<?> clazz = ccp.toClass();
// Use reflection to put the Method array into the static field methods
clazz.getField("methods").set(null, methods.toArray(new Method[0]));
// create Proxy class.
String fcn = Proxy.class.getName() + id;
ccm = ClassGenerator.newInstance(cl);
ccm.setClassName(fcn);
ccm.addDefaultConstructor();
// Set the superclass to the abstract class Proxy
ccm.setSuperClass(Proxy.class);
// Generate the code implementing its abstract method newInstance.
// The object it instantiates is the proxy class pcn generated above.
ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
Class<?> pc = ccm.toClass();
proxy = (Proxy) pc.newInstance();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
} finally {
// release ClassGenerator
if (ccp != null)
ccp.release();
if (ccm != null)
ccm.release();
synchronized (cache) {
if (proxy == null)
cache.remove(key);
else
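// Put it into the cache. key: the implemented interface names, value: the proxy object, held through a weak reference.
// A weak reference does not prevent GC: once no strong reference remains, the instance can be reclaimed.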
cache.put(key, new WeakReference<Proxy>(proxy));
cache.notifyAll();
}
}
return proxy;
}
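To make the code-generation step in getProxy more concrete, the sketch below rebuilds the method body string for one interface method. It is a simplification under stated assumptions: Greeter and buildBody are hypothetical names, and the return cast is a plain cast that only suits non-primitive, non-array return types, whereas the real code delegates to asArgument.

```java
import java.lang.reflect.Method;

class ProxyBodyDemo {
    // Hypothetical service interface used only for this illustration.
    interface Greeter { String sayHello(String name); }

    // Builds the javassist source of one proxy method, following the loop in getProxy.
    static String buildBody(Method method, int index) {
        Class<?>[] pts = method.getParameterTypes();
        StringBuilder code = new StringBuilder("Object[] args = new Object[")
                .append(pts.length).append("];");
        for (int j = 0; j < pts.length; j++) {
            // $1, $2, ... are javassist placeholders for the parameters; ($w) boxes primitives
            code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
        }
        code.append(" Object ret = handler.invoke(this, methods[").append(index).append("], args);");
        Class<?> rt = method.getReturnType();
        if (!Void.TYPE.equals(rt)) {
            // Simplified: plain cast, valid for reference types such as String
            code.append(" return (").append(rt.getName()).append(")ret;");
        }
        return code.toString();
    }

    public static void main(String[] args) {
        Method sayHello = Greeter.class.getMethods()[0]; // the interface declares a single method
        System.out.println(buildBody(sayHello, 0));
    }
}
```

Every generated method therefore does the same three things: pack the arguments into an Object[], hand them to handler.invoke together with the matching Method object, and cast the result back.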
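That is a brief look at the generation process. Below is the source of the generated proxy classes, obtained with a small code hack. Two classes are generated dynamically: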
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.common.bytecode.ClassGenerator.DC;
import java.lang.reflect.InvocationHandler;
public class Proxy0 extends Proxy implements DC {
public Object newInstance(InvocationHandler var1) {
return new proxy01(var1);
}
public Proxy0() {
}
}
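This class extends the abstract class Proxy and implements its abstract method newInstance. The DC interface is what Dubbo uses internally to mark dynamically generated classes.
The other class, proxy01, corresponds to the proxy class seen at the start of the call stack (proxy0 there). Its source is as follows: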
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.rpc.service.EchoService;
import demo.dubbo.api.DemoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
public class proxy01 implements ClassGenerator.DC, EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler;
// implements the interface method
public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = null;
try {
var3 = this.handler.invoke(this, methods[1], var2);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (String)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = null;
try {
var3 = this.handler.invoke(this, methods[3], var2);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
return (Object)var3;
}
public proxy01() {
}
// Public constructor. The handler here is the InvokerInvocationHandler object
// passed in by the statement Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
public proxy01(InvocationHandler var1) {
this.handler = var1;
}
}
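As you can see, the proxy class implements 3 interfaces.
ClassGenerator.DC is Dubbo's marker interface for dynamic classes.
DemoService is the actual business interface, which is what lets the proxy expose the service methods.
EchoService is the echo-test interface. It has only one method: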
public interface EchoService {
/**
* echo test.
*
* @param message message.
* @return message.
*/
Object $echo(Object message);
}
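It adds an echo-test method to every Dubbo RPC service.
EchoService echoService = (EchoService) memberService; // cast to EchoService to run the echo test
This gives a rough picture of how the proxy class is generated. The call to sayHello is in fact a call to
this.handler.invoke(this, methods[1], var2);, which also explains the third line of the stack: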
3,com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
Now look at the InvokerInvocationHandler class:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker;
// the invoker is passed in through the constructor
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// if the method is declared by Object
if (method.getDeclaringClass() == Object.class) {
// invoke it reflectively on the invoker
return method.invoke(invoker, args);
}
// calls to these 3 special methods are handled locally
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// all other business methods go through invoker.invoke (*** look here ***)
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
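The same handler pattern can be tried without Dubbo by using the JDK's java.lang.reflect.Proxy. This is a sketch under assumptions, not Dubbo's implementation: Dubbo generates its proxy with javassist, and SimpleInvoker, ForwardingHandler and newProxy are hypothetical names introduced for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class HandlerDemo {
    interface DemoService { String sayHello(String name); }

    // Hypothetical stand-in for Dubbo's Invoker: receives the method name and arguments.
    interface SimpleInvoker { Object invoke(String methodName, Object[] args); }

    static class ForwardingHandler implements InvocationHandler {
        private final SimpleInvoker invoker;

        ForwardingHandler(SimpleInvoker invoker) { this.invoker = invoker; }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Methods declared by Object (toString, hashCode, equals) are answered by the invoker itself.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Every business method is forwarded, like invoker.invoke(new RpcInvocation(...)).
            return invoker.invoke(method.getName(), args);
        }
    }

    static DemoService newProxy(SimpleInvoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                new ForwardingHandler(invoker));
    }

    public static void main(String[] args) {
        DemoService service = newProxy((name, a) -> "Hello " + a[0] + " via " + name);
        System.out.println(service.sayHello("mydubbodebug")); // Hello mydubbodebug via sayHello
    }
}
```

The caller only sees DemoService; everything that happens after sayHello is decided by whatever invoker was handed to the handler, which is why the rest of this post is about how that invoker is built.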
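The invoker object here is passed in through the InvokerInvocationHandler constructor, and the InvokerInvocationHandler object is created by the
getProxy(Invoker<T> invoker, Class<?>[] interfaces) method of JavassistProxyFactory.
This takes us back to where proxyFactory.getProxy(invoker); is called, namely the createProxy(Map<String, String> map) method of ReferenceConfig.
The following part of that method builds the invoker: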
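if (urls.size() == 1) { // a single direct-connect address or a single registry address
// urls.get(0) may be a direct-connect address (dubbo protocol by default) or a registry address (zookeeper in this example).
// This example goes through the registry, so:
invoker = refprotocol.refer(interfaceClass, urls.get(0)); // this example configures a single registry (*** look here ***)
} else { // several direct-connect addresses, several registries, or a mix of both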
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// create an invoker and add it to invokers
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // with several registries, the last registry url is used
}
}
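if (registryURL != null) { // a URL with the registry protocol exists
// When several urls are given and at least one is a registry, the AvailableCluster strategy is hard-coded.
// This covers both a mix of direct-connect and registry urls and the all-registry case.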
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // several direct-connect urls
invoker = cluster.join(new StaticDirectory(invokers));
}
}
As shown, the invoker is built by
refprotocol.refer(interfaceClass, urls.get(0));
or
cluster.join(new StaticDirectory(u, invokers));
cluster.join(new StaticDirectory(invokers));
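One of these three statements is chosen according to the conditions above.
Here we analyze the first case.
Through the SPI mechanism, refprotocol is an instance of Protocol$Adpative, and its refer implementation is: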
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
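The code shows that the concrete Protocol implementation is resolved through SPI from the protocol value of the url.
For a direct-connect address the protocol is dubbo, so the call ends up in the refer method of DubboProtocol.
Its implementation is: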
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
For a registry, the protocol here is registry, so the refer method of RegistryProtocol is used:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// The registry parameter yields the concrete registry protocol, zookeeper here, which is set as the url's protocol.
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// Get the zookeeper Registry implementation, i.e. ZookeeperRegistryFactory, and call its getRegistry method
// to obtain a zookeeper-type registry object
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// cluster here is a Cluster$Adpative object
return doRefer(cluster, registry, type, url);
}
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
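// RegistryDirectory is the counterpart of StaticDirectory: the former fetches the url directory dynamically from the registry, the latter holds a statically specified list of urls.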
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// Subscribe to the registry to obtain provider addresses and related information
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// Calling join on Cluster$Adpative returns the Invoker object (*** look here ***)
return cluster.join(directory);
}
Here is the join method of Cluster$Adpative:
public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
// Get the cluster strategy from the cluster parameter; the default is failover.
// This example uses failover.
String extName = url.getParameter("cluster", "failover");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
// Through SPI this yields the FailoverCluster object
return extension.join(arg0);
}
Now the join method of FailoverCluster:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
// returns a FailoverClusterInvoker object
return new FailoverClusterInvoker<T>(directory);
}
Because the Cluster SPI implementations include a wrapper class, MockClusterWrapper (this is Dubbo's AOP mechanism), its join method is called first:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
And because FailoverClusterInvoker is a subclass of AbstractClusterInvoker, with its invoke method implemented in the parent class, we get the following stack lines:
at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:229)
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
These classes all belong to Dubbo's cluster fault tolerance; an earlier post, https://cloud.tencent.com/developer/article/1109572, introduces it.
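The wrapping relationship between MockClusterWrapper and FailoverCluster can be sketched as a plain decorator. The types below (Invoker, Cluster, FailoverLikeCluster, MockLikeWrapper) are simplified, hypothetical stand-ins; in Dubbo the wrapper is wired in by the SPI loader rather than by hand.

```java
import java.util.ArrayList;
import java.util.List;

class ClusterWrapperDemo {
    // Simplified stand-ins; the trace list records which invoker ran, in order.
    interface Invoker { String invoke(String request, List<String> trace); }
    interface Cluster { Invoker join(); }

    static class FailoverLikeCluster implements Cluster {
        public Invoker join() {
            return (request, trace) -> {
                trace.add("FailoverLikeInvoker");
                return "result:" + request;
            };
        }
    }

    // A wrapper takes the wrapped Cluster in its constructor and decorates what join returns.
    static class MockLikeWrapper implements Cluster {
        private final Cluster cluster;

        MockLikeWrapper(Cluster cluster) { this.cluster = cluster; }

        public Invoker join() {
            Invoker inner = cluster.join();
            return (request, trace) -> {
                trace.add("MockLikeInvoker");
                return inner.invoke(request, trace);
            };
        }
    }

    static List<String> run() {
        Cluster cluster = new MockLikeWrapper(new FailoverLikeCluster());
        List<String> trace = new ArrayList<>();
        cluster.join().invoke("sayHello", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [MockLikeInvoker, FailoverLikeInvoker]
    }
}
```

The outer invoker runs first and the inner one second, which matches the order of MockClusterInvoker and FailoverClusterInvoker in the stack when it is read from the bottom up.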
Next, the invoke method of AbstractClusterInvoker:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
LoadBalance loadbalance;
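// Calls the directory's list method, which returns the invokers to call.
// This is really AbstractDirectory's list method, which applies the routing rules (if any)
// to pick the matching providers out of all providers.
// Cluster fault tolerance and load balancing come only after that.
List<Invoker<T>> invokers = list(invocation); // obtains the invokers (**** look here ***)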
if (invokers != null && invokers.size() > 0) {
// If the url has no value for the key "loadbalance", fall back to the default random strategy
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
// use the default random strategy
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
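What doInvoke does with the invoker list and the load balancer can be illustrated with a much reduced sketch: pick a provider at random and, on failure, retry on another one. This is an assumption-laden simplification and not the FailoverClusterInvoker source; Invoker, select, invokeWithFailover and the retries parameter are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class FailoverDemo {
    // Hypothetical stand-in for an invoker bound to one provider.
    interface Invoker { String invoke(String request); }

    // Random load balancing: every candidate is equally likely.
    static Invoker select(List<Invoker> candidates, Random random) {
        return candidates.get(random.nextInt(candidates.size()));
    }

    // Failover: on failure, drop the failed provider and try another one.
    static String invokeWithFailover(List<Invoker> invokers, String request, int retries, Random random) {
        List<Invoker> remaining = new ArrayList<>(invokers);
        RuntimeException last = null;
        for (int attempt = 0; attempt <= retries && !remaining.isEmpty(); attempt++) {
            Invoker chosen = select(remaining, random);
            try {
                return chosen.invoke(request);
            } catch (RuntimeException e) {
                last = e;
                remaining.remove(chosen); // do not pick the same provider again
            }
        }
        throw new IllegalStateException("All attempts failed for " + request, last);
    }

    public static void main(String[] args) {
        List<Invoker> invokers = new ArrayList<>();
        invokers.add(request -> { throw new RuntimeException("provider down"); });
        invokers.add(request -> "ok:" + request);
        System.out.println(invokeWithFailover(invokers, "sayHello", 1, new Random())); // ok:sayHello
    }
}
```

With one failing and one healthy provider and a single retry, the call succeeds whichever provider the random selection tries first.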
The list method:
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
// directory.list(invocation) returns the invokers; directory here is a RegistryDirectory
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
Following into RegistryDirectory's list method, which is implemented in its parent class AbstractDirectory:
/***
* Applies the routing rules
* @param invocation
* @return
* @throws RpcException
*/
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
// get all providers
List<Invoker<T>> invokers = doList(invocation); // (*** look here ***)
// Local routing rules; they have already been set through setRouters. When that happens is examined later.
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
// The route method of the Router implementation selects the providers
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
// doList is an abstract method; RegistryDirectory provides the implementation:
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. no provider  2. providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
}
List<Invoker<T>> invokers = null;
// local reference. methodInvokerMap is assigned in the refreshInvoker method. (*** look here ***)
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // routing on an enumerated first argument
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
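The lookup order in doList (method plus first argument, then method name, then the wildcard, then any entry) is easy to isolate. The sketch below uses lists of strings in place of invoker lists; MethodLookupDemo and lookup are hypothetical names, and "*" stands for Constants.ANY_VALUE.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MethodLookupDemo {
    // Provider names stand in for invokers; the fallback order mirrors doList.
    static List<String> lookup(Map<String, List<String>> methodInvokerMap, String methodName, Object[] args) {
        List<String> invokers = null;
        if (methodInvokerMap != null && !methodInvokerMap.isEmpty()) {
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = methodInvokerMap.get(methodName + "." + args[0]); // 1. method + first argument
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get(methodName); // 2. method name
            }
            if (invokers == null) {
                invokers = methodInvokerMap.get("*"); // 3. wildcard entry
            }
            if (invokers == null) {
                invokers = methodInvokerMap.values().iterator().next(); // 4. any entry
            }
        }
        return invokers == null ? Collections.<String>emptyList() : invokers;
    }

    public static void main(String[] args) {
        Map<String, List<String>> map = new LinkedHashMap<>();
        map.put("sayHello", Arrays.asList("providerA", "providerB"));
        map.put("*", Arrays.asList("providerA", "providerB", "providerC"));
        System.out.println(lookup(map, "sayHello", new Object[]{"mydubbodebug"})); // [providerA, providerB]
        System.out.println(lookup(map, "other", new Object[0])); // [providerA, providerB, providerC]
    }
}
```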
Below is the refreshInvoker(List<URL> invokerUrls) method:
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // forbid access
this.methodInvokerMap = null; // clear the list
destroyAllInvokers(); // close all invokers
} else {
this.forbidden = false; // allow access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls); // cache the invokerUrls list for later comparison
}
if (invokerUrls.size() == 0) {
return;
}
// toInvokers creates the invokers (*** look here ****)
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // convert the URL list into invokers; they are created here
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // map method names to invoker lists
// state change
// If the conversion failed, do nothing.
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
return;
}
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // close unused invokers
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
Note that refreshInvoker() is called from the notify(List<URL> urls) method of RegistryDirectory, which is also the callback for registry subscriptions. Next, the toInvokers method:
/**
* Converts urls to invokers; a url that has already been referred is not referred again.
*
* @param urls
* @param overrides
* @param query
* @return invokers
*/
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.size() == 0) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If the reference side configured protocol, only matching protocols are selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // URL parameters are sorted
if (keys.contains(key)) { // duplicate URL
continue;
}
keys.add(key);
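// The cache key is the URL without merged consumer-side parameters; however the consumer merges parameters, a change in the provider URL triggers a new refer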
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// this is where the invoker is created (*** look here ***)
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // put the new reference into the cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
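The caching behaviour of toInvokers, where an invoker is reused when its url key is already known and created otherwise, can be shown in isolation. This is a sketch under assumptions: FakeInvoker and the string urls are hypothetical, and the protocol filtering and enabled checks of the real method are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class InvokerReuseDemo {
    // Hypothetical stand-in for an invoker bound to one provider url.
    static class FakeInvoker {
        final String url;
        FakeInvoker(String url) { this.url = url; }
    }

    // Builds the new url -> invoker map, reusing entries from the previous map where possible.
    static Map<String, FakeInvoker> toInvokers(List<String> urls, Map<String, FakeInvoker> oldMap) {
        Map<String, FakeInvoker> newMap = new HashMap<>();
        Set<String> seen = new HashSet<>();
        for (String url : urls) {
            if (!seen.add(url)) {
                continue; // duplicate url
            }
            FakeInvoker invoker = oldMap == null ? null : oldMap.get(url);
            if (invoker == null) {
                invoker = new FakeInvoker(url); // not cached: "refer" again
            }
            newMap.put(url, invoker);
        }
        return newMap;
    }

    public static void main(String[] args) {
        // The urls are illustrative values only.
        Map<String, FakeInvoker> first = toInvokers(Arrays.asList("dubbo://a:20880", "dubbo://b:20880"), null);
        Map<String, FakeInvoker> second = toInvokers(Arrays.asList("dubbo://b:20880", "dubbo://c:20880"), first);
        System.out.println(second.get("dubbo://b:20880") == first.get("dubbo://b:20880")); // true: reused
        System.out.println(second.containsKey("dubbo://a:20880")); // false: no longer notified
    }
}
```

Entries that exist only in the old map are the ones destroyUnusedInvokers closes in refreshInvoker.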
At last we have found where the invoker is created. First look at InvokerDelegete, an inner class of RegistryDirectory:
/**
* Delegate class, mainly used to store the url address sent down by the registry,
* so that a later re-refer can reassemble it from providerURL, queryMap and overrideMap
*
* @param <T>
* @author chao.liuc
*/
private static class InvokerDelegete<T> extends InvokerWrapper<T> {
private URL providerUrl;
public InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl) {
// call the parent constructor
super(invoker, url);
this.providerUrl = providerUrl;
}
public URL getProviderUrl() {
return providerUrl;
}
}
The invoke method is implemented in its parent class InvokerWrapper:
public Result invoke(Invocation invocation) throws RpcException {
// the invoker here was passed in through the constructor
return invoker.invoke(invocation);
}
That is why the stack contains this line:
at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
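The InvokerDelegete constructor calls the parent InvokerWrapper constructor and passes in the invoker. Looking back at the statement
new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
we can see that the invoker above is created by protocol.refer(serviceType, url).
Debugging shows that protocol here is of type Protocol$Adpative.
The protocol of this url is dubbo, so through SPI the call finally reaches the refer method of DubboProtocol.
However, the Protocol interface has two wrapper classes among its implementations: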
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
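So the refer method of ProtocolFilterWrapper runs first, then the refer method of ProtocolListenerWrapper,
and only then the refer method of DubboProtocol.
The refer method of ProtocolFilterWrapper builds the invoker through buildInvokerChain, shown below: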
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
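// First get the activated filters. Here monitor (MonitorFilter) was configured by hand;
// the other two, FutureFilter and ConsumerContextFilter, are activated automatically.
// See the getActivateExtension code of the SPI mechanism for details.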
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
// implement the invoke method of Invoker
public Result invoke(Invocation invocation) throws RpcException {
// nest the call into the filter chain
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
This produces the following stack lines:
at com.alibaba.dubbo.monitor.support.MonitorFilter.invoke(MonitorFilter.java:65)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:54)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:48)
at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:69)
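To see why the frames appear in this order, here is a minimal sketch of the same reverse-iteration technique used by buildInvokerChain. The Invoker and Filter interfaces below are simplified, hypothetical versions that only record names in a list; the filter names and their order are assumptions taken from the stack trace above, read from bottom to top.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    /** Simplified stand-ins for Dubbo's Invoker and Filter. */
    interface Invoker {
        String invoke(List<String> trace);
    }

    interface Filter {
        String invoke(Invoker next, List<String> trace);
    }

    /** A filter that records its name and passes the call on. */
    static Filter named(String name) {
        return (next, trace) -> {
            trace.add(name);
            return next.invoke(trace);
        };
    }

    /** Walks the list backwards so that filters.get(0) ends up outermost. */
    static Invoker buildChain(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = trace -> filter.invoke(next, trace);
        }
        return last;
    }

    static List<String> callOrder() {
        Invoker target = trace -> {
            trace.add("target");
            return "ok";
        };
        Invoker chain = buildChain(target,
                Arrays.asList(named("consumerContext"), named("future"), named("monitor")));
        List<String> trace = new ArrayList<>();
        chain.invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(callOrder());
    }
}
```

Because the loop runs from the last filter to the first, the first filter in the list is the first one called, and the real invoker is reached only after every filter has run.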
Next comes the refer method of ProtocolListenerWrapper:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
//
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
//Get the activated listeners. Dubbo currently ships only one implementation, DeprecatedInvokerListener, and it is deprecated,
//so the list is empty here.
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(InvokerListener.class).getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
This explains the following stack frame:
at com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper.invoke(ListenerInvokerWrapper.java:74)
Finally, look at the refer method of DubboProtocol, which creates the DubboInvoker object:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
AbstractInvoker, the parent class of DubboInvoker, implements the invoke method:
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!");
}
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
//doInvoke is implemented in the subclass
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
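The split between invoke and doInvoke is the template method pattern. The sketch below shows the idea under simplifying assumptions: Result, FrameworkException and BaseInvoker are hypothetical names, and the exception handling is reduced to two cases (framework failures are rethrown, everything else is carried inside the result).

```java
class TemplateInvokeSketch {
    /** Hypothetical result holder: carries either a value or an exception. */
    static final class Result {
        final Object value;
        final Throwable error;

        Result(Object value, Throwable error) {
            this.value = value;
            this.error = error;
        }
    }

    /** Hypothetical stand-in for a non-business RpcException. */
    static class FrameworkException extends RuntimeException {
        FrameworkException(String message) {
            super(message);
        }
    }

    abstract static class BaseInvoker {
        /** Fixed skeleton: shared handling lives here, the call itself in doInvoke. */
        final Result invoke(String arg) {
            try {
                return new Result(doInvoke(arg), null);
            } catch (FrameworkException e) {
                throw e; // framework failures propagate to the caller
            } catch (Throwable e) {
                return new Result(null, e); // other failures travel inside the result
            }
        }

        /** Subclasses supply the actual call. */
        protected abstract Object doInvoke(String arg) throws Throwable;
    }

    static Result callOk() {
        return new BaseInvoker() {
            protected Object doInvoke(String arg) {
                return "Hello " + arg;
            }
        }.invoke("dubbo");
    }

    static Result callBusinessFailure() {
        return new BaseInvoker() {
            protected Object doInvoke(String arg) {
                throw new IllegalStateException("biz");
            }
        }.invoke("dubbo");
    }

    public static void main(String[] args) {
        System.out.println(callOk().value);
        System.out.println(callBusinessFailure().error);
    }
}
```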
DubboInvoker implements the doInvoke method:
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
//the actual request; currentClient here is clients[0], a field of this object (***look here***)
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
So the call stack contains these two frames:
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:97)
at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:144)
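The client selection at the top of doInvoke is a plain round-robin over the clients array. A minimal sketch of that selection follows; the class name and the use of strings as clients are hypothetical. Note one deliberate difference from the listing: the sketch uses Math.floorMod instead of %, which is an assumption on my part to keep the index non-negative if the counter ever overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinSketch {
    private final String[] clients;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinSketch(String... clients) {
        this.clients = clients;
    }

    /** Returns the only client, or rotates through the array when there are several. */
    String next() {
        if (clients.length == 1) {
            return clients[0];
        }
        // floorMod keeps the result in [0, length) even for negative counter values
        return clients[Math.floorMod(index.getAndIncrement(), clients.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch("c0", "c1", "c2");
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.next());
        }
    }
}
```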
Next we need to look at how currentClient, the object that sends the request, is implemented. It can be traced back to the refer method of DubboProtocol:
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
//getClients(url) creates the clients field of DubboInvoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
The getClients method:
private ExchangeClient[] getClients(URL url) {
//whether to share the connection
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//if connections is not configured, share one connection; otherwise each service gets its own connections
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
//get the shared connection
clients[i] = getSharedClient(url);
} else {
//initialize the client; this example does not use a shared connection, so it takes this branch (****look here**)
clients[i] = initClient(url);
}
}
return clients;
}
/**
* Create a new connection.
*/
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//heartbeat is enabled by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO has serious performance problems and is not allowed for now
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
//the connection is configured to be lazy
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//(****look here**)
//The client is built by Exchangers.connect(url, requestHandler); we follow Exchangers.connect next.
//A requestHandler is passed in here; the client uses it as the callback for handling what the server returns.
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
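The shared versus dedicated decision in getClients can be sketched as follows. This is a simplified illustration, not Dubbo's code: Client is a hypothetical class, the shared client is cached per address in a plain map, and reference counting of shared clients is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ClientPoolSketch {
    /** Hypothetical client; a real one would own a network connection. */
    static final class Client {
        final String address;

        Client(String address) {
            this.address = address;
        }
    }

    private final Map<String, Client> sharedByAddress = new ConcurrentHashMap<>();

    /** connections == 0 means "not configured": reuse one shared client per address. */
    Client[] getClients(String address, int connections) {
        boolean share = connections == 0;
        Client[] clients = new Client[share ? 1 : connections];
        for (int i = 0; i < clients.length; i++) {
            clients[i] = share
                    ? sharedByAddress.computeIfAbsent(address, Client::new)
                    : new Client(address);
        }
        return clients;
    }

    public static void main(String[] args) {
        ClientPoolSketch pool = new ClientPoolSketch();
        Client first = pool.getClients("10.0.0.1:20880", 0)[0];
        Client second = pool.getClients("10.0.0.1:20880", 0)[0];
        System.out.println(first == second);
        System.out.println(pool.getClients("10.0.0.1:20880", 2).length);
    }
}
```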
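The facade design pattern is used here. Exchangers is a facade class that hides the work of finding a suitable Exchanger implementation and calling its connect method to return an ExchangeClient. The relevant code is as follows: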
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//the line above sets the codec key to exchange if it is absent
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
//look up the Exchanger SPI implementation by the exchanger key; the default is header, i.e. the HeaderExchanger class
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
//returns HeaderExchanger, the header extension of the Exchanger interface
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
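The facade idea can be reduced to a few lines: validate the input, pick an implementation by name with a default, and delegate. In the sketch below a plain map stands in for ExtensionLoader, and the Exchanger interface, the "exchanger" parameter name and the returned strings are simplified assumptions for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ExchangerFacadeSketch {
    /** Hypothetical one-method exchanger; returns a description instead of a client. */
    interface Exchanger {
        String connect(String address);
    }

    /** A plain map plays the role of the SPI extension registry. */
    private static final Map<String, Exchanger> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("header", address -> "header-client:" + address);
    }

    /** Facade entry point: callers never choose the implementation themselves. */
    static String connect(Map<String, String> params, String address) {
        if (address == null) {
            throw new IllegalArgumentException("address == null");
        }
        String type = params.getOrDefault("exchanger", "header");
        Exchanger exchanger = EXTENSIONS.get(type);
        if (exchanger == null) {
            throw new IllegalStateException("No such extension: " + type);
        }
        return exchanger.connect(address);
    }

    public static void main(String[] args) {
        Map<String, String> noParams = Collections.emptyMap();
        System.out.println(connect(noParams, "10.0.0.1:20880"));
    }
}
```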
Now the connect method of HeaderExchanger:
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
/**
* Client-side connect operation
* @param url
* @param handler
* @return
* @throws RemotingException
*/
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
//returns a HeaderExchangeClient object
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
}
Hence the stack frame:
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeClient.request(HeaderExchangeClient.java:88)
Now look at the HeaderExchangeClient.request method:
public ResponseFuture request(Object request) throws RemotingException {
//channel is assigned in the class constructor: this.channel = new HeaderExchangeChannel(client);
return channel.request(request);
}
Hence the stack frame:
at com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel.request(HeaderExchangeChannel.java:112)
Continue into the request method of HeaderExchangeChannel:
public ResponseFuture request(Object request) throws RemotingException {
return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT));
}
//the overloaded method:
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
//send the request through the underlying channel
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
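The request method creates a future before sending, and the caller later blocks on it. The general idea behind this, matching a response to its pending request by id, can be sketched with standard library classes. This is an illustration of the technique under my own assumptions (CompletableFuture, a map keyed by request id, string payloads), not the DefaultFuture implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequestsSketch {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Every outgoing request gets a unique id that the response will echo back. */
    long newRequestId() {
        return ids.getAndIncrement();
    }

    /** Registers a future for the request before it is written to the channel. */
    CompletableFuture<String> register(long id) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        return future;
    }

    /** Called when a response arrives; wakes up whoever waits on that id. */
    void received(long id, String payload) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(payload);
        }
    }

    /** Simulates one synchronous call: register, "receive", then wait with a timeout. */
    static String roundTrip() {
        PendingRequestsSketch channel = new PendingRequestsSketch();
        long id = channel.newRequestId();
        CompletableFuture<String> future = channel.register(id);
        channel.received(id, "Hello"); // pretend the response came back from the network
        try {
            return future.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("request " + id + " failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

In the synchronous branch of doInvoke, the call to get() on the returned future plays the same role as future.get(...) here.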
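This involves another channel object. It is also passed in from the caller, through the constructor of HeaderExchangeChannel.
HeaderExchangeChannel is constructed by HeaderExchangeClient,
and the HeaderExchangeClient object is created in the connect method of HeaderExchanger.
So we go back to the connect method of HeaderExchanger: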
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
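So the channel object that HeaderExchangeChannel uses to send the message is created by the expression
Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
Transporters is also a facade class, another use of the facade design pattern. Its connect method is implemented as follows: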
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
//This ends up in NettyTransporter.connect:
// public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// return new NettyClient(url, listener);
// }
//so a NettyClient is returned by default
return getTransporter().connect(url, handler);
}
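//through SPI this method resolves to the NettyTransporter extension
public static Transporter getTransporter() {
//returns the generated Transporter$Adaptive class, which picks the concrete Transporter from the url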
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
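The first half of Transporters.connect folds a varargs list of handlers into a single handler: a no-op for none, the handler itself for one, and a dispatcher for several. A minimal sketch of that folding, using a hypothetical Handler interface that appends to a log list:

```java
import java.util.ArrayList;
import java.util.List;

class HandlerFoldSketch {
    /** Hypothetical handler; a real one would react to channel events. */
    interface Handler {
        void received(String message, List<String> log);
    }

    /** Reduces zero, one or many handlers to exactly one. */
    static Handler fold(Handler... handlers) {
        if (handlers == null || handlers.length == 0) {
            return (message, log) -> { }; // nothing registered: do nothing
        }
        if (handlers.length == 1) {
            return handlers[0]; // no wrapping needed
        }
        // several handlers: fan each message out to all of them, in order
        return (message, log) -> {
            for (Handler handler : handlers) {
                handler.received(message, log);
            }
        };
    }

    public static void main(String[] args) {
        Handler a = (message, log) -> log.add("a:" + message);
        Handler b = (message, log) -> log.add("b:" + message);
        List<String> log = new ArrayList<>();
        fold(a, b).received("ping", log);
        System.out.println(log);
    }
}
```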
So the request is finally sent through the send method of the NettyClient instance.
The send method of NettyClient is implemented in its ancestor class AbstractPeer:
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
}
That implementation in turn calls the send method of AbstractClient, the parent class of NettyClient:
public void send(Object message, boolean sent) throws RemotingException {
if (send_reconnect && !isConnected()) {
connect();
}
//get the concrete channel instance
Channel channel = getChannel();
//TODO whether getChannel may return null needs improvement
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
}
channel.send(message, sent);
}
The getChannel() method is implemented by NettyClient itself:
protected com.alibaba.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected())
return null;
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
//then into the NettyChannel.getOrAddChannel method
static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
//returns a NettyChannel
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nc = new NettyChannel(ch, url, handler);
if (ch.isConnected()) {
ret = channelMap.putIfAbsent(ch, nc);
}
if (ret == null) {
ret = nc;
}
}
return ret;
}
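getOrAddChannel is a get-or-create cache built on putIfAbsent, so that two threads racing on the same raw channel still end up with one wrapper. A minimal sketch of the pattern, with a hypothetical Wrapper class and without the isConnected check from the listing:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ChannelCacheSketch {
    /** Hypothetical wrapper around some raw channel object. */
    static final class Wrapper {
        final Object raw;

        Wrapper(Object raw) {
            this.raw = raw;
        }
    }

    private static final ConcurrentMap<Object, Wrapper> CACHE = new ConcurrentHashMap<>();

    /** Returns the cached wrapper for raw, creating and caching it on first use. */
    static Wrapper getOrAdd(Object raw) {
        if (raw == null) {
            return null;
        }
        Wrapper ret = CACHE.get(raw);
        if (ret == null) {
            Wrapper created = new Wrapper(raw);
            // putIfAbsent returns the existing value if another thread won the race
            ret = CACHE.putIfAbsent(raw, created);
            if (ret == null) {
                ret = created;
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        Object raw = new Object();
        System.out.println(getOrAdd(raw) == getOrAdd(raw));
    }
}
```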
Hence the following stack frames:
at com.alibaba.dubbo.remoting.transport.netty.NettyChannel.send(NettyChannel.java:98)
at com.alibaba.dubbo.remoting.transport.AbstractClient.send(AbstractClient.java:258)
at com.alibaba.dubbo.remoting.transport.AbstractPeer.send(AbstractPeer.java:54)
After that come the internal calls and message conversion inside Netty (the org.jboss.netty packages):
at org.jboss.netty.channel.SimpleChannelHandler.handleDownstream(SimpleChannelHandler.java:266)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
at org.jboss.netty.channel.Channels.write(Channels.java:611)
at org.jboss.netty.channel.Channels.write(Channels.java:578)
at org.jboss.netty.channel.AbstractChannel.write(AbstractChannel.java:251)
Finally we arrive at the writeRequested method of NettyHandler, where we set the breakpoint at the start:
at com.alibaba.dubbo.remoting.transport.netty.NettyHandler.writeRequested(NettyHandler.java:99)