二、Nacos源码系列:Nacos服务注册流程(一)

发布时间:2024年01月17日

目录

一、名词解释

二、何时触发Nacos的自动注册流程?

三、客户端发起服务注册请求

3.1、配置文件解析

3.2、创建NamingService

?3.3、获取Instance实例

3.4、注册服务实例

3.4.1、缓存实例

3.4.2、gRPC注册实例

四、Nacos服务端处理服务注册请求

4.1、Grpc处理类

4.2、处理Grpc注册实例请求

4.2.1、获取Service?

4.2.2、获取实例发布信息

4.2.3、添加服务实例到发布列表中

4.3、订阅者处理ClientRegisterServiceEvent事件

五、总结


一、名词解释

在学习Nacos源码之前,有必要再次熟悉下其中一些重要的名词:

  • 命名空间(Namespace):逻辑隔离服务组和服务,默认公共命名空间(public)。
  • 组(group):多个服务归类,如用户组有用户、积分、会员等服务。
  • 服务(Service)、集群(Cluster)、实例(Instance):对标分级(三级)存储模型。
  • 注册(注册者、注册表):充当服务提供者角色,向nacos注册自己的服务实例信息;注册者为服务提供者;注册表为存储注册服务的数据结构(ConcurrentMap<Service, Set<String>>)。
  • 订阅(订阅者、订阅表):充当服务消费者角色,向nacos拉取自己依赖的服务;订阅者为服务消费者;订阅表为存储订阅客户端的数据结构(ConcurrentMap<Service, Set<String>>)。
  • 延时任务执行引擎(NacosDelayTaskExecuteEngine):定时执行任务,一般通过ScheduledExecutorService实现的。
  • 执行任务执行引擎(NacosExecuteTaskExecuteEngine):执行任务,类似ThreadPoolExecutor。Nacos为了减少线程切换,一般采用Thread+BlockingQueue实现。

首先我们需要搭建好Nacos的源码阅读环境,大家可参考前面一篇文章去搭建。为了充分理解Nacos的整个服务注册过程,我们带着下面三个问题去阅读源码:

  • 1、Nacos客户端SpringBoot项目一启动,就自动注册到Nacos,这个过程是怎么触发的,何时触发的?
  • 2、Nacos客户端怎么提交注册服务请求?
  • 3、Nacos服务端怎么处理客户端提交过来的注册请求?

Nacos源码版本: 2.3.0

二、何时触发Nacos的自动注册流程?

我们首先要了解服务自动注册到Nacos是怎么触发的,为什么我们直接运行一下SpringBoot启动类的main方法之后,就可以自动完成服务注册到Nacos这一系列工作。

以下面的启动类为例,首先来分析一下Nacos注册的触发时机。

@SpringBootApplication
@EnableDiscoveryClient
public class DiscoveryProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(DiscoveryProviderApplication.class, args);
    }

}

?首先进入的是,SpringBoot的启动流程,在SpringBoot启动过程中,会进行spring的IOC容器创建过程,核心方法是refresh(),在refresh()方法的最后一步,有个finishRefresh()方法,负责完成容器刷新完成后的一些逻辑处理,其中包括this.getLifecycleProcessor().onRefresh();生命周期处理器的onRefresh()回调方法。

protected void finishRefresh() {
    this.clearResourceCaches();
    this.initLifecycleProcessor();
    // 生命周期处理器的回调处理
    this.getLifecycleProcessor().onRefresh();
    this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
    if (!IN_NATIVE_IMAGE) {
        LiveBeansView.registerApplicationContext(this);
    }

}

这里存在一个WebServerStartStopLifecycle的生命周期处理器,有个start()方法:?

public void start() {
    this.webServer.start();
    this.running = true;
    this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
}

我们可以看到,WebServerStartStopLifecycle类发布了一个事件:ServletWebServerInitializedEvent,看名字的话,应该是Web容器初始化完成的事件。

到这一步的调用栈如下:

start:45, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
	doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
		access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
			start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
				accept:-1, 1423639915 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$537)
					forEach:75, Iterable (java.lang)
						startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
							onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
								finishRefresh:940, AbstractApplicationContext (org.springframework.context.support)
                                    refresh:591, AbstractApplicationContext (org.springframework.context.support)
                                    	refresh:144, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
                                    		refresh:767, SpringApplication (org.springframework.boot)
                                    			refresh:759, SpringApplication (org.springframework.boot)
                                    				refreshContext:426, SpringApplication (org.springframework.boot)
                                    					run:326, SpringApplication (org.springframework.boot)
                                                            run:1311, SpringApplication (org.springframework.boot)
                                                            	run:1300, SpringApplication (org.springframework.boot)
                                                            		main:12, DiscoveryProviderApplication (com.example.discoveryprovider)

