🌈🌈🌈🌈🌈🌈🌈🌈
欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术
的推送
发送 资料
可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景
、中间件系列笔记
和编程高频电子书
!
文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁
zk 运维常用命令:
命令执行模板:echo [参数] | nc localhost 2181
,常用参数如下:
基于 jstat 命令监控和检查 zk 的 JVM 运行情况:
# 先查询 zk 的进程 id
ps -ef | grep zookeeper
# 进程 ID 515460 ,采样间隔 250 ms,采样数 4
jstat -gc 515460 250 4
开启 zk 的 JMX 端口以及使用 JConsole 观察 JVM 内存使用:
zk 中启动 JMX 功能,可以去暴露一组端口,以供监控和管理 Java 应用程序
在 zkServer.sh
中,找到 ZOOMAIN
变量,将 ZOOMAIN 后边的内容替换为下边的三条内容:
-Dcom.sum.management.jmxremote.port=21811
-Dcom.sum.management.jmxremote.ssl=false
-Dcom.sum.management.jmxremote.authenticate=false
打开 JMX 端口之后,就可以通过 JDK 自带的可视化 JVM 进程内存分析工具 JConsole
进行内存分析了
zk 操作常用命令:
# 启动服务端
sh zkServer.sh start
# 启动客户端
sh zkCli.sh
# 创建 test 节点,数据为 1010
create /test 1010
# 查看目录
ls /
# 读取数据
get /test
# 更新数据
set /test 1020
# 删除节点
delete /test
目前,业内使用最广泛的 zk 客户端框架为:Curator
Curator 是由 Netflix 开源出来的,提供了丰富的操作 zk 的 API
如果想使用 Curator,需要引入的 maven 依赖如下(zk 使用 3.4.5 版本,对应 curator 框架版本 2.4.2):
目前 Curator 官网说最新版本的 Curator 已经取消了对 zookeeper 3.4.x 的支持,不过原理都是一致的,先了解原理,再看源码就很容易看懂
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
Curator 连接 zk 使用示例如下:
public class CuratorZkExample {
public static void main(String[] args) {
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
5000, // 客户端跟 zk 进行心跳,如果心跳断开超过 5 秒,绘画就会断开
3000, // 连接 zk 的超时时间
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
try {
// 创建一个 ZooKeeper 节点
String path = "/example/path";
client.create().creatingParentsIfNeeded().forPath(path, "data".getBytes());
// 获取节点数据
byte[] data = client.getData().forPath(path);
System.out.println("Node data: " + new String(data));
// 更新节点数据
byte[] newData = "new data".getBytes();
client.setData().forPath(path, newData);
// 读取更新后的节点数据
data = client.getData().forPath(path);
System.out.println("Updated node data: " + new String(data));
// 删除节点
client.delete().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭客户端
client.close();
}
}
}
zk 一般如何实现元数据存储?
一般是将元数据转成 json 字符串
,去创建一个 znode,将元数据的 json 串写到这个 znode 的值中,如果需要就直接从这个 znode 中读取即可
下边主要讲解一下 Curator 中常用的一些功能如何使用:
Curator 中 Leader 的选举
Curator 的 Leader 选举机制有两种:
Leader Latch 使用讲解:
首先创建一个 Leader Latch 如下,在 /leader/latch
目录下创建节点进行 Leader 的选举:
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
所有竞选 Leader 目录为 /leader/latch
的节点会进行协商,选择一个 Leader 出来,可以通过 hasLeaderShip
判断自己是否时 Leader:
System.out.println("是否成为 Leader:" + leaderLatch.hasLeadership());
调用 start
方法之后,节点会开始竞选 Leader,之后阻塞在 await
方法上,直到成为 Leader 之后,才可以执行 await 之后的代码:
// 开始竞选 Leader
leaderLatch.start();
// 阻塞等待成为 Leader
leaderLatch.await();
// 执行 Leader 操作
...
// 将当前实例从 Leader 选举中关闭
leaderLatch.close();
完整代码如下:
public class LeaderLatchDemo {
public static void main(String[] args) throws Exception {
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
leaderLatch.start();
leaderLatch.await();
System.out.println("是否成为 Leader:" + leaderLatch.hasLeadership());
leaderLatch.close();
}
}
Leader Election 使用讲解:
使用 Leader Election 时,需要先创建 LeaderSelector
实例,之后调用 start
方法去启动竞选,当成为 Leader 之后,在 LeaderSelector 中注册的监听器中的 takeLeadership
方法将会被调用,在该方法中执行 Leader 所需要的操作,执行完毕之后,当方法退出时,就意味着当前实例放弃了 Leader 地位,完整代码如下:
public class LeaderElectionDemo {
public static void main(String[] args) throws Exception {
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
LeaderSelector leaderSelector = new LeaderSelector(client, "/leader/selector", new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
System.out.println("你已经成为 Leader");
// 在这里做 Leader 要做的事情,此时不能让方法退出
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("连接状态发生变化");
if (newState.equals(ConnectionState.LOST)) {
throw new CancelLeadershipException();
}
}
});
leaderSelector.start();
Thread.sleep(Integer.MAX_VALUE);
}
}
Curator 实现分布式锁原理:
Curator 实现 zk 的分布式锁,就是通过 临时顺序节点
实现
简单来说,就是所有人去竞争一个锁
指定一个锁的目录,在此目录下创建一个临时顺序节点
如果是第一个节点的话,表示获取到了锁
如果不是第一个节点,就对上一个节点加一个监听,上一个节点释放锁之后,第二个节点会得到通知,就去拿到锁
Curator 提供的节点监听机制:
Curator 事件有两种模式:
标准的观察模式
,使用 Watcher 监听器
缓存事件监听
,类似于将本地缓存视图和 ZooKeeper 视图进行对比的过程,Curator 提供了 3 个接口:
Watcher 监听器处理事件:
需要定义一个事件,再通过 usingWatcher
即可对某个路径进行监听
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
log.info("监听器watchedEvent:" + watchedEvent);
}
};
byte[] content = client.getData().usingWatcher(watcher).forPath("/test/watcher");
System.out.println(new String(content));
利用 Watcher 对节点进行监听的话,缺点就是只可以监听 1 次,监听到 1 次事件之后这个监听器就会失效
如果要反复监听,那么就需要反复调用 usingWatcher
来注册
缓存事件监听:
PathCache 和 NodeCache 接口的使用都放在下边(这里使用的 Curator 客户端版本为 2.4.2,因此还没有 Tree Cache 功能,这里就不演示了):
// Path Cache 监听
public static void main(String[] args) throws Exception {
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("监听到子节点发生变化,收到事件:" + pathChildrenCacheEvent);
}
});
pathChildrenCache.start();
Thread.sleep(Integer.MAX_VALUE);
}
// NodeCache 监听
public static void main(String[] args) throws Exception {
// 创建一个 CuratorFramework 实例
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
final NodeCache nodeCache = new NodeCache(client, "/nodecache");
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
Stat stat = client.checkExists().forPath("/nodecache");
if (stat != null) {
byte[] bytes = client.getData().forPath("/nodecache");
System.out.println("节点数据发生变化" + new String(bytes));
System.out.println();
} else {
System.out.println("节点被删除...");
}
}
});
nodeCache.start();
Thread.sleep(Integer.MAX_VALUE);
}
ZooKeeper 客户端功能总结:
使用客户端工具 Curator 主要是用来做节点的 CRUD + 监听 + Leader 选举,用在分布式业务系统中
其中 Curator 提供的像 Barrier、分布式计数器、分布式队列这些功能使用场景很少,并且同样的功能还有更好的选择
比如更推荐使用 Redis 做分布式计数、RabbitMQ/RocketMQ 做分布式队列!