dubbo服务调用进程浅析,源码学习

作者: 编程应用  发布:2019-10-05

笔记简述本学习笔记接上篇Dubbo 服务调用 源码,上一篇已经成功了invoker的生成,接下去便是现实的法子调用了,包罗了mock测试、负载均衡、重试、netty调用、以及尾声的结果等待和过期检查测量检验等多少个步骤,依次操作,达成远程央浼并获取结果的全经过操作。越多内容可看[目录]Dubbo 源码学习

姣好了劳务的最初化之后,生成了代办,生成的动态代理类对接口的秘技开展了打包,每趟调用都会调用到InvocationHandler的invoke()方法,此方法中会进行远程服务调用一些列复杂进度,诸如互联网通讯,编码,解码,体系化等,然后将结果回到。在InvokerInvocationHandler.invoker()方法中,末了调用invoker.invoke(new 福睿斯pcInvocation(method, args)),首先invoker为组织InvokerInvocationHandler传入的,具体品种为MockClusterInvoker,然后调用MockClusterInvoker.invoke-->FailfastClusterInvoker.doInvoke()

目录

public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")){//没有mock
            //no mock
            result = this.invoker.invoke(invocation);
   ..................................................................
}

Dubbo 服务调用 源码1、InvokerInvocationHandler 入口2、MockClusterInvoker mock入口3、AbstractClusterInvoker 负载均衡4、FailoverClusterInvoker 重试机制5、DubboInvoker invoke6、NettyChannel netty乞请7、Future 结果处理 & 超时检验

下一场调用到FailfastClusterInvoker.invoke(),此指标具备接口服务的RegistryDirectory,里面含有远程提供者的切实可行音信,是在劳务花费者开头化时,通过订阅zk相应节点获得的。此格局的调用进度相比较复杂,会调用负载均衡算法,依据早晚的政策,选用一个提供者,生成DubboInvoker对象。调用进程:AbstractClusterInvoker.invoke()--> FailfastClusterInvoker.doInvoke()

基于动态代理的认知,最终反射推行的主意确定到InvokerInvocationHandler类的invoke方法中

public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance;//加载loadbalance
        List<Invoker<T>> invokers = list(invocation);//根据method加载invokers
        if (invokers != null && invokers.size() > 0) {//默认采用随机策略的loadbalance
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        /** invocation:方法,参数类型,参数
         *  invokers:此接口提供端的Invoker;
         *  loadbalance:均衡负载
         */
        return doInvoke(invocation, invokers, loadbalance);
    }

1、InvokerInvocationHandler 入口

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // 获取方法的名称以及方法的参数信息 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals; } // 此时invoker是MockClusterInvoker // 还拼接生成了一个RpcInvocation return invoker.invoke(new RpcInvocation(method, args)).recreate();}

赶来了MockClusterInvoker类,此时内需专一到MockClusterInvoker类的invoke是FailoverClusterInvokerFailoverClusterInvoker类能够拓宽重试操作,只要有影像的能够领会在一个reference的xml配置中,能够加上海重型机器厂试次数retries属性字段的值,默许是3次,固然设置了小于0的数字,则为1次,重试次数0位的意味就是只实行一遍操作

下边看什么加载invokers的,步入到 list(invocation)方法,调用进程:AbstractDirectory.list()-->RegistryDirectory.doList()

2、MockClusterInvoker mock入口

