ZooKeeper 实战(四) Curator Watch事件监听

发布时间:2024年01月13日

ZooKeeper 实战(四) Curator Watch事件监听

0.前言

上一篇博客只介绍了有关Curator中对ZNode的CRUD操作,从本篇起开始逐步介绍更加高级的API操作。

1.Watch 事件监听概念

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。虽然ZooKeeper原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员反复注册Watcher,比较繁琐。

而 Curator 引入了Cache 来实现对 ZooKeeper 服务端事件的监听。

Curator 中提供了三种 Cache(Watcher)来监听不同节点变化类型:

  • NodeCache:监听指定的节点。
  • PathChildrenCache:监听指定节点的子节点。
  • TreeCache:监听指定节点及其子孙节点。

2.NodeCache

监听指定的节点,增删改都会监听。

2.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 */
public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed);

2.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client,path);
        // 添加监听器
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                ChildData currentData = nodeCache.getCurrentData();
                if (currentData != null){
                    String s = new String(currentData.getData(),StandardCharsets.UTF_8);
                    log.info("监听{}节点发生变化,数据内容:{}",path,s);
                }else {
                    log.info("监听{}节点被删除了",path);
                }
            }
        });
      	// 开启监听
        nodeCache.start();

        TimeUnit.SECONDS.sleep(2);
        // 创建节点
        client.create().creatingParentsIfNeeded().forPath(path,"第一次新增".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 更新节点
        client.setData().forPath(path,"数据修改了".getBytes(StandardCharsets.UTF_8));
        TimeUnit.SECONDS.sleep(2);
        // 删除节点
        client.delete().deletingChildrenIfNeeded().forPath(path);
    }

2.3.日志输出

在这里插入图片描述

3.PathChildrenCache

监听指定节点的子节点。当一个子节点增删改时, PathChildrenCache会包含最新的子节点的数据和状态。

3.1.全参构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 */
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

3.2.子节点监听时间类型

public enum Type
{
  	// 子节点添加
    CHILD_ADDED,
  	// 子节点的数据变更
    CHILD_UPDATED,
		// 子节点被删除
    CHILD_REMOVED,
 
  	// 以下三个事件类型表示:当连接断开时,PathChildrenCache将继续保持其断开连接之前的状态,并且在连接恢复后,PathChildrenCache将为断开连接期间发生的所有添加、删除和更新发出正常的子事件。
  	// 当连接状态处于ConnectionState.SUSPENDED。
    CONNECTION_SUSPENDED,
  	// 当连接状态处于ConnectionState.RECONNECTED
    CONNECTION_RECONNECTED,
  	// 当连接状态处于ConnectionState.LOST
    CONNECTION_LOST,
  	
  	// 当通过PathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)启动监听时,该事件表示PathChildrenCache初始化完成
  This event signals that the initial cache has been populated.
    INITIALIZED
}

3.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher";
        TimeUnit.SECONDS.sleep(3);

        // 创建PathChildrenCache对象
        // 此处的cacheData参数一定要设置为true,不然Curator不会缓存数据当本地,
        // 那么后续pathChildrenCache.getCurrentData()得到的数据都为null
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,path,true);
        // 添加监听器
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED){
                    log.info("PathChildrenCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

        // 创建子节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path+"/c1");
        client.create().creatingParentsIfNeeded().forPath(path+"/c2");
        client.create().creatingParentsIfNeeded().forPath(path+"/c3/age");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path+"/c1","c1更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path+"/c2","c2更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path+"/c3");
    }

3.3.日志输出

可以看出,PathChildrenCache只会监听直属子节点的变化,其非直属子节点的后代节点如/c3/age,没有发布通知。

在这里插入图片描述

4.TreeCache

监听指定节点及其子孙节点。

4.1.构造器参数

/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 */
public TreeCache(CuratorFramework client, String path)
  
/**
 * @param: client 注册监听的客户端
 * @param: path 节点路径
 * @param: cacheData 是否缓存节点内容(包含节点状态)
 * @param: dataIsCompressed 是否开启数据压缩。传递的数据会进行压缩,传递速度快,但取数据时需要把压缩的数据进行转换,默认为false
 * @param: maxDepth 最大深度。最深的那个后代节点到path所需要经过的节点数
 * @param: executorService 用于PathChildrenCache的后台线程的线程池。该线程池应该是单线程的,否则缓存可能会看到不一致的结果
 * @param: createParentNodes 是否需要创建父节点。如果父节点不存在泽创建父节点(容器节点)
 * @param: TreeCacheSelector TreeCache选择器。根据指定的策略和条件,选择适合的缓存树来创建和维护TreeCache
 */
TreeCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, int maxDepth, final ExecutorService executorService, boolean createParentNodes, TreeCacheSelector selector)

4.2.代码DEMO

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("。。。。。。。。。。。。。容器初始化完毕。。。。。。。。。。。。。。");
        String path = "/ahao/watcher/tree";
        TimeUnit.SECONDS.sleep(3);

        // 创建TreeCache对象,也可通过TreeCache.newBuilder()创建
        TreeCache treeCache = new TreeCache(client,path);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                if (event.getType() == TreeCacheEvent.Type.INITIALIZED){
                    log.info("TreeCache初始化完,事件类型:{}", event.getType());
                }else {
                    ChildData currentData = event.getData();
                    log.info("事件类型:{},监听到的子节点发生变化:{}",event.getType(),currentData.getPath());
                }
            }
        });
        // 开启监听
        treeCache.start();

        // 创建节点
        TimeUnit.SECONDS.sleep(2);
        client.create().creatingParentsIfNeeded().forPath(path);
        client.create().creatingParentsIfNeeded().forPath(path +"/t1");
        client.create().creatingParentsIfNeeded().forPath(path +"/t2/ccc");
        // 修改子节点
        TimeUnit.SECONDS.sleep(2);
        client.setData().forPath(path,"根节点更新了".getBytes(StandardCharsets.UTF_8));
        client.setData().forPath(path +"/t2/ccc","/t2/ccc更新了".getBytes(StandardCharsets.UTF_8));
        // 删除子节点
        TimeUnit.SECONDS.sleep(2);
        client.delete().deletingChildrenIfNeeded().forPath(path +"/t2");
    }

4.3.日志输出

可以看出TreeCache会监听当前节点和后代节点的变化。

在这里插入图片描述

文章来源:https://blog.csdn.net/qq_51513626/article/details/135554928
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。