Directory代表多个invoker,其内部维护了一个list,并且这个list的内容是动态变化的(对于消费端来说,每个invoker代表一个服务提供者)。
在Dubbo中,RegistryDirectory和StaticDirectory都是Directory的实现类。
RegistryDirectory是一个动态服务目录,可以感知注册中心配置的变化,其持有的Invoker列表会随着注册中心内容的变化而变化。每次变化后,RegistryDirectory都会动态地增删Invoker,并调用Router的route方法进行路由,过滤掉不符合路由规则的Invoker。相反,StaticDirectory是一个静态服务目录,它内部存放的Invoker是不会变动的。
RegistryDirectory是Dubbo中默认使用的Directory,适用于服务提供者和消费者都动态变化的情况。而StaticDirectory适用于服务提供者和消费者相对固定,不需要频繁变动的场景。
RegistryDirectory是在服务消费端启动时创建的。
消费端启动时,通过 ReferenceConfig#get() 创建对服务提供方的远程调用代理类。最终在通过RegistryProtocol#refer() 创建invoker时创建了RegistryDirectory。具体实现细节如下所示。
/**
 * Returns the service reference stored in {@code ref}.
 * <p>
 * This is an abridged excerpt: the statements that populate {@code ref} are elided.
 * NOTE(review): presumably the elided part invokes {@code init(check)} when the
 * reference has not been created yet — confirm against the full source.
 *
 * @param check not used in the visible excerpt
 * @return the current value of {@code ref}
 */
public T get(boolean check) {
// ... (elided in this excerpt)
return ref;
}
/**
 * Initializes this reference by building the proxy and storing it in {@code ref}.
 * <p>
 * Abridged excerpt: the code that prepares {@code referenceParameters} and any
 * follow-up work are elided. The method is {@code synchronized}.
 *
 * @param check not used in the visible excerpt
 */
protected synchronized void init(boolean check) {
// ... (elided in this excerpt)
// Create the proxy used to invoke the remote service provider
ref = createProxy(referenceParameters);
// ... (elided in this excerpt)
}
/**
 * Builds the invoker and wraps it in a service proxy.
 * <p>
 * Abridged excerpt: steps before and after {@code createInvoker()} are elided.
 *
 * @param referenceParameters not used in the visible excerpt
 * @return the proxy produced by {@code proxyFactory.getProxy} for the {@code invoker} field
 */
private T createProxy(Map<String, String> referenceParameters) {
// ... (elided in this excerpt)
// Build the invoker (assigns the invoker field in the single-URL case)
createInvoker();
// ... (elided in this excerpt)
// create service proxy
return (T) proxyFactory.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
/**
 * Creates invoker(s) for the URLs in {@code urls} through {@code protocolSPI.refer}.
 * <p>
 * With exactly one URL the result is assigned directly to the {@code invoker} field.
 * Otherwise one invoker is created per URL and collected into a list; what happens
 * to that list afterwards is elided in this excerpt.
 */
private void createInvoker() {
if (urls.size() == 1) {
// Single URL: refer it directly
URL curUrl = urls.get(0);
invoker = protocolSPI.refer(interfaceClass, curUrl);
// ... (elided in this excerpt)
} else {
// Multiple URLs: refer each one and remember the last registry URL seen
List<Invoker<?>> invokers = new ArrayList<>();
URL registryUrl = null;
for (URL url : urls) {
invokers.add(protocolSPI.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
// use last registry url
registryUrl = url;
}
}
// ... (elided in this excerpt; NOTE(review): presumably combines `invokers` into one cluster invoker — confirm)
}
}
创建invoker的核心方法为:
invoker = protocolSPI.refer(interfaceClass, curUrl);
服务注册和发现使用的是registry协议,因此上述方法实际上将调用 RegistryProtocol#refer方法,实现如下所示。
/**
 * Refers a service through the registry and returns the resulting invoker.
 * <p>
 * If {@code type} is {@code RegistryService}, the registry itself is wrapped into an
 * invoker via {@code proxyFactory}. Otherwise the cluster is chosen from the reference
 * parameters stored under {@code REFER_KEY}: a group that contains several
 * comma-separated values, or equals {@code "*"}, selects {@code MergeableCluster};
 * any other case uses the cluster named by {@code CLUSTER_KEY}.
 *
 * @param type the service interface class
 * @param url  the URL to refer; it is first converted with {@code getRegistryUrl}
 * @param <T>  the service type
 * @return the invoker produced by {@code proxyFactory.getInvoker} or {@code doRefer}
 * @throws RpcException declared by the signature
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = getRegistryUrl(url);
    Registry registry = getRegistry(url);

    // Referring the registry service itself: expose the registry as an invoker.
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    Map<String, String> qs = (Map<String, String>) url.getAttribute(REFER_KEY);
    String group = qs.get(GROUP_KEY);

    // group="a,b" or group="*" means results from several groups must be merged
    boolean mergeGroups = StringUtils.isNotEmpty(group)
            && (COMMA_SPLIT_PATTERN.split(group).length > 1 || "*".equals(group));

    // CLUSTER_KEY is only looked up when no merging is required
    String clusterName = mergeGroups ? MergeableCluster.NAME : qs.get(CLUSTER_KEY);

    // Create the invoker
    return doRefer(Cluster.getCluster(url.getScopeModel(), clusterName), registry, type, url, qs);
}
/**
 * Creates the migration invoker for the referred service and passes it through
 * {@code interceptInvoker}.
 * <p>
 * Abridged excerpt: the code that builds {@code consumerUrl} is elided.
 *
 * @param cluster    the cluster handed to {@code getMigrationInvoker}
 * @param registry   the registry handed to {@code getMigrationInvoker}
 * @param type       the service interface class
 * @param url        the registry URL
 * @param parameters not used in the visible excerpt
 * @param <T>        the service type
 * @return the invoker returned by {@code interceptInvoker}
 */
protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url, Map<String, String> parameters) {
// ... (elided in this excerpt; consumerUrl is defined here)
// Create the invoker
ClusterInvoker<T> migrationInvoker = getMigrationInvoker(this, cluster, registry, type, url, consumerUrl);
// Let registered listeners intercept the invoker
return interceptInvoker(migrationInvoker, url, consumerUrl);
}
/**
 * Notifies every {@code RegistryProtocolListener} found for {@code url} that the
 * given invoker has been referred, then returns that same invoker.
 *
 * @param invoker     the invoker being referred; returned unchanged
 * @param url         the registry URL used to look up listeners
 * @param consumerUrl the consumer URL passed on to each listener
 * @param <T>         the service type
 * @return {@code invoker}
 */
protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url, URL consumerUrl) {
    List<RegistryProtocolListener> registered = findRegistryProtocolListeners(url);
    if (!CollectionUtils.isEmpty(registered)) {
        for (RegistryProtocolListener each : registered) {
            each.onRefer(this, invoker, consumerUrl, url);
        }
    }
    return invoker;
}
/**
 * Handles a refer event: looks up the {@code MigrationRuleHandler} for the invoker,
 * creating it on first use, and applies the current {@code rule} to it.
 * <p>
 * When a handler is created, this listener is also installed on the invoker via
 * {@code setMigrationRuleListener}. The invoker is cast to {@code MigrationInvoker}.
 *
 * @param registryProtocol not used by this method
 * @param invoker          the referred invoker; must be a {@code MigrationInvoker}
 * @param consumerUrl      the consumer URL given to a newly created handler
 * @param registryURL      not used by this method
 */
public void onRefer(RegistryProtocol registryProtocol, ClusterInvoker<?> invoker, URL consumerUrl, URL registryURL) {
    MigrationInvoker<?> migrationInvoker = (MigrationInvoker<?>) invoker;
    MigrationRuleHandler<?> handler = ConcurrentHashMapUtils.computeIfAbsent(
            handlers,
            migrationInvoker,
            ignoredKey -> {
                // First time this invoker is seen: attach this listener and build its handler
                migrationInvoker.setMigrationRuleListener(this);
                return new MigrationRuleHandler<>(migrationInvoker, consumerUrl);
            });
    handler.doMigrate(rule);
}
/**
 * Applies a migration rule to the wrapped migration invoker.
 * <p>
 * A {@code ServiceDiscoveryMigrationInvoker} is always refreshed with
 * {@code FORCE_APPLICATION} and threshold {@code 1.0}. For any other invoker the
 * step and threshold are read from the rule for {@code consumerURL}, starting from
 * {@code APPLICATION_FIRST} and {@code -1}. If reading fails, the error is logged and
 * the values obtained so far are used. The rule is stored via
 * {@code setMigrationRule} only when the refresh reports success.
 *
 * @param rule the migration rule to apply
 */