public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 注册中心,服务提供方、服务调用方的信息都存储在directory中,后期均衡负责也是处理这里面的数据 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString; // 就是查看 `methodName.mock`或者`mock`的属性值,默认是“false” if (value.length() == 0 || value.equalsIgnoreCase{ // 不需要走Mock测试,进入到FailoverClusterInvoker中 result = this.invoker.invoke(invocation); } else if (value.startsWith { if (logger.isWarnEnabled { logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl; } //force:direct mock result = doMockInvoke(invocation, null); } else { //fail-mock try { result = this.invoker.invoke(invocation); }catch (RpcException e) { if ) { throw e; } else { if (logger.isWarnEnabled { logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl; } result = doMockInvoke(invocation, e); } } } return result;}
 public List<Invoker<T>> doList(Invocation invocation) {
         ........................................................
        List<Invoker<T>> invokers = null;
        // 本地method-->lnvokers对应关系
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; 
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
             //获取方法名称和参数
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if(args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
            }
            if(invokers == null) {//根据方法获取
                invokers = localMethodInvokerMap.get(methodName);
            }
            if(invokers == null) {//获取*匹配的
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if(invokers == null) {//获取所有
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }

3、AbstractClusterInvoker 负载均衡

跻身到FailoverClusterInvoker类从前先步向到AbstractClusterInvoker类中

public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); // 筛选出合适的invokers列表,基于方法和路由信息 // 其中路由信息则是通过MockInvokersSelector类处理获取到invocation中attachments保存的mock信息去筛选合适的invoker,所以重点是筛选 // 调试发现,一般情况下在这里面attachments字段并没有数据 if (invokers != null && invokers.size { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get.getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 创建合适的均衡负责类loanbalance信息,一般情况是RandomLoadBalance类 RpcUtils.attachInvocationIdIfAsync, invocation); // 确保幂等,如果是异步则需要往attachment参数中添加自增ID(这个自增ID是AtomicLong类,线程安全) // 这里就有往invocation的attachment填充数据的操作 return doInvoke(invocation, invokers, loadbalance); // 现在进入到FailoverClusterInvoker类中了}

上边回到AbstractDirectory.list(),在那个法子中进行路由准绳的合作,相配准则,路由法则也是spi机制的,有脚本、文件、表达式,日常经过劳动治理基本布局,与接口服务关系,具体参见dubbo官方文档。路由优秀成功未来,获取到provider端的全部可用的Invokers,回到AbstractClusterInvoker.invoke()方法,最后回到provider端可用的Invokers,然后调用FailfastClusterInvoker.doInvoke():

4、FailoverClusterInvoker 重试机制

上边已经说了,那一个类的主要功用是重试操作

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { // 第一个是执行的参数信息,包含了函数名等信息 // 第二个是被调用执行的参数,包含了服务提供方的IP:PORT信息 // 第三个是均衡负责,在选择调用的服务方时,会根据该对象选择一个合适的服务方 List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); // 检测invokers是否存在,如果不存在则提示没有可用的服务提供方被使用,请检查服务提供方是否被注册 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; // 获取重试的次数,如果设置的值<=0,则只有1次操作机会,默认是3次 if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size; // invoked invokers. Set<String> providers = new HashSet<String>; for (int i = 0; i < len; i++) { //重试时,进行重新选择,避免重试时invoker列表已发生变化. //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 if  { checkWheatherDestoried(); copyinvokers = list(invocation); //重新检查一下 // 注意一下这个list操作,这个list操作是重新更新可用的invoker列表 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); // 选择合适的服务提供方的invoker,在AbstractClusterInvoker类中去完成均衡负责的选择操作 // 关于均衡负责,后面考虑分为一篇笔记去学习几种不同的负载方法,其中还包含了sticky 粘性连接 invoked.add; RpcContext.getContext().setInvokersinvoked); try { Result result = invoker.invoke(invocation); // 这一步才是真正的执行调用远程方法的开始&入口 if (le != null && logger.isWarnEnabled { // 存在重试了3次才终于成功的情况,这时候会告警提醒之前存在的错误信息输出 logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage; } return result; } catch (RpcException e) { // 遇到了RPCEXCEPTION 而且是biz类型的,则不重试直接抛出该异常 if ) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage; } finally { providers.add(invoker.getUrl().getAddress; } } // 重试多次依旧没有正常的结果返回,则抛出该异常 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage, le != null && le.getCause() != null ? le.getCause;}
 public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);//检查invokers的可用性
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
           .........................
        }
    }

5、DubboInvoker invoke

上边说的Result result = invoker.invoke(invocation);,经过层层转发,来到了FutureFilter类

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 看是否为异步的方法,添加有sync字段信息 // 先从invocation的attachment中查看是否存在async字段,再看看url中的methodName.async ,再看看url的async属性 fireInvokeCallback(invoker, invocation); Result result = invoker.invoke(invocation); // 进一步invoke操作 if  { asyncCallback(invoker, invocation); } else { syncCallback(invoker, invocation, result); } return result;}