基于Spring的事件发布机制,肯定会有某个监听器对这个事件ServletWebServerInitializedEvent感兴趣,通过源码,我们发现org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration这个类监听了这种事件,所以一旦发布了事件,AbstractAutoServiceRegistration就能马上感知到,在AbstractAutoServiceRegistration#onApplicationEvent()方法中进行相应的处理。

// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
    this.bind(event);
}

public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // 调用了 start 方法
        this.start();
    }
}

public void start() {
    // 判断是否已开启注册
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }

    } else {
        if (!this.running.get()) {
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            // 调用 register() 方法
            this.register();
            if (this.shouldRegisterManagement()) {
                this.registerManagement();
            }

            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }

    }
}

核心逻辑就在this.register()方法,AbstractAutoServiceRegistration存在一个子类NacosAutoServiceRegistration。查看NacosAutoServiceRegistration的类图:

查看整个继承关系,可以看到 AbstractAutoServiceRegistration 实现了 ApplicationListener 接口,这样印证了我们前面说的它对这个事件ServletWebServerInitializedEvent感兴趣。

NacosAutoServiceRegistration重写了register()方法,实现了服务向 Nacos 发起注册的功能,实际还是调用的父类的注册方法:

protected void register() {
    if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
        log.debug("Registration disabled.");
        return;
    }
    if (this.registration.getPort() < 0) {
        this.registration.setPort(getPort().get());
    }
    super.register();
}
// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
protected void register() {
    this.serviceRegistry.register(this.getRegistration());
}

父类AbstractAutoServiceRegistration中的这个serviceRegistry是NacosServiceRegistry的一个实例,它是在com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration中自动注入的,这个跟SpringBoot项目的自动配置原理相关。

在搭建Nacos源码阅读环境的时候,我们引入了下面的依赖:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

在SpringBoot项目运行时,SpringFactoriesLoader 类会去寻找META-INF/spring.factories文件,spring.factories用键值对的方式记录了所有需要加入容器的类,key为:org.springframework.boot.autoconfigure.EnableAutoConfiguration,value为各种xxxAutoConfiguration。

如下图:

我们发现了一个NacosServiceRegistryAutoConfiguration跟Nacos自动注册服务相关的配置类。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

    // Nacos自动注册的核心类
	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

通过查看NacosServiceRegistryAutoConfiguration自动配置类的源码,我们发现,往Spring注入了三个重要的类:

  • NacosServiceRegistry:完成服务注册功能,实现ServiceRegistry接口;
  • NacosRegistration:注册时用来存储nacos服务端的相关信息;
  • NacosAutoServiceRegistration:自动注册功能;

到这里,我们总算是找到了Nacos客户端自动注册是何时触发的。总结一下:

  • 1、项目pom.xml中引入了spring-cloud-starter-alibaba-nacos-discovery的依赖;
  • 2、spring-cloud-starter-alibaba-nacos-discovery-2.2.9.RELEASE.jar类路径下的META-INF/spring.factories中,指定了Nacos服务注册的自动配置类:NacosServiceRegistryAutoConfiguration
  • 3、NacosServiceRegistryAutoConfiguration往Spring注入了三个重要的bean:NacosServiceRegistry、NacosRegistration、NacosAutoServiceRegistration;
  • 4、NacosAutoServiceRegistration继承自AbstractAutoServiceRegistration类,并且实现了ApplicationListener<WebServerInitializedEvent>监听器接口;
  • 5、Spring IOC容器启动的时候,会回调AbstractAutoServiceRegistration#onApplicationEvent()方法,会执行register()方法;
  • 6、NacosAutoServiceRegistration重写了register()方法,最终会调用ServiceRegistry#register()开始自动注册流程,在NacosServiceRegistryAutoConfiguration自动配置类中,注入了NacosServiceRegistry类,所以会执行NacosServiceRegistry#register自动注册;

接下来,真正开始处理Nacos的服务注册功能,也就是客户端向Nacos服务端发起服务注册请求,核心代码就是com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register。

三、客户端发起服务注册请求

通过前面的分析,我们已经知道了何时触发Nacos服务注册的,接下来我们继续分析客户端向Nacos服务端提交注册请求的的核心处理方法

com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register:

