本篇文章主要还是让人快速上手入门,想要深入的话可以通过书籍系统的学习。
可用于协调、构建分布式应用。
本质上是一个分布式的小文件存储系统。提供基于类似于文件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理。从而用来维护和监控你存储的数据的状态变化。通过监控这些数据状态的变化,从而可以达到基于数据的集群管理。
ZooKeeper 适用于存储和协同相关的关键数据,不适合用于大数据量存储。
下载地址:https://zookeeper.apache.org/releases.html
创建数据存储目录
zoo_sample.cfg
并重命名为zoo_.cfg
,打开并配置数据存储目录类似于文件系统的结构。
Zookeeper 的层次模型称作DataTree。DataTree 的每个节点叫作znode。不同于文件系统,每个节点都可以保存数据。每个节点都有一个版本(version),版本从0 开始计数。
是否持久?
是否有序?
每个顺序节点,都分配一个唯一且是在之前基础上递增的整数。
查看根节点:ls /
查看子节点:ls /app1
create /app1
create -s /app2
create -e /app3
create -e -s /app4
get /app1
stat /app1
## -------------------------节点的状态信息,也称为stat结构体---------------------
cZxid = 0x17f ## 该数据节点被创建时的事务id
#其中zxid表示的是zookeeper的事务ID,由64位数字组成,分为高32位和低32位
ctime = Sat Dec 21 19:47:36 CST 2019 ## 该数据节点创建时间
mZxid = 0x17f ## 该数据节点被修改时最新的事物id
mtime = Sat Dec 21 19:47:36 CST 2019 ## 该数据节点最后更新时间
pZxid = 0x183 ## 当前节点的父级节点事务ID
cversion = 4 ## znode子节点变化号,znode子节点修改次数
dataVersion = 0 ## znode数据变化号
aclVersion = 0 ## 访问控制列表的变化号 access control
ephemeralOwner = 0x0 ## 如果临时节点,表示当前节点的拥有者的sessionId。如果不是临时节点,则值为0
dataLength = 6 ## 数据长度
numChildren = 4 ## 子节点数据
set /app1 12345
delete /app20000000002
deleteall /app1
拥有子节点的父节点,无法使用delete删除
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
@Before
public void initClient() {
//初始化Zookeeper的客户端对象client
String connectString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 100);
int sessionTimeoutMs = 60 * 1000;//当前客户端会话超时时间
int connectionTimeoutMs = 15 * 1000;//连接超时时间
client = CuratorFrameworkFactory
.newClient(connectString, sessionTimeoutMs,
connectionTimeoutMs, retryPolicy);
}
@After
public void destroy() {
client.close();
}
@After
public void destroy() {
client.close();
}
@Test
public void createNode() throws Exception {
//3种方式,四种节点类型
client.start();
//方式1: 创建空节点
// client.create().forPath("/app3");
//方式2: 创建有内容节点
// client.create().forPath("/app4", "app3Node".getBytes());
//方式3: 创建多层节点
// client.create().creatingParentsIfNeeded().forPath("/app5/a", "aa".getBytes());
//节点节点类型1 : 持久节点 CreateMode.PERSISTENT
//节点节点类型2 : 临时节点【客户端关闭则节点消失】CreateMode.EPHEMERAL
// client.create().withMode(CreateMode.EPHEMERAL).forPath("/app6", "app5Node".getBytes());
//节点节点类型3 : 持久节点+自带序号 CreateMode.PERSISTENT_SEQUENTIAL
// client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/app7", "app6Node".getBytes());
//节点节点类型4 : 临时节点+自带序号【客户端关闭则节点消失】CreateMode.EPHEMERAL_SEQUENTIAL
// client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/app8", "app6Node".getBytes());
//Thread.sleep(9000);//休眠9秒,观察效果
//3.关闭客户端,释放资源
client.close();
}
@Test
public void updateNode() throws Exception {
client.start();
client.setData().forPath("/app1", "app1Node".getBytes());
client.close();
}
@Test
public void getNode() throws Exception {
//1.开启客户端
client.start();
//2.获取节点数据
byte[] nodeDateBytes = client.getData().forPath("/app1");
System.out.println("节点数据 = " + new String(nodeDateBytes));
//3.关闭客户端,释放资源
client.close();
}
@Test
public void deleteNode() throws Exception {
//1.开启客户端
client.start();
//2.获取节点数据
//方式1: 删除一个节点
//client.delete().forPath("/app3");
//方式2: 递归删除多个节点
//client.delete().deletingChildrenIfNeeded().forPath("/app3");
//方式3: 强制删除【避免一些因为网络传输导致的删除不成功】
client.delete().guaranteed().forPath("/app3");
//3.关闭客户端,释放资源
client.close();
}
watch机制本质就是订阅发布!采用了推拉结合的模式。
Watch机制允许客户端在ZooKeeper上的数据节点发生变化时获得通知。它提供了一种事件驱动的编程模型,使得应用程序可以实时地监控和响应数据的变化,而无需持续地查询。
当客户端在某个数据节点上注册了一个Watch,它会在以下三种情况下被触发:
Zookeeper观察者Watcher由三个部分组成,涉及消息通信及数据存储。
监听过程:
假如我们监听B节点,那么也可以监听B1、B2,但是孙子节点B11无法监听到。
注意:watcher设置后一旦触发一次后就会失效,如果要想一直监听,需要在process回调函数里重新注册相同的 watcher。
Apache Curator框架的监听实现:
Curator对watcher机制做了优化,Curator引入了Cache的概念用来实现对Zookeeper服务器端进行事件监听。Cache是Curator对事件监听的包装,其对事件的监听可以近似看做是一个本地缓存视图和远程Zookeeper视图的对比过程。而且Curator会自动的再次监听,我们就不需要自己手动的重复监听了。
**Curator客户端的Cache共有三种模式: **
public class Demo02Watch {
CuratorFramework client;
@Before
public void initClient() {
//初始化Zookeeper的客户端对象client
String connectString = "127.0.0.1:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 100);
int sessionTimeoutMs = 60 * 1000;//当前客户端会话超时时间
int connectionTimeoutMs = 15 * 1000;//连接超时时间
client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
}
@After
public void destroy(){
client.close();
}
/**
* 1.监听节点数据变化
*/
@Test
public void listenNode() throws Exception {
//1.启动客户端
client.start();
System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
//2.创建节点监听对象NodeCache : 设置监听节点、监听回调方法
NodeCache nodeCache = new NodeCache(client, "/hero");
//设置监听节点
ChildData currentData = nodeCache.getCurrentData();
System.out.println("当前节点数据 = " + currentData);
//开启监听
nodeCache.start(true);
//设置监听回调方法
nodeCache.getListenable().addListener(new NodeCacheListener() {
/**
* 如果节点数据有变化,回调当前方法
*/
@Override
public void nodeChanged() throws Exception {
//获取当前节点的数据
ChildData currentData = nodeCache.getCurrentData();
//获取最新节点名称
String path = currentData.getPath();
System.out.println("节点名称 = " + path);
//获取最新节点数据
byte[] currentDataByte = currentData.getData();
System.out.println("修改后节点数据" + new String(currentDataByte));
System.out.println("--------------->>");
}
});
//从输入流中读取数据的下一个字节。
//阻塞主线程
//System.in.read();
Thread.sleep(100000);
}
}
PathChildrenCache是用来监听指定节点的子节点变化情况【新增、修改、删除】。
启动模式:
如何选用合适的模式?
触发回调的事件类型有哪些?
/**
* 2.监听当前节点的子节点变化,不含节点数据
*/
@Test
public void listenSubNode2() throws Exception {
//1.启动客户端
client.start();
System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
//2.创建节点监听对象PathChildrenCache :
//参数3,是否缓存数据
PathChildrenCache childrenCache = new PathChildrenCache(client, "/hero", true);
// 开启监听
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// 设置监听回调方法
childrenCache.getListenable().addListener((client, event) -> {
//获取修改的数据
byte[] bytes = event.getData().getData();
System.out.println("节点内数据 = " + new String(bytes));
//获取被修改的子节点
System.out.println("节点名称 = " + event.getData().getPath());
//获取事件类型
//PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED 重新连接
//PathChildrenCacheEvent.Type.connection_lost 连接丢失
//PathChildrenCacheEvent.Type.connection_suspended 连接暂停
//PathChildrenCacheEvent.Type.INITIALIZED 初始化
//PathChildrenCacheEvent.Type.CHILD_REMOVED 子节点移除
//PathChildrenCacheEvent.Type.CHILD_ADDED 子节点添加
//PathChildrenCacheEvent.Type.CHILD_UPDATED 子节点修改
PathChildrenCacheEvent.Type type = event.getType();
System.out.println("事件触发类型 = " + type);
System.out.println("--------------------------------------------------->>");
});
//3.阻塞程序
System.in.read();
}
/**
* 3.树形监听所有下级节点变化【模式1+模式2】,含节点数据变更
*/
@Test
public void treeCache() throws Exception {
//1.启动客户端
client.start();
System.out.println("连接Zookeeper成功~~~~~~~~~~~~~~~~~~~");
//2.创建节点监听对象TreeCache :
TreeCache treeCache = new TreeCache(client, "/hero");
//启动缓存
treeCache.start();
//添加监听回调方法
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
//获取修改的数据
byte[] bytes = event.getData().getData();
System.out.println("节点内数据 = " + new String(bytes));
//获取被修改的子节点
System.out.println("节点名称 = " + event.getData().getPath());
//获取事件类型
//TreeCacheEvent.Type.CONNECTION_RECONNECTED 重新连接
//TreeCacheEvent.Type.connection_lost 连接丢失
//TreeCacheEvent.Type.connection_suspended 连接暂停
//TreeCacheEvent.Type.INITIALIZED 初始化
//TreeCacheEvent.Type.NODE_REMOVED 子节点移除
//TreeCacheEvent.Type.NODE_ADDED 子节点添加
//TreeCacheEvent.Type.NODE_UPDATED 子节点修改
TreeCacheEvent.Type type = event.getType();
System.out.println("事件触发类型 = " + type);
System.out.println("--------------------------------------------------->>");
}
});
//3.阻塞程序
System.in.read();
}
假如你认真读完本篇,那么你已经超越很多人了。
如果你对其感兴趣,可以看看下一篇,我们基于ZK手写一个简单的分布式锁,加强实践,巩固知识。https://blog.csdn.net/qq_38974073/article/details/135293504