赶到了MonitorFilter过滤器查看是还是不是供给打开监察(通过查阅url是或不是留存monitor字段,假如为true,则是索要监察和控制)

再来到了AbstractInvoker类的invoke方法,自个儿是DubboInvoker

public Result invoke(Invocation inv) throws RpcException { if(destroyed) { throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is DESTROYED, can not be invoked any more!"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker; if (attachment != null && attachment.size { invocation.addAttachmentsIfAbsent(attachment); // 添加attachment信息,调试中发现添加的是interface和token } Map<String, String> context = RpcContext.getContext().getAttachments(); // 这个是利用了ThreadLocal持有的数据中获取 if (context != null) { // 这代码写的冗余了,而且为啥不再加个empty的检测呢? invocation.addAttachmentsIfAbsent; } if .getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){ invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString; // 如果是异步的方法,添加async字段, } RpcUtils.attachInvocationIdIfAsync, invocation); // 如果是异步则添加自增ID try { return doInvoke(invocation); // 进入到DubboInvoker执行invoke操作了 } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return new RpcResult; } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult; } } catch (RpcException e) { if ) { return new RpcResult; } else { throw e; } } catch (Throwable e) { return new RpcResult; }}

DubboInvoker 类

protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath; inv.setAttachment(Constants.VERSION_KEY, version); // 添加路径和版本概念,如果没有添加则是0.0.0 ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } // currentClient 是 后续需要连接netty操作的客户端 try { boolean isAsync = RpcUtils.isAsync, invocation); // 是否为异步操作。。。。为啥确认个异步操作这么多重复操作 boolean isOneway = RpcUtils.isOneway, invocation); // 是否设置了return=false 这个操作 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); // 超时设置的时间,默认为1s if  { // 如果强制设置了return=false,异步的future都不需要设置了,也不需要关注超时字段 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture; return new RpcResult(); } else if  { ResponseFuture future = currentClient.request(inv, timeout) ; // 调用的是request方法,异步的设置future RpcContext.getContext().setFuture(new FutureAdapter<Object>; return new RpcResult(); } else { RpcContext.getContext().setFuture; // 同步方法,设置超时时间,等待返回 // 其实也是异步方法,只是最后调用了get去获取future的结果 return  currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage; } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage; }}

select(loadbalance, invocation, invokers, null)会依赖loadbalance举办负荷均衡算法相称,此处为随意挑选:
com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance

6、NettyChannel netty请求

上述的request以及send方法,都被转接到HeaderExchangeChannel类中,这么些类有贰个不行关键的字段是channel,是NettyClient类,富含了劳务提供方的IP:PORT音信

实际上稳重看request方法和send方法最终的兑现差不太多,只是request须求检查评定三番五次的channel是不是存在,而send单独自己是没有须求开展那几个操作的。

public ResponseFuture request(Object request, int timeout) throws RemotingException { if  { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion; req.setTwoWay; req.setData; // 生成的req有个线程安全的自增的ID,可以通过这个统计出调用的次数 DefaultFuture future = new DefaultFuture(channel, req, timeout); try{ channel.send; // 进入到NettyChannel类中 }catch (RemotingException e) { future.cancel(); throw e; } return future; // 返回future,后续的超时就是通过对future操作}

NettyChannel 类

public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { ChannelFuture future = channel.write; // 这个就是调用的netty的write操作完成数据发送操作 // 这个就是经过层层嵌套包装向外发送数据的最终操作 if  { // url配置的send字段属性,如果为true // 则通过await等待超时的世界去查看请求是否成功 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await; } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage; // 抛出远程发送消息失败的错误,打印出发送参数以及远程IP } if(! success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); }}
 protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都一样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算所有权重是否一样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 并确定随机值落在哪个片断上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }

7、Future 结果管理 & 超时检查评定

会见异步得到结果,判别是不是过期等检查实验操作

DefaultFuture 类

public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (! isDone { // 这个时候还是异步执行的,会立即执行到这里(时间非常的端,相比RPC的几百毫秒而言) long start = System.currentTimeMillis(); lock.lock(); try { while (! isDone { // 时刻观察是否拿到response done.await(timeout, TimeUnit.MILLISECONDS); if  || System.currentTimeMillis() - start > timeout) { // 如果拿到结果或者超时了,跳出循环 break; } } } catch (InterruptedException e) { throw new RuntimeException; } finally { lock.unlock(); } if (! isDone { // 这个时候还没拿到结果,肯定是认为超时了,抛出TimeoutException throw new TimeoutException(sent > 0, channel, getTimeoutMessage; } } return returnFromResponse();}private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { // 拿到的结果是无效的 throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { // 这才是真的调用成功,返回数据了 return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 客户端超时或者服务端超时,抛出TimeoutException throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage; } // 其他的就抛出RemotingException异常,并从res获取错误原因 throw new RemotingException(channel, res.getErrorMessage;}

迄今截至整个的远程调用就全部甘休了

回到FailfastClusterInvoker.doInvoke(),依据负荷均衡算法重临贰个Invoker,为dubboInvoker,上面进行DubboInvoke.invoke()的调用

  protected Result doInvoke(final Invocation invocation) throws Throwable {
        //RpcInvocation封装了调用方法的对象和参数类型,具体参数
        RpcInvocation inv = (RpcInvocation) invocation;
         //获取具体的方法名称
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        //调用客户端,在refer中已经初始化好了,为HeaderExchangeClient
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {//没有返回参数
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {//异步
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {//同步返回
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

上边剖析常用的同步重回,踏向HeaderExchangeClient.request()方法:

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
        return channel.request(request, timeout);
    }

上边方法的channel为HeaderExchangeChannel,在HeaderExchangeClient开首化的时候一向最初化的,里面封装了NettyClient:
com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel

 public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);//request为RpcInvocation对象
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

地方方法中channel为NettyClient,步入NettyClient.send()-->NettyChannel.send(),channel是netty的贰个零部件,担当顾客端与服务端之间的链路传递,调用Netty框架的IO事件现在会触发Netty框架的IO事件处理链。
com.alibaba.dubbo.remoting.transport.netty.NettyChannel:

public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        boolean success = true;
        int timeout = 0;
        try {
            //调用netty channel写入请求 
            ChannelFuture future = channel.write(message);
            if (sent) {//是否等待请求消息发出,默认不等待
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout);
            }
            Throwable cause = future.getCause();
            if (cause != null) {// 等待消息发出,消息发送失败将抛出异常
                throw cause;
            }
        } catch (Throwable e) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
        }

        if(! success) {
            throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                    + "in timeout(" + timeout + "ms) limit");
        }
    }

下边来深入分析费用端和提供端互连网诉求的局地,利用netty的encoder和decoder,来写入公约内容和乞请数据,这里提到到类别化学工业机械制,dubbo暗中认可使用hessian系列化,进度为:开支端将诉求编码-->发送-->提供端将须求解码-->响应诉求-->提供端响应编码-->发送-->费用端响应解码-->结果回到

1) 花费端诉求编码

花费端初步化nettyclient时,添加了事件管理链:NettyCodecAdapter.decoder-->NettyCodecAdapter.encoder-->NettyHandler,NettyCodecAdapter.decoder为上行事件管理器,NettyCodecAdapter.encoder为下行事件管理器,NettyHandler为上下行事件管理器,chanel.write()是贰个下水事件,NettyCodecAdapter.decoder和NettyHandler将会被调用,调用顺序为NettyHandler-->NettyCodecAdapter.decoder
调用NettyHandler中的writeRequested方法:(从注释能够见见)

   /**
     * Invoked when a message object (e.g: {@link ChannelBuffer}) was received
     * from a remote peer.
     */
  @Override
    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        super.writeRequested(ctx, e);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.sent(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

handler.sent()调用链路为:NettyClient.send()-->AbstractPeer.send()

  public void sent(Channel ch, Object msg) throws RemotingException {
        if (closed) {
            return;
        }
        handler.sent(ch, msg);
    }

此handler为DubboProtocol一路传过来的,包装了几层

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

调用链路为:DecodeHandler.send()-->HeaderExchangeHandler.send()-->ExchangeHandlerAdapter.send(),最终调到ChannelHandlerAdapter.send()未有其余操作,NettyHandler就调用完成了。下边看核心encoder的拍卖,调用方法为:InternalEncoder.encode()

protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
            return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
        }

codec是NettyCodecAdapter开端化时传过来的,类型为DubboCountCodec,DubboCountCodec.encode()会调用到DubboCodec.encode()

 public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

紧接着调用到encodeRequest(),下边包车型地铁议程是根据dubbo协议举行编码,dubbo左券头,长度为拾九个字节,共129个人,的剧情如下:

图片 1

协议头.PNG

0-15:魔数
16-20:序列化ID
21:是还是不是事件数量
22:需求响应标记
23:标记是伸手依旧响应
24-31:状态(req为空)
32-95:请求id
96-127:实体数据长度

     protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        Serialization serialization = getSerialization(channel);
        // header.
        byte[] header = new byte[HEADER_LENGTH];//16个字节的消息头
        // set magic number.
        Bytes.short2bytes(MAGIC, header);//前两个字节魔数

        // 第三个字节标识序列化id、请求信息等,对应16-23位
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

        if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;//
        if (req.isEvent()) header[2] |= FLAG_EVENT;

        //32-95 请求id
        Bytes.long2bytes(req.getId(), header, 4);

        // encode request data.
        int savedWriteIndex = buffer.writerIndex();
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {//序列化数据
            encodeEventData(channel, out, req.getData());
        } else {
            encodeRequestData(channel, out, req.getData());
        }
        out.flushBuffer();
        bos.flush();
        bos.close();
        int len = bos.writtenBytes();
        checkPayload(channel, len);//检查请求数据实体长度
        Bytes.int2bytes(len, header, 12);//数据实体长度写入请求头

        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header.
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

下边步向 encodeRequestData(channel, out, req.getData()),看看类别化,暗许使用hessian2种类化,新闻体数据包涵dubbo版本号、接口名称、接口版本、方法名称、参数类型列表、参数、附加新闻,把它们按梯次依次体系化,数据写入到品种为ChannelBuffer的buffer参数中,接着将ChannelBuffer封装成netty的org.jboss.netty.buffer.ChannelBuffer,netty会将数据写到链路发送到服务端。

 @Override
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation) data;

        out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
        out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
        out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null)
        for (int i = 0; i < args.length; i++){
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
        out.writeObject(inv.getAttachments());
    }
2) 提供端响应恳求