//com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
public void register(Registration registration) {
	// 判断注册实例信息中的service ID是否为空,如果为空,说明不是有效的服务实例,不允许注册,直接返回
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
        return;
    }
	// 创建NamingService名称空间,如果存在,则直接返回,否则通过反射创建一个NacosNamingService
    NamingService namingService = namingService();
	// 获取service ID,在本例中就是discovery-provider
    String serviceId = registration.getServiceId();
	// 获取名称空间的组名称,在本例中就是DEFAULT_GROUP
    String group = nacosDiscoveryProperties.getGroup();
	// 构建Instance服务实例
    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        // 注册服务实例
        namingService.registerInstance(serviceId, group, instance);
        // 服务实例注册成功后,打印日志,从springboot启动成功后的控制台可以看到
        log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                instance.getIp(), instance.getPort());
    }
    catch (Exception e) {
        if (nacosDiscoveryProperties.isFailFast()) {
            log.error("nacos registry, {} register failed...{},", serviceId,
                    registration.toString(), e);
            rethrowRuntimeException(e);
        }
        else {
            log.warn("Failfast is false. {} register failed...{},", serviceId,
                    registration.toString(), e);
        }
    }
}

3.1、配置文件解析

这一步其实就是SpringBoot提供的配置文件映射到实体类的功能,通过@ConfigurationProperties("spring.cloud.nacos.discovery")将配置文件中以"spring.cloud.nacos.discovery"开头的属性映射到NacosDiscoveryProperties实体的同名字的属性中。

@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
	/**
	 * Prefix of {@link NacosDiscoveryProperties}.
	 */
	public static final String PREFIX = "spring.cloud.nacos.discovery";

	private static final Pattern PATTERN = Pattern.compile("-(\\w)");

	/**
	 * nacos discovery server address.
	 */
	private String serverAddr;

	/**
	 * the nacos authentication username.
	 */
	private String username;

	/**
	 * the nacos authentication password.
	 */
	private String password;

	/**
	 * the domain name of a service, through which the server address can be dynamically
	 * obtained.
	 */
	private String endpoint;

	/**
	 * namespace, separation registry of different environments.
	 */
	private String namespace;

	/**
	 * watch delay,duration to pull new service from nacos server.
	 */
	private long watchDelay = 30000;

	/**
	 * nacos naming log file name.
	 */
	private String logName;

	/**
	 * service name to registry.
	 */
	@Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
	private String service;

	/**
	 * weight for service instance, the larger the value, the larger the weight.
	 */
	private float weight = 1;

	/**
	 * cluster name for nacos .
	 */
	private String clusterName = "DEFAULT";

	/**
	 * group name for nacos.
	 */
	private String group = "DEFAULT_GROUP";

	/**
	 * naming load from local cache at application start. true is load.
	 */
	private String namingLoadCacheAtStart = "false";

	/**
	 * extra metadata to register.
	 */
	private Map<String, String> metadata = new HashMap<>();

	/**
	 * if you just want to subscribe, but don't want to register your service, set it to
	 * false.
	 */
	private boolean registerEnabled = true;

	/**
	 * The ip address your want to register for your service instance, needn't to set it
	 * if the auto detect ip works well.
	 */
	private String ip;

	/**
	 * which network interface's ip you want to register.
	 */
	private String networkInterface = "";

	/**
	 * The port your want to register for your service instance, needn't to set it if the
	 * auto detect port works well.
	 */
	private int port = -1;

	/**
	 * whether your service is a https service.
	 */
	private boolean secure = false;

	/**
	 * access key for namespace.
	 */
	private String accessKey;

	/**
	 * secret key for namespace.
	 */
	private String secretKey;

	/**
	 * Heart beat interval. Time unit: millisecond.
	 */
	private Integer heartBeatInterval;

	/**
	 * Heart beat timeout. Time unit: millisecond.
	 */
	private Integer heartBeatTimeout;

	/**
	 * Ip delete timeout. Time unit: millisecond.
	 */
	private Integer ipDeleteTimeout;

	/**
	 * If instance is enabled to accept request. The default value is true.
	 */
	private boolean instanceEnabled = true;

	/**
	 * If instance is ephemeral.The default value is true.
	 */
	private boolean ephemeral = true;

    // 省略部分代码

}

这样在项目启动后,配置文件的内容就被封装到nacosDiscoveryProperties实体中,我们只需要在类中注入,就可以直接使用了。

3.2、创建NamingService

