Dubbo的几个负载均衡类-随机

发布时间:2024年01月17日

-----------------看过之前一致性哈希和最少活跃书的可以跳过-----------------?

链接在此:Dubbo的几个负载均衡类--一致性哈希

Dubbo的几个负载均衡类--最少活跃数

消费者发起调用过程中涉及如下几步

1:接口调用,比如DemoService.demoMethod

2:InvokerInvocationHandler.invoker:消费端启动时,通过JavassistProxyFactory.getProxy反射获取代理类,之后服务调用就直接调用这个Handler

3:MigrationInvoker.invoke:Dubbo 发起调用非常重要的一步,如果失败了,通过这个invoker做切换

4:其他

5:FailoverClusterInvoker.invoke(目前我们使用的,实际在AbstractClusterInvoker里面invoke逻辑是固定的)

6:其他

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
 
        // binding attachments into invocation.
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }
        //如果设置了标签规则,则通过list方法过滤出来符合标签的几个invoker
        List<Invoker<T>> invokers = list(invocation);
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //请求负载并且做好灾备降级
        return doInvoke(invocation, invokers, loadbalance);
    }
 
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        int len = calculateInvokeTimes(methodName);
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            //这里通过loadBalance做负载
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers
                            + " (" + providers.size() + "/" + copyInvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method "
                + methodName + " in the service " + getInterface().getName()
                + ". Tried " + len + " times of the providers " + providers
                + " (" + providers.size() + "/" + copyInvokers.size()
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }

-----------------看过之前一致性哈希的可以跳过-----------------?
随机其实并不是我们想象的那么随机

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // Every invoker has the same weight?
        boolean sameWeight = true;
        // the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
        int[] weights = new int[length];
        // The sum of weights
        int totalWeight = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Sum
            totalWeight += weight;
            // save for later use
            weights[i] = totalWeight;
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
                if (offset < weights[i]) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }

整体代码不多,还是要先获取权重值,这个我们在上一章的最少活跃数里面讲过

方法getWeight是处理那些启动在10分钟内的服务,做一定的服务负载降级。超过十分钟,那么返回的负载都是100,如果小于1分钟那么就是1,1分钟到10分钟之间分别的是分钟数的平方值。

基本针对的就是我们在版本发布的时候,需要启动某个服务的所有实例,这时候越早启动的权重值越大,这么做的原因其实也很合理,越早启动的越稳定,包括各种缓存热点的加载,连接数/链接池的初始化等等

?什么时候totalWeight==weight*(i+1)呢?

就是所有的服务方服务都在1分钟内重启

然后和最少活跃数中的逻辑一样,如果权重值一样,那么就随机一个;如果不一样,就在总权重和中随机一个值,直到减完小于0

总结:随机并不是我们认为的随机,而是取决于线上服务的启动时间,如果短时间内所有的服务同时启动或者服务启动很长一段时间了,那么就是随机,如果短时间内,服务间断启动,那么还是会有一定的权重的

文章来源:https://blog.csdn.net/u010187815/article/details/135638107
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。