此步骤包蕴:提供端将要求解码,响应乞请,将结果编码,重返到花费端
2.1 提供端将央求解码
顾客端将数据发送到服务端,netty服务端会触发decoder和handler八个电脑,踏入com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter.InternalDecoder.messageReceived()方法,
先是将netty的ChannelBuffer转成dubbo的ChannelBuffer,这里需求管理半包难点。

 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
   .................................................
   NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            Object msg;
            int saveReaderIndex;

            try {
                // decode object.
                do {//循环处理半包问题
                    saveReaderIndex = message.readerIndex();//保存当前解析的位置等待链路的下次IO事件
                    try {
                        msg = codec.decode(channel, message);如果数据不完整返回NEED_MORE_INPUT
                    } catch (IOException e) {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                        throw e;
                    }
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {//从上次解析位置读取
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        if (saveReaderIndex == message.readerIndex()) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw new IOException("Decode without read data.");
                        }
                        if (msg != null) {
                            Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                        }
                    }
                } while (message.readable());
     .................................................
  }

DubboCodec.decode()->ExchangeCodec.decode(),进入到ExchangeCodec.decode():

 protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
   ....................................................//检查协议头中的长度、魔数等
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    return decodeBody(channel, is, header);
}

下边走入decodeBody(),首先初步化贰个Request对象,创设DecodeablePRADOpcInvocation对象,此目标为rpcInvocation的子类,调用此指标的decode()连串化出央浼的method,paramtype等音讯set到DecodeableLANDpcInvocation中,最终将此指标set到request的data对象中。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
        }else {
              Object data;
             ......................................
               DecodeableRpcInvocation inv;
               if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
               } else {
                        inv = new DecodeableRpcInvocation(channel, req,
                                                          new UnsafeByteArrayInputStream(readMessageData(is)), proto);
               }
                    data = inv;
            }
          req.setData(data);
         ..................................
          return req;
        }