如果存在,则直接返回,否则通过反射创建一个NacosNamingService。NamingService接口是Nacos命名服务对外提供的一个统一接口,看对应的源码就可以发现,它提供了大量实例相关的接口方法,比如:

  • 服务实例注册;
  • 服务实例注销;
  • 获取服务实例列表;
  • 获取服务单个实例;
  • 订阅服务事件;
  • 取消订阅服务事件;
  • 获取所有(或指定)服务名称;
  • 获取所有订阅的服务;
  • 获取Nacos服务的状态;
  • 主动关闭服务;

其中部分功能提供了大量的重载方法,应用于不同场景和不同类型实例或服务的筛选。这个就不逐一说明,按照需要或注释进行使用即可。

NamingService的实例化是通过NacosFactory.createNamingService(properties);实现的,内部源码是通过反射来实现实例化过程。

private NamingService namingService() {
    return nacosServiceManager.getNamingService();
}

// com.alibaba.cloud.nacos.NacosServiceManager#getNamingService()
public NamingService getNamingService(Properties properties) {
    if (Objects.isNull(this.namingService)) {
        buildNamingService(properties);
    }
    return namingService;
}

private NamingService buildNamingService(Properties properties) {
    if (Objects.isNull(namingService)) {
        synchronized (NacosServiceManager.class) {
            if (Objects.isNull(namingService)) {
                namingService = createNewNamingService(properties);
            }
        }
    }
    return namingService;
}

public static NamingService createNamingService(Properties properties) throws NacosException {
    return NamingFactory.createNamingService(properties);
}

// com.alibaba.nacos.api.naming.NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}

?3.3、获取Instance实例

private Instance getNacosInstanceFromRegistration(Registration registration) {
    Instance instance = new Instance();	
	// 设置IP地址
    instance.setIp(registration.getHost());
	// 设置服务端口号
    instance.setPort(registration.getPort());
	// 设置服务权重
    instance.setWeight(nacosDiscoveryProperties.getWeight());
	// 设置集群名称
    instance.setClusterName(nacosDiscoveryProperties.getClusterName());
	// 是否开启
    instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
	// 设置元数据
    instance.setMetadata(registration.getMetadata());
	// 设置是否临时实例
    instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
    return instance;
}

构建后的Instance的样子:

3.4、注册服务实例

通过NacosNamingService#registerInstance方法进行实例注册

// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查instance相关属性的合法性
    NamingUtils.checkInstanceIsLegal(instance);

    // 通过客户端代理去注册服务
    // 在构造方法中调用了init()初始化方法,创建了一个clientProxy代理类对象,具体的实现是NamingClientProxyDelegate
    // this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
    clientProxy.registerService(serviceName, groupName, instance);
}

?这个方法实现了两个功能:第一,检查心跳时间设置的对不对,配置的超时时间必须要大于心跳间隔时间。第二,通过NamingClientProxy这个代理来执行服务注册操作。

通过源码,我们了解到clientProxy代理接口是通过NamingClientProxyDelegate来完成,我们可以在init构造方法中得出,具体的实例对象:

// com.alibaba.nacos.client.naming.NacosNamingService#init
private void init(Properties properties) throws NacosException {
    ValidatorUtils.checkInitParam(properties);
    this.namespace = InitUtils.initNamespaceForNaming(properties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(properties);
    initLogName(properties);
    
    this.changeNotifier = new InstancesChangeNotifier();
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    NotifyCenter.registerSubscriber(changeNotifier);
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
    
    // 创建代理类,使用NamingClientProxyDelegate来完成
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}

为啥叫代理的委托类呢?

原来这个类并不是真正的代理类,真正的代理类是grpcClientProxy和httpClientProxy,这个类仅仅是做了一个委托功能,将处理的方法委托给了这两个代理类去处理。所以这个类叫代理委托类。

所以我们查看NamingClientProxyDelegate#registerService方法:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 根据是否是临时实例,选择不同的代理类去注册   grpcClientProxy or  httpClientProxy
    getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

// 判断当前实例是否为临时实例后,来选择对应的客户端代理来进行请求, nacos 2.0改动中将http的请求换成了gRpc了
// 如果当前实例是临时实例,则采用gRPC协议(NamingGrpcClientProxy)进行请求,否则采用Http协议(NamingHttpClientProxy),默认为临时实例,在2.0版本中默认采用gRPC协议进行与Nacos服务进行交互
// 在本例中,instance.isEphemeral()为true,是临时实例,所以使用grpc协议进行通信(grpcClientProxy)
private NamingClientProxy getExecuteClientProxy(Instance instance) {
    // 如果是临时实例,使用grpc方式,否则使用http方式
    // ephemeral默认为true,也就是返回grpcClientProxy,Nacos 2.0版本将http的请求换成了gRpc了

    // 临时instance:gRPC长连接
    // 持久instance:http短连接
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}

继续查看NamingGrpcClientProxy#registerService方法:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    // 将实例信息缓存到一个map中,后面需要使用
    // ConcurrentMap<String, InstanceRedoData> registeredInstances
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);

    // 注册服务
    doRegisterService(serviceName, groupName, instance);
}

