目录
1. 引入maven
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
2. 增删改查API?
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkJavaClient {
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";
private static ZooKeeper zooKeeper;
// 初始化zk客户端
@BeforeClass
public static void initZookeeper() throws IOException, InterruptedException {
System.out.println("initZookeeper");
CountDownLatch countDownLatch =new CountDownLatch(1);
zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR, 3000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected==event.getState()
&& event.getType()== Event.EventType.None){
//如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
System.out.println("zookeeper连接建立");
}
}
});
System.out.println("zookeeper连接中...");
countDownLatch.await();
// 打印连接状态
System.out.println(zooKeeper.getState());
}
public static String getUniqueNode(String node) {
return node + "-" + System.currentTimeMillis();
}
private static String syncNode = getUniqueNode("/user");
// 新增-同步
@Test
public void a_createSync() throws InterruptedException, KeeperException {
String result = zooKeeper.create(syncNode, "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("同步创建node成功" + result);
}
// 新增-异步
@Test
public void b_createASync() throws InterruptedException, KeeperException {
zooKeeper.create(getUniqueNode("/user-sync"), "kk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println(String.format("异步创建node成功 rc %s, path %s,ctx %s,name %s",rc,path,ctx,name));
}
}, "context");
Thread.sleep(1000 * 2);
}
// 更新-同步
@Test
public void c_updateSync() throws InterruptedException, KeeperException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData(syncNode, false, stat);
System.out.println("node = " + syncNode + "修改前: " + new String(data));
stat = zooKeeper.setData(syncNode, "kk2".getBytes(), stat.getVersion());
System.out.println("同步修改node = " + syncNode + "成功 " + stat);
data = zooKeeper.getData(syncNode, false, stat);
System.out.println("node = " + syncNode + "修改后: " + new String(data));
}
// 删除
@Test
public void d_delSync() throws InterruptedException, KeeperException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData(syncNode, false, stat);
System.out.println("node = " + syncNode + "删除前: " + new String(data));
zooKeeper.delete(syncNode, stat.getVersion());
System.out.println("同步删除node = " + syncNode + "成功 ");
stat = zooKeeper.exists(syncNode, null);
System.out.println("node = " + syncNode + "删除后: " + stat);
}
}
?
Curator 包含了几个包:
<!-- zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";
private static CuratorFramework client;
@BeforeClass
public static void initZookeeper() {
System.out.println("initZookeeper");
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(CLUSTER_CONNECT_STR)
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("Curator-test") // 包含隔离名称
.build();
client.start();
}
// 定义重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
????????当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
策略名称 | 描述 |
ExponentialBackoffRetry | 重试一组次数,重试之间的睡眠时间增加 |
RetryNTimes | 重试最大次数 |
RetryOneTime | 只重试一次 |
RetryUntilElapsed | 在给定的时间结束之前重试 |
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorClient {
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";
private static CuratorFramework client;
@BeforeClass
public static void initZookeeper() {
System.out.println("initZookeeper");
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(CLUSTER_CONNECT_STR)
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("Curator-test") // 包含隔离名称
.build();
client.start();
}
private static String TMP_PATH = getUniqueNode("/curator-kk");;
// 创建单节点
@Test
public void a_Create() throws Exception {
// String path = client.create().forPath("/curator-node");
String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(TMP_PATH, "kk".getBytes());
System.out.println("同步创建node成功, path = " + TMP_PATH + " result = " + pathResult);
}
public static String getUniqueNode(String node) {
return node + "-" + System.currentTimeMillis();
}
// 创建父子节点
@Test
public void b_Create_Parent() throws Exception {
String pathWithParent = getUniqueNode("/kk-parent/kk-sub-1");
String pathResult = client.create().creatingParentsIfNeeded().forPath(pathWithParent, "kk_son".getBytes());
System.out.println("同步创建node成功, path = " + pathWithParent + " result = " + pathResult);
}
// 更新节点
@Test
public void c_SetData() throws Exception {
Stat stat = client.setData().forPath(TMP_PATH, "changed!".getBytes());
System.out.println("更新node成功, path = " + TMP_PATH + " result = " + stat);
}
// 查询节点
@Test
public void d_GetData() throws Exception {
byte[] bytes = client.getData().forPath(TMP_PATH);
System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));
}
// 删除节点
@Test
public void e_Delete() throws Exception {
String pathWithParent="/kk-parent";
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
// 查询节点 - 异步
@Test
public void f_GetData_Async() throws Exception {
client.getData().inBackground((item1, item2) -> {
System.out.println(" background: val " + new String(item2.getData()) + " item2 = " + item2);
}).forPath(TMP_PATH);
Thread.sleep(1000 * 2);
}
// 查询节点 - 异步 - 指定线程池
@Test
public void g_GetData_Async_Excutor() throws Exception {
ExecutorService executorService = Executors.newSingleThreadExecutor();
client.getData().inBackground((item1, item2) -> {
System.out.println(" background: val " + new String(item2.getData()) + " item2 = " + item2);
},executorService).forPath(TMP_PATH);
Thread.sleep(1000 * 2);
}
}
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class ZkCuratorWatchClient {
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";
private static CuratorFramework client;
@BeforeClass
public static void initZookeeper() {
System.out.println("initZookeeper");
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(CLUSTER_CONNECT_STR)
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("Curator-test") // 包含隔离名称
.build();
client.start();
}
private static String TMP_PATH = "/curator-kk-w";;
// 添加单节点监听器-永久
@Test
public void a_addWatch() throws Exception {
createIfNeed(TMP_PATH);
NodeCache nodeCache = new NodeCache(client, TMP_PATH);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println(TMP_PATH + " path nodeChanged");
print_GetData();
}
});
nodeCache.start();
Thread.sleep(1000 * 300);
}
// 添加子节点(Child)监听器-永久
@Test
public void b_addWatch_Child() throws Exception {
createIfNeed(TMP_PATH);
PathChildrenCache nodeCache = new PathChildrenCache(client, TMP_PATH, true);
nodeCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework cli, PathChildrenCacheEvent event) throws Exception {
ChildData data = event.getData();
System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));
}
});
nodeCache.start();
Thread.sleep(1000 * 300);
}
// 添加所有子节点(Tree)监听器-永久
@Test
public void testTreeCache() throws Exception {
createIfNeed(TMP_PATH);
TreeCache treeCache = new TreeCache(client, TMP_PATH);
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
System.out.println(" path nodeChanged" + data.getPath() + " type = " + event.getType() + " val = " + new String(data.getData()));
}
});
treeCache.start();
Thread.sleep(1000 * 300);
}
private void createIfNeed(String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
System.out.println(stat);
if (stat == null) {
String pathResult = client.create().withMode(CreateMode.PERSISTENT).forPath(path, "kk".getBytes());
System.out.println("同步创建node成功, path = " + path + " result = " + pathResult);
}
}
public void print_GetData() throws Exception {
byte[] bytes = client.getData().forPath(TMP_PATH);
System.out.println("查询node成功, path = " + TMP_PATH + " result = " + new String(bytes));
}
}
public class IDMaker{
private static CuratorFramework client;
private final static String CLUSTER_CONNECT_STR="192.168.6.128:2181,192.168.6.128:2182,192.168.6.128:2183";
// 初始化客户端
static {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
client = CuratorFrameworkFactory.builder()
.connectString(CLUSTER_CONNECT_STR)
.sessionTimeoutMs(5000) // 会话超时时间
.connectionTimeoutMs(5000) // 连接超时时间
.retryPolicy(retryPolicy)
.namespace("Curator-test") // 包含隔离名称
.build();
client.start();
}
// 创建临时顺序节点
private String createSeqNode(String pathPefix) throws Exception {
//创建一个临时顺序节点
String destPath = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(pathPefix);
return destPath;
}
// 生成分布式id
public String makeId(String path) throws Exception {
String str = createSeqNode(path);
if(null != str){
//获取末尾的序号
int index = str.lastIndexOf(path);
if(index>=0){
index+=path.length();
return index<=str.length() ? str.substring(index):"";
}
}
return str;
}
// 测试-多线程批量生成id
public static void main(String[] args) throws Exception {
String path = "/idmarker/id-";
IDMaker idMaker = new IDMaker();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (int j = 0; j < 10; j++) {
try {
String id = idMaker.makeId(path);
System.out.println(Thread.currentThread().getName() + " 第" + j + "生产的id = " +id);
} catch (Exception e) {
System.err.println(e);
}
}
}, "thread-" + i).start();
}
Thread.sleep(1000 * 300);
}
}
==