回调InternalDecoder.messageReceived()方法,调用Channels.fireMessageReceived()方法,激活下二个Computer的messageReceived事件,并且把解码后的靶子封装在Message伊夫nt中。

2.2 提供端管理诉求
decoder之后,继续出发netty的下三个管理链handler,步向NettyHandler的messageReceived()方法,

   @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

此handler为createServer传递过来的,封装了多层,涉及到劳动调用线程的开首化,具体可看《dubbo 线程模型浅析》,此处只分析职分丢到线程池的施行进程:DecodeHandler->HeaderExchangeHandler->ExchangeHandler

 public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

末段调到HeaderExchangeHandler.receive()方法,此处的message为刚才decoder传递过来的request类型

 public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }
         .......................................................
    }

跻身handleRequest(),收取request中的data对象,为invocation,然后步向到DubboProtocol内部类ExchangeHandler艾达pter的reply()方法

 Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        ..........................................................
        // find handler by message class.
        Object msg = req.getData();
        try {
            // handle data.
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }

在DubboProtocol.ExchangeHandlerAdapter.reply()中,传入的参数分别是ExchangeChannel、Invocation,根据这四个参数拼装serviceKey,然后从exporterMap中找到Invoker,exportMap在服务发表的时候,DubboProtocol.export()方法put进去的。

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);//执行调用
            }
      ...................................................