在NamingGrpcClientProxy中做了两件事,一件事是缓存了当前注册的实例重做信息。缓存的数据结构为ConcurrentMap<String, Instance>,key为“serviceName@@groupName”,value就是前面封装的实例重做信息。另外一件事就是封装了参数,基于gRPC协议进行服务的调用和结果的处理。

3.4.1、缓存实例

// com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService#cacheInstanceForRedo
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    // 缓存重做数据,定时使用redoData重新注册,代码在RedoScheduledTask(定时调用),最终调用的也是NamingGrpcClientProxy#doRegisterService
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        // 缓存重做数据
        // ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
        // key: groupName@@serviceName
        registeredInstances.put(key, redoData);
    }
}

缓存完成后,registeredInstances的样子:

3.4.2、gRPC注册实例

// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // type=registerInstance

    // 构建一个实例请求对象
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // 通过grpc方法进行远程服务调用,请求Nacos服务端注册
    requestToServer(request, Response.class);
    // 标识重做数据(redoData)已注册
    redoService.instanceRegistered(serviceName, groupName);
}

核心逻辑在requestToServer():?

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        // 设置header
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // 发起rpc请求  本例中requestTimeout=-1
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}

// com.alibaba.nacos.common.remote.client.RpcClient#request(com.alibaba.nacos.api.remote.request.Request)
public Response request(Request request) throws NacosException {
    // 获取超时时间,如果没有配置,默认3s超时
    return request(request, DEFAULT_TIMEOUT_MILLS);
}

我们查看request()方法:

public Response request(Request request, long timeoutMills) throws NacosException {
    int retryTimes = 0;
    Response response;
    Exception exceptionThrow = null;
    long start = System.currentTimeMillis();
    // 最多重试三次
    while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
        boolean waitReconnect = false;
        try {
            if (this.currentConnection == null || !isRunning()) {
                waitReconnect = true;
                throw new NacosException(NacosException.CLIENT_DISCONNECT,
                        "Client not connected, current status:" + rpcClientStatus.get());
            }
            // 拿到连接去请求Nacos服务端完成注册
            response = this.currentConnection.request(request, timeoutMills);
            if (response == null) {
                throw new NacosException(SERVER_ERROR, "Unknown Exception.");
            }
            if (response instanceof ErrorResponse) {
                if (response.getErrorCode() == NacosException.UN_REGISTER) {
                    synchronized (this) {
                        waitReconnect = true;
                        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                            LoggerUtils.printIfErrorEnabled(LOGGER,
                                    "Connection is unregistered, switch server, connectionId = {}, request = {}",
                                    currentConnection.getConnectionId(), request.getClass().getSimpleName());
                            switchServerAsync();
                        }
                    }
                    
                }
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            // return response.
            lastActiveTimeStamp = System.currentTimeMillis();
            return response;
            
        } catch (Exception e) {
            if (waitReconnect) {
                try {
                    // wait client to reconnect.
                    Thread.sleep(Math.min(100, timeoutMills / 3));
                } catch (Exception exception) {
                    // Do nothing.
                }
            }
            
            LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
                    request, retryTimes, e.getMessage());
            
            exceptionThrow = e;
            
        }
        // 重试次数+1
        retryTimes++;
        
    }
    
    if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
        switchServerAsyncOnRequestFail();
    }
    
    if (exceptionThrow != null) {
        throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                : new NacosException(SERVER_ERROR, exceptionThrow);
    } else {
        throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
    }
}

到这里,客户端通过grpc给Nacos服务端发送了一个服务注册请求,接下来就看服务端如何处理了。

四、Nacos服务端处理服务注册请求

通过前面的分析,我们知道了客户端通过调用RpcClient#request()方法完成向Nacos服务端提交注册服务的请求,通过Grpc远程调用,现在客户端发起调用,那么Nacos服务端肯定也得有个Grpc的服务端来处理这个注册服务的请求。

4.1、Grpc处理类

实际上,这个Grpc的处理类就是com.alibaba.nacos.core.remote.BaseRpcServer,它是Grpc服务端的一个抽象类,其有一个PostConstruct,表示在构造方法后执行:

