服务消费端Directory目录的创建与更新

发布时间:2024年01月05日

1?Directory目录概述

Directory代表多个invoker,其内部维护了一个list,并且这个list的内容时动态变化的(对于消费端来说,每个invoker代表一个服务提供者)。

在Dubbo中,RegistryDirectory和StaticDirectory都是Directory的实现类。

RegistryDirectory是一个动态服务目录,可以感知注册中心配置的变化,其持有的Invoker列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory都会动态地增删Invoker,并调用Router的route方法进行路由,过滤掉不符合路由规则的Invoker。相反,StaticDirectory是一个静态服务目录,它内部存放的Invoker是不会变动的。

RegistryDirectory是Dubbo中默认使用的Directory,适用于服务提供者和消费者都动态变化的情况。而StaticDirectory适用于服务提供者和消费者相对固定,不需要频繁变动的场景。

2 RegistryDirectory的创建

RegistryDirectory是在服务消费端启动时创建的。

消费端启动时,通过 ReferenceConfig#get() 创建对服务提供方的远程调用代理类。最终在通过RegistryProtocol#refer() 创建invoker时创建了RegistryDirectory。具体实现细节如下所示。

     public T get(boolean check) {
        // ...

        return ref;
    }   


    protected synchronized void init(boolean check) {
            // ...

            // 创建对服务提供方的远程调用代理类
            ref = createProxy(referenceParameters);

            // ...
    }


    private T createProxy(Map<String, String> referenceParameters) {
        // ...

        // 创建invoker
        createInvoker();

        // ...

        // create service proxy
        return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }


    private void createInvoker() {
        if (urls.size() == 1) {
            URL curUrl = urls.get(0);
            invoker = protocolSPI.refer(interfaceClass, curUrl);
            
            // ...
        } else {
            List<Invoker<?>> invokers = new ArrayList<>();
            URL registryUrl = null;
            for (URL url : urls) {
                invokers.add(protocolSPI.refer(interfaceClass, url));

                if (UrlUtils.isRegistry(url)) {
                    // use last registry url
                    registryUrl = url;
                }
            }

            // ...
        }
    }

创建invoker的核心方法为?

invoker = protocolSPI.refer(interfaceClass, curUrl);

服务注册和发现使用是register协议,因此上述方法实际上将调用 RegistryProtocol#refer方法,实现如下所示。

    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
        String group = qs.get(GROUP_KEY);
        if (StringUtils.isNotEmpty(group)) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(url.getScopeModel(), MergeableCluster.NAME), registry, type, url, qs);
            }
        }

        Cluster cluster = Cluster.getCluster(url.getScopeModel(), qs.get(CLUSTER_KEY));

        // 创建invoker
        return doRefer(cluster, registry, type, url, qs);
    }


    protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
        // ...

        // 创建invoker
        ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);

        // 拦截invoker
        return interceptInvoker(migrationInvoker, url, consumerUrl);
    }


    protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, invoker, consumerUrl, url);
        }
        return invoker;
    }


    public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {
        MigrationRuleHandler<?> migrationRuleHandler = ConcurrentHashMapUtils.computeIfAbsent(handlers, (MigrationInvoker<?>) invoker, _key -> {
            ((MigrationInvoker<?>) invoker).setMigrationRuleListener(this);
            return new MigrationRuleHandler<>((MigrationInvoker<?>) invoker, consumerUrl);
        });

        migrationRuleHandler.doMigrate(rule);
    }


    public synchronized void doMigrate(MigrationRule rule) {
        if (migrationInvoker instanceof ServiceDiscoveryMigrationInvoker) {
            refreshInvoker(MigrationStep.FORCE_APPLICATION, 1.0f, rule);
            return;
        }

        // initial step : APPLICATION_FIRST
        MigrationStep step = MigrationStep.APPLICATION_FIRST;
        float threshold = -1f;

        try {
            step = rule.getStep(consumerURL);
            threshold = rule.getThreshold(consumerURL);
        } catch (Exception e) {
            logger.error(REGISTRY_NO_PARAMETERS_URL, "", "", "Failed to get step and threshold info from rule: " + rule, e);
        }

        if (refreshInvoker(step, threshold, rule)) {
            // refresh success, update rule
            setMigrationRule(rule);
        }
    }


    private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
        if (step == null || threshold == null) {
            throw new IllegalStateException("Step or threshold of migration rule cannot be null");
        }
        MigrationStep originStep = currentStep;

        if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
            boolean success = true;
            switch (step) {
                case APPLICATION_FIRST:
                    migrationInvoker.migrateToApplicationFirstInvoker(newRule);
                    break;
                case FORCE_APPLICATION:
                    success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
                    break;
                case FORCE_INTERFACE:
                default:
                    // 服务启动时将调用此方法
                    success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
            }

            // ...
        return true;
    }


    public boolean migrateToForceInterfaceInvoker(MigrationRule newRule) {
        CountDownLatch latch = new CountDownLatch(1);
        refreshInterfaceInvoker(latch);

        if (serviceDiscoveryInvoker == null) {
            // serviceDiscoveryInvoker is absent, ignore threshold check
            this.currentAvailableInvoker = invoker;
            return true;
        }

        // ...

        return false;
    }


    protected void refreshInterfaceInvoker(CountDownLatch latch) {
        clearListener(invoker);
        if (needRefresh(invoker)) {
            if (logger.isDebugEnabled()) {
                logger.debug("Re-subscribing interface addresses for interface " + type.getName());
            }

            if (invoker != null) {
                invoker.destroy();
            }
            invoker = registryProtocol.getInvoker(cluster, registry, type, url);
        }
        
        //...
    }

最终将调用下述 InterfaceCompatibleRegistryProtocol#getInvoker() 方法创建RegistryDirectory。

    public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
        DynamicDirectory<T> directory = new RegistryDirectory<>(type, url);
        return doCreateInvoker(directory, cluster, registry, type);
    }

其中InterfaceCompatibleRegistryProtocol为RegistryProtocol的子类。

3 RegistryDirectory中invoker列表的更新

文章来源:https://blog.csdn.net/J_bean/article/details/135400121
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。