此invoker为包装档期的顺序,包括filter实践链、提供端接口实现类的包裹类,实行invoke()方法,先调用filter推行链,然后调用AbstractProxyInvoker完毕类的doInvoke()方法,举行调用的包装类的invokeMethod()方法,包装类具备具体达成类,至此调用实现,回到HeaderExchangeHandler.receive()方法,调用channel.send()写到客商端

 Response response = handleRequest(exchangeChannel, request);
 channel.send(response);

写到顾客端依旧会触发netty事件链,接着调用NettyCodecAdapter.encoder和NettyHandler,InternalEncoder.encode()-->ExchangeCodec.encode():

public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

在encodeResponse()中,与花费端同样,将合计头,结果作为信息体写入ChannelBuffer中,在encodeResponse()中调用了encodeResponseData(),进行种类化

 protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result) data;

        Throwable th = result.getException();
        if (th == null) {//处理结果没有异常
            Object ret = result.getValue();
            if (ret == null) {//结果为null
                out.writeByte(RESPONSE_NULL_VALUE);
            } else {//结果不为空
                out.writeByte(RESPONSE_VALUE);
                out.writeObject(ret);
            }
        } else {//处理结果异常
            out.writeByte(RESPONSE_WITH_EXCEPTION);
            out.writeObject(th);
        }
    }

回来InternalEncoder.encode()中,调用ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()),将ChannelBuffer写回链路,服务端全数拍卖终结。

2.3花费端响应结果
服务端写多少到客商端调用netty事件管理器:NettyCodecAdapter.decoder和NettyHandler,调用链路:NettyCodecAdapter.InternalDecoder. messageReceived()-->ExchangeCodec.decode()-->DubboCodec.decodeBody()

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
 byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
        // get request id.
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // decode response.
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(Response.HEARTBEAT_EVENT);
            }
            // get status.
            byte status = header[3];
            res.setStatus(status);
            if (status == Response.OK) {
                try {
                    Object data;
                    ............................................
                        DecodeableRpcResult result;
                        if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                            result = new DecodeableRpcResult(channel, res, is,
                                                             (Invocation)getRequestData(id), proto);
                            result.decode();//DecodeableRpcResult.decode()
                        } 
                      .......................................
                        data = result;
                    }
                    res.setResult(data);
              .....................................................
            return res;
        } else{
      }

}

