目录
通过对Nacos服务端处理客户端的服务注册请求的分析,我们看到好像并没有处理很多事情。大体的过程就是:客户端将自己注册到了服务器端的ClientManager中,然后在客户端Client中存在一个Map: ConcurrentHashMap<Service, InstancePublishInfo> publishers. 即发布者列表。Nacos将服务实例信息存入到发布者列表后,然后发布了一些事件,其实大部分逻辑都是在发布事件后,订阅事件的那些订阅者进行处理的。
而具体是怎么处理的呢?那就需要深入分析NotifyCenter这个类了。
NotifyCenter是Nacos中的统一事件通知中心。
// 典型的单例模式(饿汉式)
private static final NotifyCenter INSTANCE = new NotifyCenter();
通过源码,我们可以发现,NotifyCenter使用了单例,说明整个进程共享这个实例,所有的内部变量归所有线程共享,我们需要考虑并发问题。也就是操作内部变量的时候,需要加锁去操作,防止其它线程也同时操作导致出错。
我们看下NotifyCenter的一些成员变量:
/**
* ringBufferSize/shareBufferSize这两个变量因为在静态块中初始化,然后并没有改变其初始值,所以不需要特殊处理
*/
public static int ringBufferSize;
public static int shareBufferSize;
private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/**
* EventPublisher工厂
*/
private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
/**
* 共享EventPublisher工厂
*/
private DefaultSharePublisher sharePublisher;
private static Class<? extends EventPublisher> clazz;
/**
* 事件发布器管理容器
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
继续向下看,看到了一个静态代码块,进行一些初始化工作。当NotifyCenter类被加载时,静态代码块会被执行,且只会执行一次。
static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
// 默认的生产者的阻塞队列大小
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
// Integer.getInteger获取系统属性的值 System.getProperty("xxx")
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
// 共享生产者阻塞队列大小
// The size of the public publisher's message staging queue buffer
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
// Integer.getInteger获取系统属性的值 System.getProperty("xxx")
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 这里采用了SPI的扩展机制,里面包含了JDK原生的SPI扩展机制,不过Nacos对这部分的内容加了缓存
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator();
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
// 没有配置,就用默认的DefaultPublisher生产者
clazz = DefaultPublisher.class;
}
// 创建一个EventPublisher的工厂,主要是对EventPublisher进行创建并初始化
// 这里是一个Lambda表达式,只是声明了一个回调函数,并不会马上执行内部逻辑
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
// 实际上是对于每个publisher都启动了一个线程
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};
try {
// 创建并初始化共享的生产者
// Create and init DefaultSharePublisher instance.
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
// JVM关闭的回调钩子
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}
在静态代码块中,对一些静态成员变量进行了初始化,我们重点关注DefaultPublisher和DefaultSharePublisher。
先简单介绍一下这两个类:
每一个DefaultPublisher是一个线程,处理的是某一类的事件,去通知对这一类事件感兴趣的订阅者。?
public class DefaultPublisher extends Thread implements EventPublisher {
}
DefaultPublisher继承Thread类,它是一个后台线程,内部包含一个阻塞队列,一直在等待事件的来到。当事件到来后,立马去通知订阅了这个事件的消费者去进行消费。
先来看看DefaultPublisher的成员变量:
/**
* 标识事件发布者是否初始化的状态位
*/
private volatile boolean initialized = false;
/**
* 标识事件发布者是否关闭的状态位
*/
private volatile boolean shutdown = false;
/**
* 事件类型,其非集合,说明一个线程只能绑定一种事件类型
*/
private Class<? extends Event> eventType;
/**
* 监听这种事件的订阅者集合
*/
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();
/**
* 阻塞队列大小
*/
private int queueMaxSize = -1;
/**
* 阻塞队列
*/
private BlockingQueue<Event> queue;
/**
* 最大的事件序号,事件每次产生都会有个事件序号
*/
protected volatile Long lastEventSequence = -1L;
前面我们介绍到,在NotifyCenter的静态代码块中有初始化一个DEFAULT_PUBLISHER_FACTORY工厂,内部逻辑其实就是初始化DefaultPublisher。我们看下DefaultPublisher#init()初始化方法:
public void init(Class<? extends Event> type, int bufferSize) {
// 设置后台守护线程,当服务关闭的时候会自动关闭
setDaemon(true);
// 设置线程的名称,当我们打印JVM线程的时候可以快速查找到
setName("nacos.publisher-" + type.getName());
// 指定事件类型
this.eventType = type;
// 指定阻塞队列的大小
this.queueMaxSize = bufferSize;
// 初始化阻塞队列
this.queue = new ArrayBlockingQueue<>(bufferSize);
// 启动线程
start();
}
public synchronized void start() {
// initialized变量使用volatile修饰,保证可见性,其它线程能及时发现initialized已经被改为true
if (!initialized) {
// start just called once
// 保证只启动一次,调用父类,告诉JVM去进行线程启动
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
// 线程启动后,将initialized置为true,保证只会启动一次
initialized = true;
}
}
start()方法中,通过super.start()启动了这个线程,既然是线程,肯定有run()方法,这是核心处理方法:
public void run() {
// 线程真正执行的逻辑
openEventHandler();
}
void openEventHandler() {
try {
// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
while (!shutdown && !hasSubscriber() && waitTimes > 0) {
ThreadUtils.sleep(1000L);
waitTimes--;
}
// shutdown初始值为false,执行一个死循环,一直运行在后台,直到在shutdown()方法中将shutdown置为true,才会停止
// 当然,如果阻塞队列中没有任务的话,线程会被阻塞,阻塞队列的线程是不占用CPU资源的,也就是占了一部分空间和资源
while (!shutdown) {
// 阻塞队列中如果没有事件,这里进行阻塞
final Event event = queue.take();
// 获取到事件进行处理. 当阻塞队列中有数据的时候,就会立马触发获取到队列元素,开始处理。
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}
可以看到,在run()方法中,有个死循环,一直在后台运行着,一直在判断阻塞队列中是否有需要处理的事件,如果队列中没有事件要处理,那么线程会阻塞在那里,一旦队列中有事件需要处理,则立马触发获取到队列元素,执行receiveEvent(event)处理方法。
/**
* 接收事件,并通知订阅者处理事件
*/
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
// 没有订阅者,则直接返回
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}
// Notification single event listener
for (Subscriber subscriber : subscribers) {
// 判断事件是否匹配,跳过不匹配的那些订阅者,不进行处理
if (!subscriber.scopeMatches(event)) {
continue;
}
// 是否忽略过期的事件
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}
receiveEvent()的核心逻辑就是循环遍历所有的订阅者,判断这种事件订阅者是否有订阅,跳过不匹配的那些订阅者,不进行处理。
然后调用notifySubscriber()通知每个订阅者。
public void notifySubscriber(final Subscriber subscriber, final Event event) {
LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
// 包装成一个任务去处理事件
final Runnable job = () -> subscriber.onEvent(event);
// 支持配置线程池,很灵活,让每个订阅者决定是同步调用还是异步调用
final Executor executor = subscriber.executor();
if (executor != null) {
// 如果订阅者指定了线程池,将Runnable放入线程池,等待异步执行
executor.execute(job);
} else {
try {
// 直接调用方法,即同步调用
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
在notifySubscriber()方法中,我们将事件包装成一个runnable任务去执行,同时Nacos还支持每个订阅者配置自己的执行线程池,来决定是异步执行还是同步执行订阅者的onEvent()方法。
小结:DefaultPublisher是一个后台守护线程,其内部存在一个阻塞队列用于存放生产者发布的事件,还存在一个订阅这种事件的订阅者集合,在初始化DefaultPublisher的时候,会启动这个线程,也就是执行线程的run()方法,在run()方法中,有个死循环,一直运行在后台,一直尝试从阻塞队列中获取事件,如果队列中没事件,则阻塞;直到队列中有事件,立马触发取出事件开始处理。事件处理的过程,就是遍历所有的订阅者集合,然后挨个通知它们,并支持配置线程池来决定是同步还是异步执行,挨个回调订阅者的onEvent()方法。
DefaultSharePublisher是继承于DefaultPublisher,只是多了一个私有变量:?
/**
* key是SlowEvent, value是Set<Subscriber>
* 每一种事件,一个Set,共享同一个DefaultSharePublisher,处理比较耗时的一些事件(SlowEvent事件类型)
*/
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();
DefaultSharePublisher内部维护了一个Map,key是SlowEvent,value是Set<Subscriber>。也就是说其维护了一个Set,每种SlowEvent事件对应一个订阅者集合。他们共享了这个DefaultSharePublisher,既然定义为SlowEvent,应该是处理比较耗时或者可以慢点处理,所以都放在一个Set,可以慢慢轮训处理。
通过对DefaultPublisher的分析,我们知道了DefaultPublisher就是后台跑着一个线程,不断尝试从阻塞队列中拿事件出来执行,队列为空,那就阻塞,等到队列有元素,就触发执行后续的一系列流程。
那么,DefaultPublisher中阻塞队列中的事件是如何放入的呢?其实就是通过com.alibaba.nacos.common.notify.DefaultPublisher#publish存入的。
下面我们以Nacos服务端处理客户端提交的服务注册请求过程中的一个事件为例来分析,详细代码见:com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl#registerInstance。?
NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
这里通过NotifyCenter的publishEvent()发布了一个ClientRegisterServiceEvent事件,我们跟踪一下源码:
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}
// topic = com.alibaba.nacos.naming.core.v2.event.client.ClientOperationEvent.ClientRegisterServiceEvent
final String topic = ClassUtils.getCanonicalName(eventType);
// 获取对应的EventPublisher进行发布,也就是通知订阅者去处理
// Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16)
// INSTANCE.publisherMap就是事件发布器管理容器
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
// 调用publish()发布事件
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}
public boolean publish(Event event) {
// 校验publisher是否初始化
checkIsStart();
// 将事件存入阻塞队列中,这样订阅者就可以从队列中取出任务执行
boolean success = this.queue.offer(event);
// 如果存入队列失败,则直接通知订阅者处理事件
if (!success) {
Loggers.EVT_LOG.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
handleEvent(event);
}
return true;
}
在本例中,从INSTANCE.publisherMap.get(topic)获取到的EventPublisher其实是NamingEventPublisher,NamingEventPublisher是一种共享的发布者,其实大体逻辑跟DefaultPublisher一样。
本例中,调用NamingEventPublisher#publish发布事件:?
public boolean publish(Event event) {
// 校验publisher是否初始化
checkIsStart();
// 将事件存入阻塞队列中,这样订阅者就可以从队列中取出任务执行
boolean success = this.queue.offer(event);
// 如果存入队列失败,则直接通知订阅者处理事件
if (!success) {
Loggers.EVT_LOG.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
handleEvent(event);
}
return true;
}
通过this.queue.offer(event);将事件存入到了发布者内部的阻塞队列中。到这里,我们应该对这个生产者的整个处理流程应该有了比较清晰的认识。
还是以前面的ClientRegisterServiceEvent服务注册事件为例来分析,Nacos服务端怎么处理这个事件的。
首先我们得找到ClientRegisterServiceEvent事件的订阅者。因为订阅者既然订阅了事件,那说明它要对事件做出响应的处理。对于程序代码来说,肯定就是onEvent(Event event)的方法去处理逻辑。
我们使用IDEA点击类名,查看调用,可以看到有好几处引用了。
可以看到,除了两个是处理方法外,其他的都是发布事件。而处理方法的两个都属于类ClientServiceIndexesManager,我们就继续分析ClientServiceIndexesManager。如下图,也可以看到ClientServiceIndexesManager确实是订阅了ClientRegisterServiceEvent事件。
在ClientServiceIndexesManager的构造方法中可以看到其在NotifyCenter中注册了自己。
public ClientServiceIndexesManager() {
// 向NotifyCenter中注册自己
// NamingEventPublisherFactory.getInstance()也是用到了单例,返回了NamingEventPublisherFactory
NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
}
// com.alibaba.nacos.common.notify.NotifyCenter#registerSubscriber(com.alibaba.nacos.common.notify.listener.Subscriber, com.alibaba.nacos.common.notify.EventPublisherFactory)
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
if (consumer instanceof SmartSubscriber) {
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
// 添加订阅者
addSubscriber(consumer, subscribeType, factory);
}
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {
final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
// 将订阅者放入EventPublisher中
if (publisher instanceof ShardedEventPublisher) {
// 共享事件发布器
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
// DefaultPublisher
publisher.addSubscriber(consumer);
}
}
如上,就完成了订阅者注册到发布器中的处理。
在前面分析DefaultPublisher的时候,我们知道了,一旦阻塞队列中有事件需要处理,则会触发回调每个订阅者的onEvent()方法,那我们就来看看ClientServiceIndexesManager的onEvent()方法:?
public void onEvent(Event event) {
if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
// 处理客户端断开连接事件
handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
} else if (event instanceof ClientOperationEvent) {
// 处理排除ClientReleaseEvent后的其它客户端操作事件
handleClientOperation((ClientOperationEvent) event);
}
}
private void handleClientOperation(ClientOperationEvent event) {
// 获取服务
Service service = event.getService();
// 获取客户端ID
String clientId = event.getClientId();
if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
// 处理ClientRegisterServiceEvent事件(服务注册)
addPublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
// 处理ClientDeregisterServiceEvent事件(服务下线)
removePublisherIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
// 处理ClientSubscribeServiceEvent事件(订阅)
addSubscriberIndexes(service, clientId);
} else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
// 处理ClientUnsubscribeServiceEvent事件(取消订阅)
removeSubscriberIndexes(service, clientId);
}
}
?可以看到,每个事件类型执行不同的逻辑。这里我们以服务注册为例:
private void addPublisherIndexes(Service service, String clientId) {
// 将客户端添加到注册服务中
// ConcurrentMap<Service, Set<String>> publisherIndexes
publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
publisherIndexes.get(service).add(clientId);
// 发布服务变更事件,这样其它客户端才能及时知道service这个服务可用了
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
}
将服务存入到一个map中,如下:key是service,value就是clientId的集合。
private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
然后发布一个服务变更事件ServiceChangedEvent,这样其它客户端就能及时知道service这个服务可用,可以来订阅了。
分析到这里,我们应该大致搞懂了NotifyCenter的工作原理,通过巧妙的设计,将生产者和消费者进行了解耦,在开发过程中,我们也可以时刻思考能否借鉴这种思路进行设计等等。下一篇文章,我们从发布一个服务变更事件ServiceChangedEvent这里继续分析,Nacos服务注册剩下的一些流程。?