// com.alibaba.nacos.core.remote.BaseRpcServer#start
@PostConstruct
public void start() throws Exception {
    String serverName = getClass().getSimpleName();
    String tlsConfig = JacksonUtils.toJson(rpcServerTlsConfig);
    Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(),
            tlsConfig);

    // 启动Grpc服务
    startServer();
    
    if (RpcServerSslContextRefresherHolder.getInstance() != null) {
        RpcServerSslContextRefresherHolder.getInstance().refresh(this);
    }
    
    Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(),
            tlsConfig);
    // JVM关闭时,关闭Grpc服务
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
        try {
            BaseRpcServer.this.stopServer();
            Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
        } catch (Exception e) {
            Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
        }
    }));
    
}

重点就在startServer()方法,它是一个抽象方法,具体由其子类进行实现,查看子类,发现是由BaseGrpcServer实现:

// com.alibaba.nacos.core.remote.grpc.BaseGrpcServer#startServer
public void startServer() throws Exception {
    final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
    // 注册服务
    addServices(handlerRegistry, new GrpcConnectionInterceptor());

    NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
    
    if (rpcServerTlsConfig.getEnableTls()) {
        builder.protocolNegotiator(
                new OptionalTlsProtocolNegotiator(getSslContextBuilder(), rpcServerTlsConfig.getCompatibility()));
        
    }
    
    // 创建Grpc的server端
    server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
            .compressorRegistry(CompressorRegistry.getDefaultInstance())
            .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
            .addTransportFilter(new AddressTransportFilter(connectionManager))
            .keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
            .keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
            .permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();
    
    // 启动Grpc的server端
    server.start();
}

?注册服务的过程中就能看到添加了Grpc的处理方式:

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {

    // unary common call register.
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
    .setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(
        MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
                                                GrpcServerConstants.REQUEST_METHOD_NAME))
    .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
    .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

    // 处理类
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
        (request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));

    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
        GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));

    // bi stream register.
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
        (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));

    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
    .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(
        MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
                                                GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
    .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
    .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();

    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(
        GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));

}

重点关注grpcCommonRequestAcceptor.request(request, responseObserver)这个处理方法:

public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
    
    traceIfNecessary(grpcRequest, true);
    String type = grpcRequest.getMetadata().getType();
    
    //server is on starting.
    if (!ApplicationUtils.isStarted()) {
        Payload payloadResponse = GrpcUtils.convert(
                ErrorResponse.build(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        
        responseObserver.onCompleted();
        return;
    }
    
    // server check.
    if (ServerCheckRequest.class.getSimpleName().equals(type)) {
        Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get()));
        traceIfNecessary(serverCheckResponseP, false);
        responseObserver.onNext(serverCheckResponseP);
        responseObserver.onCompleted();
        return;
    }
    
    RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
    //no handler found.
    if (requestHandler == null) {
        Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    //check connection status.
    String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
    boolean requestValid = connectionManager.checkValid(connectionId);
    if (!requestValid) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    Object parseObj = null;
    try {
        parseObj = GrpcUtils.parse(grpcRequest);
    } catch (Exception e) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
        Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    if (parseObj == null) {
        Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parse request is null", connectionId);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    if (!(parseObj instanceof Request)) {
        Loggers.REMOTE_DIGEST
                .warn("[{}] Invalid request receive  ,parsed payload is not a request,parseObj={}", connectionId,
                        parseObj);
        Payload payloadResponse = GrpcUtils
                .convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
        return;
    }
    
    Request request = (Request) parseObj;
    try {
        Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
        RequestMeta requestMeta = new RequestMeta();
        requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
        requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
        requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
        requestMeta.setLabels(connection.getMetaInfo().getLabels());
        connectionManager.refreshActiveTime(requestMeta.getConnectionId());

        // 真正处理请求
        Response response = requestHandler.handleRequest(request, requestMeta);

        
        Payload payloadResponse = GrpcUtils.convert(response);
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    } catch (Throwable e) {
        Loggers.REMOTE_DIGEST
                .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
                        e);
        Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(e));
        traceIfNecessary(payloadResponse, false);
        responseObserver.onNext(payloadResponse);
        responseObserver.onCompleted();
    }
    
}

真正处理请求是在 Response response = requestHandler.handleRequest(request, requestMeta);方法:

// com.alibaba.nacos.core.remote.RequestHandler#handleRequest
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
    for (AbstractRequestFilter filter : requestFilters.filters) {
        try {
            Response filterResult = filter.filter(request, meta, this.getClass());
            if (filterResult != null && !filterResult.isSuccess()) {
                return filterResult;
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE.error("filter error", throwable);
        }
        
    }
    // 调用处理方法
    return handle(request, meta);
}