上面包车型地铁方法会调用DecodeableENCOREpcResult.decode(),调完事后,将DecodeableHighlanderpcResult set到response对象的data属性中,DecodeableEscortpcResult为索罗德pcResult的子类,步入到DecodeableLANDpcResult.decode():

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

        byte flag = in.readByte();
        switch (flag) {
            case DubboCodec.RESPONSE_NULL_VALUE://正常返回,空值
                break;
            case DubboCodec.RESPONSE_VALUE://正常返回,有值
                try {
                    Type[] returnType = RpcUtils.getReturnTypes(invocation);
                    setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                                 (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                     : in.readObject((Class<?>) returnType[0], returnType[1])));
                } catch (ClassNotFoundException e) {
                    throw new IOException(StringUtils.toString("Read response data failed.", e));
                }
                break;
            case DubboCodec.RESPONSE_WITH_EXCEPTION://异常返回
             ..................................
            default:
                throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }
        return this;
    }

在推行结果有值,平常重临的意况下,会连串化出重返值,调用setValue()设置到result中。
下一场触发NettyHandler事件,调用NettyHandler中的messageReceived():

 @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }

其一handler的包装关系在前头早就表达:DecodeHandler->HeaderExchangeHandler->ExchangeHandler,一路调用直到HeaderExchangeHandler.received():

 public void received(Channel channel, Object message) throws RemotingException {
  ........................
  else if (message instanceof Response) {
      handleResponse(channel, (Response) message);
   }
  .......................
}

handleResponse()的调用链路为:DefaultFuture.received()-->DefaultFuture.doReceive():

private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;//将resposne设置到DefaultFuture中
            if (done != null) {
                done.signal();//唤起消费端调用等待线程
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }

调用 done.signal()的法力是孳生花费端调用等待线程,在费用端向提供端发送调用央浼后,会调用DefaultFuture.get阻塞等待响应结果,回到DubboInvoker.doInvoke()方法:

 protected Result doInvoke(final Invocation invocation) throws Throwable {
            ..............................
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
              .............................
            } else if (isAsync) {
            .............................
            } else {
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
   }

currentClient.request()最后调到HeaderExchangeChannel.request(Object request, int timeout):

public ResponseFuture request(Object request, int timeout) throws RemotingException {
       ...........................................
        // create request.
        Request req = new Request();//初始化的时候会设置一个id
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);//RpcInvocation对象
        //DefaultFuture初始化的时候会将id 当前对象保存起来,以备返回的时候使用rpcInvocation
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            channel.send(req);//调用netty发送
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

能够见见重回的是DefaultFuture对象,步入此指标的get()方法:

  public Object get(int timeout) throws RemotingException {
       ..............................................
         while (! isDone()) {//等待的条件为response为null
              done.await(timeout, TimeUnit.MILLISECONDS);
              if (isDone() || System.currentTimeMillis() - start > timeout) {
                   break;
                }
             }
         } 
    ..............................................
        return returnFromResponse();
    }

  private Object returnFromResponse() throws RemotingException {
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null");
        }
        if (res.getStatus() == Response.OK) {
            //see Dubbocodec.decodebody(),返回 DecodeableRpcResult
            return res.getResult();
        }
        if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
        }
        throw new RemotingException(channel, res.getErrorMessage());
    }

归来DubboInvoker.doInvoke(),将结果回到,回到最早的InvokerInvocationHandler.invoke()

 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

通过一些列复杂的进程,推行 invoker.invoke(new 哈弗pcInvocation(method, args))完结,再次来到ecodeable凯雷德pcResult,然后调用它的recreate()获取result值,此值就是提供端的试行结果,具体能够纪念上面包车型大巴流程,将现实的结果再次回到,整个rpc调用流程完结。

参考:
http://blog.csdn.net/pentiumchen/article/details/53227844
http://www.imooc.com/article/details/id/22597
http://blog.kazaff.me/2015/02/02/dubbo%E7%9A%84%E6%9C%8D%E5%8A%A1%E6%B2%BB%E7%90%86%E7%BB%86%E8%8A%82/
http://blog.csdn.net/manzhizhen/article/details/73436619

本文由金沙澳门官网送注册58发布于编程应用,转载请注明出处:dubbo服务调用进程浅析,源码学习

关键词:

上一篇:没有了
下一篇:没有了