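Before diving into the Nacos source code, it is worth revisiting some of its important terms, including the map type ConcurrentMap<Service, Set<String>>.
First we need to set up a Nacos source-reading environment; see the previous article for how to do that. To fully understand the whole Nacos service registration process, we will read the source with three questions in mind.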
Nacos source version: 2.3.0
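We first need to understand what triggers automatic registration with Nacos: why does simply running the main method of a Spring Boot application class complete the whole registration with Nacos?
Taking the following application class as an example, let's analyze when Nacos registration is triggered.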
@SpringBootApplication
@EnableDiscoveryClient
public class DiscoveryProviderApplication {
public static void main(String[] args) {
SpringApplication.run(DiscoveryProviderApplication.class, args);
}
}
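Execution first enters the Spring Boot startup flow. During startup, Spring creates its IoC container; the core method is refresh(). The last step of refresh() is finishRefresh(), which handles the logic that runs once the container refresh is complete, including this.getLifecycleProcessor().onRefresh(), the onRefresh() callback of the lifecycle processor.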
protected void finishRefresh() {
this.clearResourceCaches();
this.initLifecycleProcessor();
// callback into the lifecycle processor
this.getLifecycleProcessor().onRefresh();
this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
if (!IN_NATIVE_IMAGE) {
LiveBeansView.registerApplicationContext(this);
}
}
Among the lifecycle beans started here is WebServerStartStopLifecycle, which has a start() method:
public void start() {
this.webServer.start();
this.running = true;
this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext));
}
As we can see, WebServerStartStopLifecycle publishes an event, ServletWebServerInitializedEvent. Judging by the name, it signals that the web server has finished initializing.
The call stack at this point is:
start:45, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
accept:-1, 1423639915 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$537)
forEach:75, Iterable (java.lang)
startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:940, AbstractApplicationContext (org.springframework.context.support)
refresh:591, AbstractApplicationContext (org.springframework.context.support)
refresh:144, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
refresh:767, SpringApplication (org.springframework.boot)
refresh:759, SpringApplication (org.springframework.boot)
refreshContext:426, SpringApplication (org.springframework.boot)
run:326, SpringApplication (org.springframework.boot)
run:1311, SpringApplication (org.springframework.boot)
run:1300, SpringApplication (org.springframework.boot)
main:12, DiscoveryProviderApplication (com.example.discoveryprovider)
Given Spring's event publishing mechanism, some listener must be interested in ServletWebServerInitializedEvent. Reading the source, we find that org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration listens for this kind of event, so as soon as the event is published, AbstractAutoServiceRegistration is notified and handles it in AbstractAutoServiceRegistration#onApplicationEvent().
// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
this.port.compareAndSet(0, event.getWebServer().getPort());
// calls start()
this.start();
}
}
public void start() {
// check whether registration is enabled
if (!this.isEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("Discovery Lifecycle disabled. Not starting");
}
} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
// calls register()
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
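To make the chain above easier to follow, here is a minimal plain-Java sketch of it, without Spring. All type names in it (WebServerReadyEvent, ReadyListener, SimpleEventBus, AutoRegistrationSketch) are hypothetical stand-ins and not Spring or Spring Cloud classes; only the shape of bind() and start(), namely the one-shot port assignment and the running guard, is taken from the code shown above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for WebServerInitializedEvent: carries only the port. */
final class WebServerReadyEvent {
    final int port;

    WebServerReadyEvent(int port) {
        this.port = port;
    }
}

/** Hypothetical stand-in for Spring's ApplicationListener. */
interface ReadyListener {
    void onEvent(WebServerReadyEvent event);
}

/** Minimal event bus playing the role of ApplicationContext#publishEvent. */
final class SimpleEventBus {
    private final List<ReadyListener> listeners = new ArrayList<>();

    void addListener(ReadyListener listener) {
        listeners.add(listener);
    }

    void publish(WebServerReadyEvent event) {
        for (ReadyListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

/** Models the bind() -> start() -> register() chain of AbstractAutoServiceRegistration. */
final class AutoRegistrationSketch implements ReadyListener {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> registrations = new ArrayList<>();

    @Override
    public void onEvent(WebServerReadyEvent event) {
        port.compareAndSet(0, event.port); // only the first reported port is kept
        start();
    }

    private void start() {
        if (!running.get()) {              // register at most once
            registrations.add("registered on port " + port.get());
            running.compareAndSet(false, true);
        }
    }

    public static void main(String[] args) {
        SimpleEventBus bus = new SimpleEventBus();
        AutoRegistrationSketch registration = new AutoRegistrationSketch();
        bus.addListener(registration);
        bus.publish(new WebServerReadyEvent(8080)); // web server started: triggers registration
        bus.publish(new WebServerReadyEvent(9090)); // ignored: already running
        System.out.println(registration.registrations);
    }
}
```

The second event changes nothing because the running flag is already set, which is the same reason the real start() is safe to call more than once.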
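The core logic is in this.register(). AbstractAutoServiceRegistration has a subclass, NacosAutoServiceRegistration; consider its class hierarchy.
Looking at the inheritance chain, AbstractAutoServiceRegistration implements the ApplicationListener interface, which confirms what we said earlier: it is interested in ServletWebServerInitializedEvent.
NacosAutoServiceRegistration overrides register() so that the service registers itself with Nacos; after a few checks it still delegates to the parent class's register():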
protected void register() {
if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
log.debug("Registration disabled.");
return;
}
if (this.registration.getPort() < 0) {
this.registration.setPort(getPort().get());
}
super.register();
}
// org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#register
protected void register() {
this.serviceRegistry.register(this.getRegistration());
}
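The serviceRegistry in the parent class AbstractAutoServiceRegistration is an instance of NacosServiceRegistry. It is defined as a bean in com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration, which ties into Spring Boot's auto-configuration mechanism.
When setting up the Nacos source-reading environment, we added the following dependency: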
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2.2.9.RELEASE</version>
</dependency>
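When a Spring Boot application starts, SpringFactoriesLoader looks for META-INF/spring.factories files. spring.factories records, as key-value pairs, the classes to be added to the container; the key is org.springframework.boot.autoconfigure.EnableAutoConfiguration and the value is a list of xxxAutoConfiguration classes.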
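The key-value format can be illustrated with a small plain-Java sketch. It does not use SpringFactoriesLoader; it only shows that a spring.factories entry is ordinary properties syntax whose value is a comma-separated class list. The two class names in the sample text are an illustrative excerpt, and the real file in the starter may list more or different entries.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class SpringFactoriesSketch {
    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    /** Parses spring.factories-style text and returns the class names listed under KEY. */
    static List<String> autoConfigurations(String content) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash
            props.load(new StringReader(content));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(KEY, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    /** Illustrative excerpt only, not a copy of the starter's real file. */
    static String sampleContent() {
        return KEY + "=\\\n"
                + "  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\\\n"
                + "  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration\n";
    }

    public static void main(String[] args) {
        for (String name : autoConfigurations(sampleContent())) {
            System.out.println(name);
        }
    }
}
```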
Looking through that file, we find NacosServiceRegistryAutoConfiguration, the configuration class related to automatic service registration with Nacos:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
AutoServiceRegistrationAutoConfiguration.class,
NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosServiceManager nacosServiceManager,
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(registrationCustomizers.getIfAvailable(),
nacosDiscoveryProperties, context);
}
// the core class of Nacos auto-registration
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
return new NacosAutoServiceRegistration(registry,
autoServiceRegistrationProperties, registration);
}
}
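From the source of the NacosServiceRegistryAutoConfiguration auto-configuration class, we see that it registers three important beans with Spring: NacosServiceRegistry, NacosRegistration, and NacosAutoServiceRegistration.
At this point we have finally found when the Nacos client's automatic registration is triggered. To summarize: once the web server has started, WebServerStartStopLifecycle publishes ServletWebServerInitializedEvent; AbstractAutoServiceRegistration receives it and calls bind(), start() and register() in turn; NacosAutoServiceRegistration#register then delegates to NacosServiceRegistry#register.
Next comes the actual service registration, in which the client sends a registration request to the Nacos server. Knowing when registration is triggered, we continue with the core method through which the client submits that request, com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register: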
//com.alibaba.cloud.nacos.registry.NacosServiceRegistry#register
public void register(Registration registration) {
// if the service ID in the registration is empty, this is not a valid service instance: refuse to register and return
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
// obtain the NamingService: return the existing one, otherwise create a NacosNamingService via reflection
NamingService namingService = namingService();
// the service ID, discovery-provider in this example
String serviceId = registration.getServiceId();
// the group name, DEFAULT_GROUP in this example
String group = nacosDiscoveryProperties.getGroup();
// build the Instance that represents this service instance
Instance instance = getNacosInstanceFromRegistration(registration);
try {
// register the service instance
namingService.registerInstance(serviceId, group, instance);
// once registration succeeds, log it; the line is visible in the console after Spring Boot has started
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
if (nacosDiscoveryProperties.isFailFast()) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
rethrowRuntimeException(e);
}
else {
log.warn("Failfast is false. {} register failed...{},", serviceId,
registration.toString(), e);
}
}
}
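The nacosDiscoveryProperties used above relies on Spring Boot's ability to map configuration files onto a class: @ConfigurationProperties("spring.cloud.nacos.discovery") binds the properties that start with "spring.cloud.nacos.discovery" to the identically named fields of NacosDiscoveryProperties.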
@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties {
/**
* Prefix of {@link NacosDiscoveryProperties}.
*/
public static final String PREFIX = "spring.cloud.nacos.discovery";
private static final Pattern PATTERN = Pattern.compile("-(\\w)");
/**
* nacos discovery server address.
*/
private String serverAddr;
/**
* the nacos authentication username.
*/
private String username;
/**
* the nacos authentication password.
*/
private String password;
/**
* the domain name of a service, through which the server address can be dynamically
* obtained.
*/
private String endpoint;
/**
* namespace, separation registry of different environments.
*/
private String namespace;
/**
* watch delay,duration to pull new service from nacos server.
*/
private long watchDelay = 30000;
/**
* nacos naming log file name.
*/
private String logName;
/**
* service name to registry.
*/
@Value("${spring.cloud.nacos.discovery.service:${spring.application.name:}}")
private String service;
/**
* weight for service instance, the larger the value, the larger the weight.
*/
private float weight = 1;
/**
* cluster name for nacos .
*/
private String clusterName = "DEFAULT";
/**
* group name for nacos.
*/
private String group = "DEFAULT_GROUP";
/**
* naming load from local cache at application start. true is load.
*/
private String namingLoadCacheAtStart = "false";
/**
* extra metadata to register.
*/
private Map<String, String> metadata = new HashMap<>();
/**
* if you just want to subscribe, but don't want to register your service, set it to
* false.
*/
private boolean registerEnabled = true;
/**
* The ip address your want to register for your service instance, needn't to set it
* if the auto detect ip works well.
*/
private String ip;
/**
* which network interface's ip you want to register.
*/
private String networkInterface = "";
/**
* The port your want to register for your service instance, needn't to set it if the
* auto detect port works well.
*/
private int port = -1;
/**
* whether your service is a https service.
*/
private boolean secure = false;
/**
* access key for namespace.
*/
private String accessKey;
/**
* secret key for namespace.
*/
private String secretKey;
/**
* Heart beat interval. Time unit: millisecond.
*/
private Integer heartBeatInterval;
/**
* Heart beat timeout. Time unit: millisecond.
*/
private Integer heartBeatTimeout;
/**
* Ip delete timeout. Time unit: millisecond.
*/
private Integer ipDeleteTimeout;
/**
* If instance is enabled to accept request. The default value is true.
*/
private boolean instanceEnabled = true;
/**
* If instance is ephemeral.The default value is true.
*/
private boolean ephemeral = true;
// remaining code omitted
}
This way, once the application has started, the configuration is held in the nacosDiscoveryProperties bean; we only need to inject it into a class to use it.
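As a rough model of this prefix-based binding, the sketch below filters a flat property map by the prefix and converts kebab-case keys such as cluster-name into field-style names such as clusterName. It is an assumption-laden simplification: Spring Boot's real binder also handles type conversion, nested objects and other relaxed forms. The regex is the same expression as the PATTERN constant shown in the class above; how NacosDiscoveryProperties itself uses that constant is not covered here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PrefixBindingSketch {
    static final String PREFIX = "spring.cloud.nacos.discovery";
    private static final Pattern PATTERN = Pattern.compile("-(\\w)");

    /** Converts a kebab-case key to camelCase, e.g. cluster-name becomes clusterName. */
    static String toCamelCase(String key) {
        Matcher matcher = PATTERN.matcher(key);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            // replace "-x" with "X"
            matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    /** Keeps only keys under PREFIX and maps them to field-style names. */
    static Map<String, String> bind(Map<String, String> config) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (entry.getKey().startsWith(PREFIX + ".")) {
                String name = entry.getKey().substring(PREFIX.length() + 1);
                fields.put(toCamelCase(name), entry.getValue());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("spring.application.name", "discovery-provider");
        config.put("spring.cloud.nacos.discovery.server-addr", "127.0.0.1:8848");
        config.put("spring.cloud.nacos.discovery.cluster-name", "DEFAULT");
        System.out.println(bind(config));
    }
}
```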
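namingService() returns the existing NamingService if there is one; otherwise it creates a NacosNamingService via reflection. NamingService is the unified interface that the Nacos naming service exposes. Its source shows that it offers a large number of instance-related methods, such as registerInstance, which this walkthrough follows.
Some of these methods have many overloads, for different scenarios and for filtering different kinds of instances or services. We will not go through them one by one; use them as needed, guided by their comments.
NamingService is instantiated through NacosFactory.createNamingService(properties), which internally uses reflection to create the instance.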
private NamingService namingService() {
return nacosServiceManager.getNamingService();
}
// com.alibaba.cloud.nacos.NacosServiceManager#getNamingService()
public NamingService getNamingService(Properties properties) {
if (Objects.isNull(this.namingService)) {
buildNamingService(properties);
}
return namingService;
}
private NamingService buildNamingService(Properties properties) {
if (Objects.isNull(namingService)) {
synchronized (NacosServiceManager.class) {
if (Objects.isNull(namingService)) {
namingService = createNewNamingService(properties);
}
}
}
return namingService;
}
public static NamingService createNamingService(Properties properties) throws NacosException {
return NamingFactory.createNamingService(properties);
}
// com.alibaba.nacos.api.naming.NamingFactory#createNamingService(java.util.Properties)
public static NamingService createNamingService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
return (NamingService) constructor.newInstance(properties);
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
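The two techniques in the snippets above, lazy creation guarded by double-checked locking and instantiation through a reflective constructor lookup, can be tried in isolation. In this sketch DemoNamingService and LazyServiceHolder are hypothetical stand-ins, not Nacos or Spring Cloud Alibaba classes, and the volatile field is what makes the double-checked pattern safe in this sketch.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

/** Hypothetical stand-in for NacosNamingService. */
class DemoNamingService {
    final Properties properties;

    public DemoNamingService(Properties properties) {
        this.properties = properties;
    }
}

final class LazyServiceHolder {
    // volatile so that a fully constructed instance is visible to other threads
    private volatile DemoNamingService service;

    /** Returns the cached service, creating it on first use (double-checked locking). */
    DemoNamingService get(Properties properties) {
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = create(DemoNamingService.class.getName(), properties);
                }
            }
        }
        return service;
    }

    /** Instantiates the class by name through its (Properties) constructor. */
    static DemoNamingService create(String className, Properties properties) {
        try {
            Class<?> implClass = Class.forName(className);
            Constructor<?> constructor = implClass.getConstructor(Properties.class);
            return (DemoNamingService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        LazyServiceHolder holder = new LazyServiceHolder();
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1:8848");
        DemoNamingService first = holder.get(properties);
        DemoNamingService second = holder.get(new Properties()); // ignored: already created
        System.out.println(first == second);
    }
}
```

Note that the properties passed on later calls are ignored once the instance exists, which is a direct consequence of caching a single instance.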
private Instance getNacosInstanceFromRegistration(Registration registration) {
Instance instance = new Instance();
// IP address
instance.setIp(registration.getHost());
// service port
instance.setPort(registration.getPort());
// instance weight
instance.setWeight(nacosDiscoveryProperties.getWeight());
// cluster name
instance.setClusterName(nacosDiscoveryProperties.getClusterName());
// whether the instance is enabled
instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
// metadata
instance.setMetadata(registration.getMetadata());
// whether the instance is ephemeral
instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
return instance;
}
With the Instance built, it is registered through NacosNamingService#registerInstance:
// com.alibaba.nacos.client.naming.NacosNamingService#registerInstance(java.lang.String, java.lang.String, com.alibaba.nacos.api.naming.pojo.Instance)
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// check that the instance's properties are valid
NamingUtils.checkInstanceIsLegal(instance);
// register the service through the client proxy
// the constructor calls init(), which creates the clientProxy object; the concrete implementation is NamingClientProxyDelegate
// this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties, changeNotifier);
clientProxy.registerService(serviceName, groupName, instance);
}
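This method does two things. First, it validates the heartbeat settings: the configured heartbeat timeout must be greater than the heartbeat interval. Second, it performs the registration through the NamingClientProxy proxy.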
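The first of these two steps can be sketched as a standalone check. This is not the body of NamingUtils.checkInstanceIsLegal; it is a minimal model of the rule stated in this article, and the parameter names, the exception type, the message, and the choice to reject only strictly smaller timeouts are all assumptions of the sketch.

```java
final class HeartbeatCheckSketch {

    /**
     * Rejects settings in which a timeout is shorter than the heartbeat interval.
     * All values are in milliseconds.
     */
    static void check(long heartBeatInterval, long heartBeatTimeout, long ipDeleteTimeout) {
        if (heartBeatTimeout < heartBeatInterval || ipDeleteTimeout < heartBeatInterval) {
            throw new IllegalArgumentException(
                    "heart beat interval must be less than heart beat timeout and ip delete timeout");
        }
    }

    /** Convenience wrapper that reports validity as a boolean instead of throwing. */
    static boolean isLegal(long heartBeatInterval, long heartBeatTimeout, long ipDeleteTimeout) {
        try {
            check(heartBeatInterval, heartBeatTimeout, ipDeleteTimeout);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // interval 5 s, timeout 15 s, delete after 30 s: consistent settings
        System.out.println(isLegal(5_000, 15_000, 30_000));
        // the timeout is shorter than the interval: rejected
        System.out.println(isLegal(5_000, 3_000, 30_000));
    }
}
```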
From the source we see that the clientProxy interface is implemented by NamingClientProxyDelegate; the concrete instance is created in init(), which the constructor calls:
// com.alibaba.nacos.client.naming.NacosNamingService#init
private void init(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
this.namespace = InitUtils.initNamespaceForNaming(properties);
InitUtils.initSerialization();
InitUtils.initWebRootContext(properties);
initLogName(properties);
this.changeNotifier = new InstancesChangeNotifier();
NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
NotifyCenter.registerSubscriber(changeNotifier);
this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
// create the proxy; the implementation is NamingClientProxyDelegate
this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
}
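Why is it called a proxy delegate?
Because this class is not the real proxy. The real proxies are grpcClientProxy and httpClientProxy; this class only delegates, handing each call to one of those two. Hence the name.
So let's look at NamingClientProxyDelegate#registerService: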
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
// choose the proxy to register with, grpcClientProxy or httpClientProxy, depending on whether the instance is ephemeral
getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}
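// pick the client proxy according to whether the instance is ephemeral; Nacos 2.0 replaced the HTTP requests with gRPC
// an ephemeral instance goes through gRPC (NamingGrpcClientProxy), any other through HTTP (NamingHttpClientProxy); instances are ephemeral by default, so 2.0 talks to the Nacos server over gRPC by default
// in this example instance.isEphemeral() is true, so gRPC (grpcClientProxy) is used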
private NamingClientProxy getExecuteClientProxy(Instance instance) {
// ephemeral instance: gRPC; otherwise: HTTP
// ephemeral defaults to true, so grpcClientProxy is returned
// ephemeral instance: long-lived gRPC connection
// persistent instance: short-lived HTTP connection
return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}
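The delegation shown above can be reduced to a few lines of plain Java. DemoInstance, NamingProxy and ProxyDelegateSketch are hypothetical names, and the two inner proxies only return a label instead of talking to a server; the point is that the delegate implements the same interface as the real proxies and merely picks one of them per call.

```java
/** Hypothetical stand-in for Instance: only the ephemeral flag matters here. */
final class DemoInstance {
    final boolean ephemeral;

    DemoInstance(boolean ephemeral) {
        this.ephemeral = ephemeral;
    }
}

/** Hypothetical stand-in for NamingClientProxy. */
interface NamingProxy {
    String registerService(String serviceName, DemoInstance instance);
}

/** Implements the proxy interface but only forwards to one of two real proxies. */
final class ProxyDelegateSketch implements NamingProxy {
    private final NamingProxy grpcProxy = (name, instance) -> "grpc:" + name;
    private final NamingProxy httpProxy = (name, instance) -> "http:" + name;

    @Override
    public String registerService(String serviceName, DemoInstance instance) {
        return executeProxy(instance).registerService(serviceName, instance);
    }

    /** Ephemeral instances go through gRPC, persistent ones through HTTP. */
    private NamingProxy executeProxy(DemoInstance instance) {
        return instance.ephemeral ? grpcProxy : httpProxy;
    }

    public static void main(String[] args) {
        ProxyDelegateSketch delegate = new ProxyDelegateSketch();
        System.out.println(delegate.registerService("discovery-provider", new DemoInstance(true)));
        System.out.println(delegate.registerService("discovery-provider", new DemoInstance(false)));
    }
}
```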
Next, NamingGrpcClientProxy#registerService:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
instance);
// cache the instance in a map for later use
// ConcurrentMap<String, InstanceRedoData> registeredInstances
redoService.cacheInstanceForRedo(serviceName, groupName, instance);
// register the service
doRegisterService(serviceName, groupName, instance);
}
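NamingGrpcClientProxy does two things here. First, it caches redo information for the instance being registered. The cache is a ConcurrentMap<String, InstanceRedoData> whose key is "groupName@@serviceName" and whose value is the redo data wrapping the instance. Second, it wraps the parameters into a request, calls the server over gRPC, and processes the result.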
// com.alibaba.nacos.client.naming.remote.gprc.redo.NamingGrpcRedoService#cacheInstanceForRedo
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
String key = NamingUtils.getGroupedName(serviceName, groupName);
// cache the redo data; RedoScheduledTask periodically re-registers from it, ultimately also calling NamingGrpcClientProxy#doRegisterService
InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
synchronized (registeredInstances) {
// store the redo data
// ConcurrentMap<String, InstanceRedoData> registeredInstances = new ConcurrentHashMap<>();
// key: groupName@@serviceName
registeredInstances.put(key, redoData);
}
}
After caching, registeredInstances holds one entry for this instance, keyed by its grouped service name.
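A plain-Java sketch of this bookkeeping may help. RedoCacheSketch and its nested RedoData are hypothetical simplifications of NamingGrpcRedoService and InstanceRedoData: the instance is reduced to a string, and keysNeedingRedo() is an invented helper that stands in for whatever the scheduled redo task inspects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class RedoCacheSketch {

    /** Simplified redo entry: the instance plus a flag saying whether the server has it. */
    static final class RedoData {
        final String instance;
        volatile boolean registered;

        RedoData(String instance) {
            this.instance = instance;
        }
    }

    private final ConcurrentMap<String, RedoData> registeredInstances = new ConcurrentHashMap<>();

    /** The key layout used by the cache: groupName@@serviceName. */
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    /** Called before the request is sent, so a failed attempt can be retried later. */
    void cacheInstanceForRedo(String serviceName, String groupName, String instance) {
        registeredInstances.put(groupedName(serviceName, groupName), new RedoData(instance));
    }

    /** Called after the server has acknowledged the registration. */
    void instanceRegistered(String serviceName, String groupName) {
        RedoData data = registeredInstances.get(groupedName(serviceName, groupName));
        if (data != null) {
            data.registered = true;
        }
    }

    /** Hypothetical helper: the keys a redo task would have to register again. */
    List<String> keysNeedingRedo() {
        List<String> keys = new ArrayList<>();
        for (Map.Entry<String, RedoData> entry : registeredInstances.entrySet()) {
            if (!entry.getValue().registered) {
                keys.add(entry.getKey());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        RedoCacheSketch cache = new RedoCacheSketch();
        cache.cacheInstanceForRedo("discovery-provider", "DEFAULT_GROUP", "172.110.0.134:1001");
        System.out.println(cache.keysNeedingRedo()); // still pending
        cache.instanceRegistered("discovery-provider", "DEFAULT_GROUP");
        System.out.println(cache.keysNeedingRedo()); // nothing left to redo
    }
}
```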
// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// type=registerInstance
// build the instance request
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
// remote call over gRPC asking the Nacos server to register the instance
requestToServer(request, Response.class);
// mark the redo data (redoData) as registered
redoService.instanceRegistered(serviceName, groupName);
}
The core logic is in requestToServer():
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
throws NacosException {
try {
// set the headers
request.putAllHeader(
getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
// issue the RPC request; in this example requestTimeout = -1
Response response =
requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
throw new NacosException(response.getErrorCode(), response.getMessage());
}
if (responseClass.isAssignableFrom(response.getClass())) {
return (T) response;
}
NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
response.getClass().getName(), responseClass.getName());
} catch (Exception e) {
throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
}
throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
// com.alibaba.nacos.common.remote.client.RpcClient#request(com.alibaba.nacos.api.remote.request.Request)
public Response request(Request request) throws NacosException {
// no timeout configured: fall back to the default timeout of 3 s
return request(request, DEFAULT_TIMEOUT_MILLS);
}
Now the request() overload that takes a timeout:
public Response request(Request request, long timeoutMills) throws NacosException {
int retryTimes = 0;
Response response;
Exception exceptionThrow = null;
long start = System.currentTimeMillis();
// retry at most three times
while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
boolean waitReconnect = false;
try {
if (this.currentConnection == null || !isRunning()) {
waitReconnect = true;
throw new NacosException(NacosException.CLIENT_DISCONNECT,
"Client not connected, current status:" + rpcClientStatus.get());
}
// use the current connection to ask the Nacos server to complete the registration
response = this.currentConnection.request(request, timeoutMills);
if (response == null) {
throw new NacosException(SERVER_ERROR, "Unknown Exception.");
}
if (response instanceof ErrorResponse) {
if (response.getErrorCode() == NacosException.UN_REGISTER) {
synchronized (this) {
waitReconnect = true;
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
LoggerUtils.printIfErrorEnabled(LOGGER,
"Connection is unregistered, switch server, connectionId = {}, request = {}",
currentConnection.getConnectionId(), request.getClass().getSimpleName());
switchServerAsync();
}
}
}
throw new NacosException(response.getErrorCode(), response.getMessage());
}
// return response.
lastActiveTimeStamp = System.currentTimeMillis();
return response;
} catch (Exception e) {
if (waitReconnect) {
try {
// wait client to reconnect.
Thread.sleep(Math.min(100, timeoutMills / 3));
} catch (Exception exception) {
// Do nothing.
}
}
LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
exceptionThrow = e;
}
// increment the retry count
retryTimes++;
}
if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
switchServerAsyncOnRequestFail();
}
if (exceptionThrow != null) {
throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
: new NacosException(SERVER_ERROR, exceptionThrow);
} else {
throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
}
}
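Stripped of connection handling and server switching, the loop above is a bounded retry with an overall deadline. The sketch below keeps only that skeleton; RetrySketch is a hypothetical name, the call is modelled as a Supplier, and failures are unchecked exceptions rather than NacosException.

```java
import java.util.function.Supplier;

final class RetrySketch {
    static final int RETRY_TIMES = 3;

    /**
     * Runs the call until it succeeds, the retry budget is used up,
     * or the overall deadline has passed; rethrows the last failure.
     */
    static <T> T request(Supplier<T> call, long timeoutMillis) {
        int retryTimes = 0;
        RuntimeException lastFailure = null;
        long start = System.currentTimeMillis();
        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < start + timeoutMillis) {
            try {
                return call.get();   // success ends the loop immediately
            } catch (RuntimeException e) {
                lastFailure = e;     // remember the failure and try again
            }
            retryTimes++;
        }
        if (lastFailure != null) {
            throw lastFailure;
        }
        throw new IllegalStateException("Request failed before any attempt was made");
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = request(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("server not ready");
            }
            return "ok";
        }, 3_000L);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

Both limits matter: the attempt counter bounds the work when failures are fast, and the deadline bounds it when each attempt is slow.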
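At this point the client has sent a service registration request to the Nacos server over gRPC; next we look at how the server handles it.
From the analysis above, we know the client submits its registration request by calling RpcClient#request(), which is a gRPC remote call. Since the client initiates the call, the Nacos server must have a gRPC server to handle the registration request.
That role is played by com.alibaba.nacos.core.remote.BaseRpcServer, an abstract class for the gRPC server side. Its start() method is annotated with @PostConstruct, so it runs after the bean has been constructed: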
// com.alibaba.nacos.core.remote.BaseRpcServer#start
@PostConstruct
public void start() throws Exception {
String serverName = getClass().getSimpleName();
String tlsConfig = JacksonUtils.toJson(rpcServerTlsConfig);
Loggers.REMOTE.info("Nacos {} Rpc server starting at port {} and tls config:{}", serverName, getServicePort(),
tlsConfig);
// start the gRPC server
startServer();
if (RpcServerSslContextRefresherHolder.getInstance() != null) {
RpcServerSslContextRefresherHolder.getInstance().refresh(this);
}
Loggers.REMOTE.info("Nacos {} Rpc server started at port {} and tls config:{}", serverName, getServicePort(),
tlsConfig);
// stop the gRPC server when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
try {
BaseRpcServer.this.stopServer();
Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
} catch (Exception e) {
Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
}
}));
}
The key part is startServer(), an abstract method implemented by subclasses; looking at the subclasses, the implementation is in BaseGrpcServer:
// com.alibaba.nacos.core.remote.grpc.BaseGrpcServer#startServer
public void startServer() throws Exception {
final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
// register the services
addServices(handlerRegistry, new GrpcConnectionInterceptor());
NettyServerBuilder builder = NettyServerBuilder.forPort(getServicePort()).executor(getRpcExecutor());
if (rpcServerTlsConfig.getEnableTls()) {
builder.protocolNegotiator(
new OptionalTlsProtocolNegotiator(getSslContextBuilder(), rpcServerTlsConfig.getCompatibility()));
}
// build the gRPC server
server = builder.maxInboundMessageSize(getMaxInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
.compressorRegistry(CompressorRegistry.getDefaultInstance())
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
.addTransportFilter(new AddressTransportFilter(connectionManager))
.keepAliveTime(getKeepAliveTime(), TimeUnit.MILLISECONDS)
.keepAliveTimeout(getKeepAliveTimeout(), TimeUnit.MILLISECONDS)
.permitKeepAliveTime(getPermitKeepAliveTime(), TimeUnit.MILLISECONDS).build();
// start the gRPC server
server.start();
}
In the service registration step we can see how the gRPC handlers are added:
private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
// unary common call register.
final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY).setFullMethodName(
MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_SERVICE_NAME,
GrpcServerConstants.REQUEST_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
// the handler
final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls.asyncUnaryCall(
(request, responseObserver) -> grpcCommonRequestAcceptor.request(request, responseObserver));
final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_SERVICE_NAME).addMethod(unaryPayloadMethod, payloadHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
// bi stream register.
final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
(responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
.setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(
MethodDescriptor.generateFullMethodName(GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME,
GrpcServerConstants.REQUEST_BI_STREAM_METHOD_NAME))
.setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
.setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition.builder(
GrpcServerConstants.REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
}
Pay particular attention to the handler grpcCommonRequestAcceptor.request(request, responseObserver):
public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
traceIfNecessary(grpcRequest, true);
String type = grpcRequest.getMetadata().getType();
//server is on starting.
if (!ApplicationUtils.isStarted()) {
Payload payloadResponse = GrpcUtils.convert(
ErrorResponse.build(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
// server check.
if (ServerCheckRequest.class.getSimpleName().equals(type)) {
Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get()));
traceIfNecessary(serverCheckResponseP, false);
responseObserver.onNext(serverCheckResponseP);
responseObserver.onCompleted();
return;
}
RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
//no handler found.
if (requestHandler == null) {
Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.NO_HANDLER, "RequestHandler Not Found"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
//check connection status.
String connectionId = GrpcServerConstants.CONTEXT_KEY_CONN_ID.get();
boolean requestValid = connectionManager.checkValid(connectionId);
if (!requestValid) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.UN_REGISTER, "Connection is unregistered."));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
Object parseObj = null;
try {
parseObj = GrpcUtils.parse(grpcRequest);
} catch (Exception e) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, e.getMessage()));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
if (parseObj == null) {
Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive ,parse request is null", connectionId);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
if (!(parseObj instanceof Request)) {
Loggers.REMOTE_DIGEST
.warn("[{}] Invalid request receive ,parsed payload is not a request,parseObj={}", connectionId,
parseObj);
Payload payloadResponse = GrpcUtils
.convert(ErrorResponse.build(NacosException.BAD_GATEWAY, "Invalid request"));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
return;
}
Request request = (Request) parseObj;
try {
Connection connection = connectionManager.getConnection(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
RequestMeta requestMeta = new RequestMeta();
requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
requestMeta.setConnectionId(GrpcServerConstants.CONTEXT_KEY_CONN_ID.get());
requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
requestMeta.setLabels(connection.getMetaInfo().getLabels());
connectionManager.refreshActiveTime(requestMeta.getConnectionId());
// this is where the request is actually handled
Response response = requestHandler.handleRequest(request, requestMeta);
Payload payloadResponse = GrpcUtils.convert(response);
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
} catch (Throwable e) {
Loggers.REMOTE_DIGEST
.error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
e);
Payload payloadResponse = GrpcUtils.convert(ErrorResponse.build(e));
traceIfNecessary(payloadResponse, false);
responseObserver.onNext(payloadResponse);
responseObserver.onCompleted();
}
}
The request is actually handled in the call Response response = requestHandler.handleRequest(request, requestMeta):
// com.alibaba.nacos.core.remote.RequestHandler#handleRequest
public Response handleRequest(T request, RequestMeta meta) throws NacosException {
for (AbstractRequestFilter filter : requestFilters.filters) {
try {
Response filterResult = filter.filter(request, meta, this.getClass());
if (filterResult != null && !filterResult.isSuccess()) {
return filterResult;
}
} catch (Throwable throwable) {
Loggers.REMOTE.error("filter error", throwable);
}
}
// call the handler method
return handle(request, meta);
}
// abstract method, implemented by subclasses
public abstract S handle(T request, RequestMeta meta) throws NacosException;
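handle() is an abstract method implemented by subclasses. Note that handle() takes a request parameter of type T extends Request. If we go back and check what type of request the client sent when it registered the service over gRPC, we can find the corresponding handler.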
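The lookup by request type can be modelled in a few lines. Everything in this sketch is a hypothetical stand-in (DemoRequest, DemoHandler, HandlerRegistrySketch and so on); in particular, handlers are registered explicitly here, whereas how Nacos populates its own registry is not shown in this article. The "registerInstance" string matches the type value mentioned earlier, while "deregisterInstance" is an assumed counterpart.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Request. */
abstract class DemoRequest { }

/** Hypothetical stand-in for InstanceRequest: carries only the operation type. */
final class DemoInstanceRequest extends DemoRequest {
    final String type;

    DemoInstanceRequest(String type) {
        this.type = type;
    }
}

/** A request type for which no handler is registered. */
final class DemoOtherRequest extends DemoRequest { }

/** Hypothetical stand-in for RequestHandler<T, S>; the response is reduced to a String. */
abstract class DemoHandler<T extends DemoRequest> {
    abstract String handle(T request);
}

final class DemoInstanceRequestHandler extends DemoHandler<DemoInstanceRequest> {
    @Override
    String handle(DemoInstanceRequest request) {
        switch (request.type) {
            case "registerInstance":
                return "registered";
            case "deregisterInstance":
                return "deregistered";
            default:
                throw new IllegalArgumentException("Unsupported request type " + request.type);
        }
    }
}

/** Maps the simple class name of a request to the handler responsible for it. */
final class HandlerRegistrySketch {
    private final Map<String, DemoHandler<? extends DemoRequest>> handlers = new HashMap<>();

    <T extends DemoRequest> void register(Class<T> requestType, DemoHandler<T> handler) {
        handlers.put(requestType.getSimpleName(), handler);
    }

    @SuppressWarnings("unchecked")
    String dispatch(DemoRequest request) {
        String type = request.getClass().getSimpleName();
        DemoHandler<DemoRequest> handler = (DemoHandler<DemoRequest>) handlers.get(type);
        if (handler == null) {
            return "RequestHandler Not Found";
        }
        return handler.handle(request);
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register(DemoInstanceRequest.class, new DemoInstanceRequestHandler());
        System.out.println(registry.dispatch(new DemoInstanceRequest("registerInstance")));
        System.out.println(registry.dispatch(new DemoOtherRequest()));
    }
}
```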
The client submits the registration request in com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService:
// com.alibaba.nacos.client.naming.remote.gprc.NamingGrpcClientProxy#doRegisterService
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
// build the request
InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
NamingRemoteConstants.REGISTER_INSTANCE, instance);
// issue the RPC request through rpcClient; no exception means the call succeeded
requestToServer(request, Response.class);
// registration succeeded: mark the entry cached in registeredInstances by the previous step as registered
redoService.instanceRegistered(serviceName, groupName);
}
Knowing that the request is an InstanceRequest, we can quickly locate the concrete handler: com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#handle is the method that actually handles gRPC instance registration requests:
public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
// create a Service from the namespace ID, group name, service name and the ephemeral flag (true here)
Service service = Service
.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
// dispatch on the client's request type: register or deregister
switch (request.getType()) {
// handle an instance registration request
case NamingRemoteConstants.REGISTER_INSTANCE:
return registerInstance(service, request, meta);
// handle an instance deregistration request
case NamingRemoteConstants.DE_REGISTER_INSTANCE:
return deregisterInstance(service, request, meta);
default:
throw new NacosException(NacosException.INVALID_PARAM,
String.format("Unsupported request type %s", request.getType()));
}
}
private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
throws NacosException {
// Register the instance
// service: Service{namespace='public', group='DEFAULT_GROUP', name='discovery-provider', ephemeral=true, revision=0}
// instance: Instance{instanceId='null', ip='172.110.0.134', port=1001, weight=1.0, healthy=true, enabled=true, ephemeral=true, clusterName='DEFAULT', serviceName='null', metadata={preserved.register.source=SPRING_CLOUD}}
clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
// Publish a trace event for the instance registration
NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
request.getInstance().getIp(), request.getInstance().getPort()));
return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
}
The registration itself is carried out in EphemeralClientOperationServiceImpl#registerInstance:
public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
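// Validate the instance: its heartbeat timeout and its IP delete timeout must each be no smaller than the heartbeat interval
NamingUtils.checkInstanceIsLegal(instance);
// ServiceManager.getInstance() returns the ServiceManager object, an eagerly initialized singleton
// getSingleton fetches the singleton Service from the singletonRepository cache; if it already exists, the cached one is returned (note Service's equals and hashCode methods)
Service singleton = ServiceManager.getInstance().getSingleton(service);
// Check whether the Service obtained is ephemeral; if not, throw, because this operation service (EphemeralClientOperationServiceImpl) only handles ephemeral instances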
if (!singleton.isEphemeral()) {
throw new NacosRuntimeException(NacosException.INVALID_PARAM,
String.format("Current service %s is persistent service, can't register ephemeral instance.",
singleton.getGroupedServiceName()));
}
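// Look up the client in the client manager by client ID; this mapping is stored when the connection is established
// clientId=172.110.0.138:1001#true
Client client = clientManager.getClient(clientId);
// Check whether the Client is legal:
// 1. If client is null, the client has already disconnected, so it is illegal
// 2. If the client is not ephemeral, it is illegal; return immediately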
if (!clientIsLegal(client, clientId)) {
return;
}
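// Build the instance publish info, which carries attributes such as the instance IP, instance ID, and port
InstancePublishInfo instanceInfo = getPublishInfo(instance);
/**
* Stores this client's service registry, i.e. the relationship between service and instance.
* 1. The client has registered itself in the server-side ClientManager;
* 2. The Client holds a Map internally: ConcurrentHashMap<Service, InstancePublishInfo> publishers, i.e. the publisher list;
* 3. The instance info is put into the publisher list (key: Service, value: instance publish info)
*/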
// com.alibaba.nacos.naming.core.v2.client.AbstractClient.publishers
// protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
client.addServiceInstance(singleton, instanceInfo);
// Set the client's last-updated time to now
client.setLastUpdatedTime();
client.recalculateRevision();
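// Publish a client registration event to notify subscribers. Nacos uses the publish-subscribe pattern here: a publisher emits an event and the subscribers interested in it react, which decouples the two sides
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
// Publish an instance metadata event to notify subscribers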
NotifyCenter
.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}
Service singleton = ServiceManager.getInstance().getSingleton(service)
// ServiceManager.getInstance() returns the ServiceManager object, an eagerly initialized singleton
private static final ServiceManager INSTANCE = new ServiceManager();
public static ServiceManager getInstance() {
return INSTANCE;
}
public Service getSingleton(Service service) {
// If the service is not found in singletonRepository, store it there; otherwise the existing Service is kept
// Existence is decided by Service's overridden equals and hashCode: namespace + group + serviceName identifies one singleton Service on the server
singletonRepository.computeIfAbsent(service, key -> {
NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
return service;
});
// Then fetch it from singletonRepository
// ConcurrentHashMap<Service, Service> singletonRepository
Service result = singletonRepository.get(service);
// namespaceSingletonMaps stores all Services grouped by namespace ID
// ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
namespaceSingletonMaps.get(result.getNamespace()).add(result);
return result;
}
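singletonRepository and namespaceSingletonMaps are both ConcurrentHashMaps. They hold the Service singletons and the set of all Services under the same namespace, respectively:
/**
* Holds the singleton services. Note that singletonRepository only tracks the service dimension.
* A Service has no port or IP attributes, so a Service here means the service itself (e.g. the service named discovery-provider);
* even if several discovery-provider instances exist, they all share the same discovery-provider Service
*/
private final ConcurrentHashMap<Service, Service> singletonRepository;
/**
* The set of all services under a namespace
* key: namespaceId
* value: Set<Service>
*/
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
After the logic above runs, singletonRepository maps the Service to itself, and namespaceSingletonMaps maps the namespace ID (public in this example) to a set containing that Service.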
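How equals and hashCode turn two equal Service objects into one canonical instance can be sketched as below. This is a simplified illustration under stated assumptions: SketchService and SketchServiceManager are hypothetical names, the metadata event is left out, and ConcurrentHashMap.newKeySet() stands in for the ConcurrentHashSet used in the article's code.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified Service: identity is namespace + group + name only.
final class SketchService {
    final String namespace;
    final String group;
    final String name;

    SketchService(String namespace, String group, String name) {
        this.namespace = namespace;
        this.group = group;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SketchService)) {
            return false;
        }
        SketchService other = (SketchService) o;
        return namespace.equals(other.namespace) && group.equals(other.group) && name.equals(other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(namespace, group, name);
    }
}

final class SketchServiceManager {
    private final ConcurrentHashMap<SketchService, SketchService> singletonRepository =
            new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Set<SketchService>> namespaceSingletonMaps =
            new ConcurrentHashMap<>();

    SketchService getSingleton(SketchService service) {
        // computeIfAbsent returns the already stored value when an equal key exists.
        SketchService result = singletonRepository.computeIfAbsent(service, key -> service);
        // Group the canonical instance under its namespace.
        namespaceSingletonMaps
                .computeIfAbsent(result.namespace, ns -> ConcurrentHashMap.newKeySet())
                .add(result);
        return result;
    }

    int serviceCount(String namespace) {
        Set<SketchService> services = namespaceSingletonMaps.get(namespace);
        return services == null ? 0 : services.size();
    }
}
```

Calling getSingleton twice with two distinct but equal SketchService objects returns the first object both times, and the namespace set still holds a single entry.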
Next, getPublishInfo wraps the Instance information in an InstancePublishInfo publish entity.
default InstancePublishInfo getPublishInfo(Instance instance) {
InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
Map<String, Object> extendDatum = result.getExtendDatum();
if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
extendDatum.putAll(instance.getMetadata());
}
if (StringUtils.isNotEmpty(instance.getInstanceId())) {
extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
}
if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
}
if (!instance.isEnabled()) {
extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
}
String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
: instance.getClusterName();
result.setHealthy(instance.isHealthy());
result.setCluster(clusterName);
return result;
}
The publisher list mentioned above is also a map:
protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
// com.alibaba.nacos.naming.core.v2.client.AbstractClient#addServiceInstance
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
// protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
// Add the instance info to the publisher list publishers
if (null == publishers.put(service, instancePublishInfo)) {
if (instancePublishInfo instanceof BatchInstancePublishInfo) {
MetricsMonitor.incrementIpCountWithBatchRegister(instancePublishInfo);
} else {
MetricsMonitor.incrementInstanceCount();
}
}
// Publish a client changed event (ClientChangedEvent) to notify subscribers
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
return true;
}
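After the instance is added, publishers holds one entry for this service: the Service as the key and the instance's InstancePublishInfo as the value.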
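One detail in addServiceInstance is the null check on the return value of put: the counter is only bumped when the service had no previous entry. A minimal sketch of that behaviour, using plain strings as hypothetical stand-ins for Service and InstancePublishInfo and an AtomicInteger in place of MetricsMonitor:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a client's publishers map.
final class SketchPublishers {
    private final ConcurrentHashMap<String, String> publishers = new ConcurrentHashMap<>();
    private final AtomicInteger instanceCount = new AtomicInteger();

    // Returns true only when this service had no earlier entry.
    boolean add(String service, String instanceInfo) {
        // put returns the previous value, or null if the key was absent.
        boolean firstTime = publishers.put(service, instanceInfo) == null;
        if (firstTime) {
            instanceCount.incrementAndGet();
        }
        return firstTime;
    }

    String get(String service) {
        return publishers.get(service);
    }

    int instanceCount() {
        return instanceCount.get();
    }
}
```

Registering the same service again overwrites the stored value but leaves the count unchanged, which matches a map that keeps one entry per service.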
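As you can see, the registration logic itself does not appear to do much. The real work happens in the subscribers that receive the published events. This relies on the publish-subscribe model. We will not discuss NotifyCenter here; it is analyzed separately later.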
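To make the decoupling concrete, here is a minimal publish-subscribe sketch. It is not NotifyCenter: SketchEventBus and SketchRegisterEvent are hypothetical names, and events are delivered synchronously on the caller's thread purely for brevity. How NotifyCenter actually delivers events is left to the later analysis.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event carrying the same two fields used in the article.
final class SketchRegisterEvent {
    final String service;
    final String clientId;

    SketchRegisterEvent(String service, String clientId) {
        this.service = service;
        this.clientId = clientId;
    }
}

// Hypothetical event bus: publishers and subscribers only share the event type.
final class SketchEventBus {
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    <E> void subscribe(Class<E> eventType, Consumer<? super E> handler) {
        subscribers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(event -> handler.accept(eventType.cast(event)));
    }

    // Delivers the event to every subscriber of its exact class; returns how many were notified.
    int publish(Object event) {
        List<Consumer<Object>> handlers =
                subscribers.getOrDefault(event.getClass(), Collections.emptyList());
        for (Consumer<Object> handler : handlers) {
            handler.accept(event);
        }
        return handlers.size();
    }
}
```

The publisher never references the subscriber directly, so new reactions to a registration can be added without touching the registration code.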
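Recall that during instance registration, EphemeralClientOperationServiceImpl#registerInstance published an event:
// Publish a client registration event to notify subscribers
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
Following the usages of ClientRegisterServiceEvent, we find that the class below listens for this kind of event:
com.alibaba.nacos.naming.core.v2.index.ClientServiceIndexesManager#handleClientOperation
ClientServiceIndexesManager holds a map that records, for each service, the clients that publish it:
/**
* The association between a service and the clients that publish it
*/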
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
private void handleClientOperation(ClientOperationEvent event) {
// Get the service
Service service = event.getService();
// Get the client ID
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
// Handle ClientRegisterServiceEvent (service registration)
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
// Handle ClientDeregisterServiceEvent (service deregistration)
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
// Handle ClientSubscribeServiceEvent (subscribe)
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
// Handle ClientUnsubscribeServiceEvent (unsubscribe)
removeSubscriberIndexes(service, clientId);
}
}
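Once the registration event has been handled, publisherIndexes maps the Service to the set of client IDs that publish it, which now includes this client's ID.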
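The bodies of addPublisherIndexes and removePublisherIndexes are not shown in this article, so the following is only a sketch of how such an index could be maintained on a ConcurrentMap. SketchPublisherIndex is a hypothetical class, the service key is simplified to a String, and dropping an empty set on removal is a choice made for this sketch rather than a statement about Nacos.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical index: service key -> IDs of the clients that publish it.
final class SketchPublisherIndex {
    private final ConcurrentMap<String, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    void addPublisher(String service, String clientId) {
        // Create the set on first use, then add the client ID to it.
        publisherIndexes.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(clientId);
    }

    void removePublisher(String service, String clientId) {
        // Returning null from computeIfPresent removes the mapping (sketch-only choice).
        publisherIndexes.computeIfPresent(service, (k, clients) -> {
            clients.remove(clientId);
            return clients.isEmpty() ? null : clients;
        });
    }

    Set<String> publishersOf(String service) {
        Set<String> clients = publisherIndexes.get(service);
        return clients == null ? Collections.emptySet() : Collections.unmodifiableSet(clients);
    }
}
```

With an index like this, finding every client that provides a given service is a single map lookup instead of a scan over all connected clients.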
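In this article we started from when a Nacos client's automatic registration is triggered, then analyzed how the client submits the service registration request to the Nacos server, and finally looked at how the Nacos server handles that request, giving a rough walkthrough of the whole flow. In the next article we will analyze in detail how NotifyCenter's event publish-subscribe mechanism works, and then continue with the follow-up processing involved when the server handles a registration request.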