// 抽象方法,由子类处理
public abstract S handle(T request, RequestMeta meta) throws NacosException;

handle()方法是一个抽象方法,由子类去实现。我们注意到handle()方法有一个T extends Request的请求参数,我们再次回到,客户端通过Grpc注册服务的时候,当时的request是什么类型的,就可以找到对应的处理器。

客户端提交注册服务请求:com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService:

// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 构建一个请求
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // rpcClient去进行Rpc请求,没有抛出异常说明调用成功
    requestToServer(request, Response.class);
    // 实例注册成功,将缓存(上一步将实例缓存在registeredInstances中)中的注册状态标记为true
    redoService.instanceRegistered(serviceName, groupName);
}

通过这个InstanceRequest对象,我们就可以很快找到具体的处理类了:

?

4.2、处理Grpc注册实例请求

也就是com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle这个方法是真正处理Grpc注册实例请求的:

?

public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
    // 根据命名空间ID、组名、服务名、是否临时实例(默认为true)创建一个Service
    Service service = Service
            .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
    // 根据客户端请求类型进行不同的处理: 注册 or 下线
    switch (request.getType()) {
        // 处理注册实例请求
        case NamingRemoteConstants.REGISTER_INSTANCE:
            return registerInstance(service, request, meta);
        // 处理下线实例请求
        case NamingRemoteConstants.DE_REGISTER_INSTANCE:
            return deregisterInstance(service, request, meta);
        default:
            throw new NacosException(NacosException.INVALID_PARAM,
                    String.format("Unsupported request type %s", request.getType()));
    }
}

private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
        throws NacosException {
    // 注册实例
    // service: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
    // instance: Instance{instanceId='null', ip='172.110.0.134', port=1001, weight=1.0, healthy=true, enabled=true, ephemeral=true, clusterName='DEFAULT', serviceName='null', metadata={preserved.register.source=SPRING_CLOUD}}
    clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
    
    // 发布注册实例跟踪事件
    NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
            meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
            request.getInstance().getIp(), request.getInstance().getPort()));
    return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}

?完成注册具体是在EphemeralClientOperationServiceImpl#registerInstance:

public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    // 实例活动状态检查  即实例需要满足:心跳超时时间 > 心跳间隔、IP删除超时时间 > 心跳间隔
    NamingUtils.checkInstanceIsLegal(instance);

    // ServiceManager.getInstance()使用饿汉式单例返回一个ServiceManager对象
    // getSingleton从缓存singletonRepository中获取一个单例Service, 已存在的时候直接从缓存获取(注意Service的equals和hasCode方法)
    Service singleton = ServiceManager.getInstance().getSingleton(service);

    // 判断获取到的service是否是临时实例,如果不是,则报错,因为当前的service(EphemeralClientOperationServiceImpl)就是处理临时实例的
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }

    // 根据客户端ID从客户端管理器中找到客户端信息,这个关系在连接建立的时候存储
    // clientId=172.110.0.138:1001#true
    Client client = clientManager.getClient(clientId);

    // 判断Client是否合法:
    // 1、client是否为空,为空代表客户端已经断开连接,非法
    // 2、client是否为临时的,如果非临时的连接,非法,直接返回
    if (!clientIsLegal(client, clientId)) {
        return;
    }

    // 获取实例发布信息,包含一些属性,如实例IP、实例ID、端口号等等
    InstancePublishInfo instanceInfo = getPublishInfo(instance);

    /**
     * 负责存储当前客户端服务注册表,也就是 service和instance的关系。
     * 1. 客户端将自己注册到了服务器端的ClientManager中;
     * 2. Client客户端内部有一个Map: ConcurrentHashMap<Service, InstancePublishInfo> publishers. 即发布者列表
     * 3. 将实例信息放入发布者列表中( key: Service 、 value: 实例发布信息)
     */

    // com.alibaba.nacos.naming.core.v2.client.AbstractClient.publishers
    // protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    client.addServiceInstance(singleton, instanceInfo);

    // 设置客户端最后更新时间为当前时间
    client.setLastUpdatedTime();
    client.recalculateRevision();
    // 发布客户端注册事件通知订阅者, Nacos使用了发布-订阅模式来处理. 简单理解就是,事件发布器发布相应的事件,然后对事件感兴趣的订阅者就会进行相应的处理,解耦
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    // 发布实例元数据事件通知订阅者
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

4.2.1、获取Service?

Service singleton = ServiceManager.getInstance().getSingleton(service)