public synchronized void doMigrate(MigrationRule rule) {
    if (migrationInvoker instanceof ServiceDiscoveryMigrationInvoker) {
        refreshInvoker(MigrationStep.FORCE_APPLICATION, 1.0f, rule);
    } else {
        // initial step : APPLICATION_FIRST
        MigrationStep targetStep = MigrationStep.APPLICATION_FIRST;
        float targetThreshold = -1f;
        try {
            targetStep = rule.getStep(consumerURL);
            targetThreshold = rule.getThreshold(consumerURL);
        } catch (Exception e) {
            logger.error(REGISTRY_NO_PARAMETERS_URL, "", "", "Failed to get step and threshold info from rule: " + rule, e);
        }

        boolean refreshed = refreshInvoker(targetStep, targetThreshold, rule);
        if (refreshed) {
            // refresh success, update rule
            setMigrationRule(rule);
        }
    }
}
/**
 * Switches the migration invoker to the requested step when the step or threshold
 * differs from the current one.
 * <p>
 * NOTE(review): this excerpt is abridged and truncated — code after the switch is
 * elided and the closing braces do not balance, so the handling of {@code success}
 * and the final return path are not visible here.
 *
 * @param step      the target migration step; must not be {@code null}
 * @param threshold the target threshold; must not be {@code null}
 * @param newRule   the rule forwarded to the migration invoker
 * @return {@code true} on the visible path
 * @throws IllegalStateException if {@code step} or {@code threshold} is {@code null}
 */
private boolean refreshInvoker(MigrationStep step, Float threshold, MigrationRule newRule) {
if (step == null || threshold == null) {
throw new IllegalStateException("Step or threshold of migration rule cannot be null");
}
// Remember the step in effect before the switch (its use is elided in this excerpt)
MigrationStep originStep = currentStep;
// Only act when the step or the threshold actually changes
if ((currentStep == null || currentStep != step) || !currentThreshold.equals(threshold)) {
boolean success = true;
switch (step) {
case APPLICATION_FIRST:
migrationInvoker.migrateToApplicationFirstInvoker(newRule);
break;
case FORCE_APPLICATION:
success = migrationInvoker.migrateToForceApplicationInvoker(newRule);
break;
case FORCE_INTERFACE:
default:
// This branch is the one invoked when the service starts up
success = migrationInvoker.migrateToForceInterfaceInvoker(newRule);
}
// ... (elided in this excerpt)
return true;
}
/**
 * Migrates to the interface-level invoker.
 * <p>
 * Refreshes the interface invoker first. If no {@code serviceDiscoveryInvoker}
 * exists, the interface invoker becomes {@code currentAvailableInvoker} and the
 * migration succeeds immediately. The remaining logic is elided in this excerpt.
 *
 * @param newRule not used in the visible excerpt
 * @return {@code true} when {@code serviceDiscoveryInvoker} is absent; the visible
 *         fall-through path returns {@code false}
 */
public boolean migrateToForceInterfaceInvoker(MigrationRule newRule) {
CountDownLatch latch = new CountDownLatch(1);
refreshInterfaceInvoker(latch);
if (serviceDiscoveryInvoker == null) {
// serviceDiscoveryInvoker is absent, ignore threshold check
this.currentAvailableInvoker = invoker;
return true;
}
// ... (elided in this excerpt)
return false;
}
/**
 * Re-creates the interface-level {@code invoker} when {@code needRefresh} says so.
 * <p>
 * Listeners on the current invoker are cleared first. If a refresh is needed, a
 * non-null existing invoker is destroyed and a new one is obtained from
 * {@code registryProtocol.getInvoker}. The remaining logic is elided in this excerpt.
 *
 * @param latch not used in the visible excerpt
 */
protected void refreshInterfaceInvoker(CountDownLatch latch) {
clearListener(invoker);
if (needRefresh(invoker)) {
if (logger.isDebugEnabled()) {
logger.debug("Re-subscribing interface addresses for interface " + type.getName());
}
// Destroy the old invoker before replacing it
if (invoker != null) {
invoker.destroy();
}
invoker = registryProtocol.getInvoker(cluster, registry, type, url);
}
// ... (elided in this excerpt)
}
最终将调用下述 InterfaceCompatibleRegistryProtocol#getInvoker() 方法创建RegistryDirectory。
/**
 * Creates a cluster invoker backed by a new {@code RegistryDirectory}.
 *
 * @param cluster  the cluster passed to {@code doCreateInvoker}
 * @param registry the registry passed to {@code doCreateInvoker}
 * @param type     the service interface class
 * @param url      the URL used to construct the directory
 * @param <T>      the service type
 * @return the invoker returned by {@code doCreateInvoker}
 */
public <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // The RegistryDirectory is instantiated here and handed straight to doCreateInvoker
    DynamicDirectory<T> registryDirectory = new RegistryDirectory<T>(type, url);
    ClusterInvoker<T> created = doCreateInvoker(registryDirectory, cluster, registry, type);
    return created;
}
其中InterfaceCompatibleRegistryProtocol为RegistryProtocol的子类。