// ServiceManager.getInstance()使用饿汉式单例返回一个ServiceManager对象
private static final ServiceManager INSTANCE = new ServiceManager();
public static ServiceManager getInstance() {
    return INSTANCE;
}
public Service getSingleton(Service service) {
    // 如果service在singletonRepository中找不到,则存入到singletonRepository中;否则返回已存在的Service
    // 怎么判断service是否已经存在,service重写了equal和hasCode方法,namespace+group+serviceName在服务端是一个单例Service
    singletonRepository.computeIfAbsent(service, key -> {
        NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
        return service;
    });

    // 然后从singletonRepository中拿出来
    // ConcurrentHashMap<Service, Service> singletonRepository
    Service result = singletonRepository.get(service);

    // namespaceSingletonMaps其实是按照NamespaceId来存放所有的Service
    // ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}

singletonRepository和namespaceSingletonMaps都是ConcurrentHashMap,分别存放Service信息、同一个命令空间下的所有Service集合:

/**
 * 保存单例的服务,注意,singletonRepository仅保存服务这个维度
 * 因为Service里面是的属性是没有端口,ip等属性的,意味着这里的Service是指服务(例如:server.name = discovery-provider),
 * 就算有几个discovery-provider,这里的Service都是泛指的discovery-provider
 */
private final ConcurrentHashMap<Service, Service> singletonRepository;

/**
 * 命名空间下所有的服务集合
 * key: namespaceId
 * value: Set<Service>
 */
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;

执行完上面的逻辑后,两个map中的样子如下所示:

4.2.2、获取实例发布信息

将Instance实例信息封装成发布信息实体InstancePublishInfo。

default InstancePublishInfo getPublishInfo(Instance instance) {
    InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
    Map<String, Object> extendDatum = result.getExtendDatum();
    if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
        extendDatum.putAll(instance.getMetadata());
    }
    if (StringUtils.isNotEmpty(instance.getInstanceId())) {
        extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
    }
    if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
        extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
    }
    if (!instance.isEnabled()) {
        extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
    }
    String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
            : instance.getClusterName();
    result.setHealthy(instance.isHealthy());
    result.setCluster(clusterName);
    return result;
}

4.2.3、添加服务实例到发布列表中

这里提到的发布列表也是一个map:

protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
// com.alibaba.nacos.naming.core.v2.client.AbstractClient#addServiceInstance
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    //  protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    // 将实例信息添加到发布者列表publishers中
    if (null == publishers.put(service, instancePublishInfo)) {
        if (instancePublishInfo instanceof BatchInstancePublishInfo) {
            MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
        } else {
            MetricsMonitor.incrementInstanceCount();
        }
    }
    // 发布一个客户端注册事件通知订阅者
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}

添加完实例后的发布列表publishers如下:

4.3、订阅者处理ClientRegisterServiceEvent事件

可以看到注册逻辑好像并没有处理什么内容,实际都是在发布事件后,接收事件的处理逻辑进行处理的,这里使用到了事件-发布模型,这里暂且先不讨论NotifyCenter,后面单独分析。

还记得注册实例时,在EphemeralClientOperationServiceImpl#registerInstance中发布了一个事件:

// 发布客户端注册事件通知订阅者
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));

通过ClientRegisterServiceEvent的引用路径,我们找到是下面这个类监听了ClientRegisterServiceEvent这种事件:

com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation

在ClientServiceIndexesManager类存在一个map,用于存放服务实例信息:?

/**
 * 服务与发布这个服务的客户端的关联关系
 */
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
    // 获取服务
    Service service = event.getService();
    // 获取客户端ID
    String clientId = event.getClientId();
    if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
        // 处理ClientRegisterServiceEvent事件(服务注册)
        addPublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
        // 处理ClientDeregisterServiceEvent事件(服务下线)
        removePublisherIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
        // 处理ClientSubscribeServiceEvent事件(订阅)
        addSubscriberIndexes(service, clientId);
    } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
        // 处理ClientUnsubscribeServiceEvent事件(取消订阅)
        removeSubscriberIndexes(service, clientId);
    }
}

服务注册处理后的发布者列表如下图所示:

五、总结

本篇文章中,我们从Nacos客户端自动注册在何时触发开始,然后分析了客户端向Nacos服务端提交服务注册请求,最后到Nacos服务端处理这个注册请求,整个流程大概梳理了一下,下一篇文章,我们将会详细分析NotifyCenter这个事件-发布机制的原理,然后继续分析服务端处理注册请求过程中涉及到的后续流程处理。

?

文章来源:https://blog.csdn.net/Weixiaohuai/article/details/135654876
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。