引入依赖
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.3</version>
</dependency>
创建连接
/**
 * Opens a ZooKeeper session against {@code 192.168.4.64:2181} (session timeout 60000 ms)
 * and blocks until the client reports {@code SyncConnected}.
 *
 * <p>The {@code ZooKeeper} constructor returns before the connection is established, so a
 * latch released by the watcher is used to wait for the first {@code SyncConnected} event.
 * The wait is bounded: if the server cannot be reached within 30 seconds the half-open
 * client is closed and an exception is thrown instead of blocking forever.
 *
 * @return a connected ZooKeeper client; the caller is responsible for closing it
 * @throws IllegalStateException if no connection is established within 30 seconds
 * @throws Exception if the client cannot be created or the waiting thread is interrupted
 */
static ZooKeeper init() throws Exception {
    CountDownLatch connected = new CountDownLatch(1);
    ZooKeeper zooKeeper = new ZooKeeper("192.168.4.64:2181", 60000, watchedEvent -> {
        if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            connected.countDown();
        }
    });
    // State right after construction, before the connection has been confirmed.
    System.out.println(zooKeeper.getState());

    boolean established = false;
    try {
        // Fully qualified so this snippet does not rely on an extra import.
        established = connected.await(30, java.util.concurrent.TimeUnit.SECONDS);
        if (!established) {
            throw new IllegalStateException(
                    "Timed out after 30s waiting for ZooKeeper connection to 192.168.4.64:2181");
        }
    } finally {
        // Release the client on timeout or interruption so its threads are not leaked.
        if (!established) {
            zooKeeper.close();
        }
    }

    System.out.println(zooKeeper.getState());
    return zooKeeper;
}
创建节点
/**
 * Creates a persistent znode at {@code ZK_NODE} with the payload {@code "dataSync"} and a
 * completely open ACL, then logs the path returned by the server.
 *
 * @throws Exception if the create call fails (for example because the node already exists)
 */
@Test
public void create() throws Exception {
    byte[] payload = "dataSync".getBytes();
    String createdPath = zooKeeper.create(
            ZK_NODE,
            payload,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
    log.info("created path: {}", createdPath);
}
删除节点
/**
 * Deletes the znode at {@code ZK_NODE} if it exists.
 *
 * <p>The node's current version is read via {@code exists} and passed to {@code delete},
 * so the delete only succeeds if the node has not been modified in between.
 *
 * @throws Exception if either ZooKeeper call fails
 */
@Test
public void delete() throws Exception {
    Stat nodeStat = zooKeeper.exists(ZK_NODE, false);

    // Nothing to delete: report it and stop.
    if (nodeStat == null) {
        log.info("节点不存在");
        return;
    }

    int expectedVersion = nodeStat.getVersion();
    zooKeeper.delete(ZK_NODE, expectedVersion);
    log.info("节点删除成功");
}
修改节点
/**
 * Overwrites the data of the znode at {@code ZK_NODE} with {@code "changed!"} and logs the
 * content before and after the change.
 *
 * <p>The version obtained by the first read is passed to {@code setData}, so the write is
 * only applied if the node has not been modified since that read.
 *
 * <p>Bytes are encoded and decoded explicitly as UTF-8 rather than with the platform
 * default charset, and a node without data (a {@code null} byte array) is logged as
 * {@code null} instead of causing a {@code NullPointerException}.
 *
 * @throws Exception if any ZooKeeper call fails
 */
@Test
public void update() throws Exception {
    // Fully qualified so this snippet does not rely on an extra import.
    java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

    Stat stat = new Stat();
    byte[] before = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改前: {}", before == null ? null : new String(before, utf8));

    // stat was filled in by getData above; its version guards the write.
    zooKeeper.setData(ZK_NODE, "changed!".getBytes(utf8), stat.getVersion());

    byte[] after = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改后: {}", after == null ? null : new String(after, utf8));
}
查询节点
/**
 * Lists the children of {@code /config} and reads the data stored at {@code ZK_NODE},
 * logging both results.
 *
 * <p>Both calls pass {@code true} for the watch flag, which registers the client's default
 * watcher (the one given to the {@code ZooKeeper} constructor) on the respective node.
 *
 * <p>The node data is decoded explicitly as UTF-8 rather than with the platform default
 * charset, and a node without data (a {@code null} byte array) is logged as {@code null}
 * instead of causing a {@code NullPointerException}.
 *
 * @throws Exception if any ZooKeeper call fails (for example because a node does not exist)
 */
@Test
public void query() throws Exception {
    List<String> children = zooKeeper.getChildren("/config", true);
    log.info("{}", children);

    Stat stat = new Stat();
    byte[] data = zooKeeper.getData(ZK_NODE, true, stat);
    // Fully qualified so this snippet does not rely on an extra import.
    String text = data == null ? null : new String(data, java.nio.charset.StandardCharsets.UTF_8);
    log.info("data : {}", text);
}
完整示例
/**
 * JUnit walkthrough of the basic ZooKeeper client operations (create, delete, update,
 * query, asynchronous create) against a single znode, {@code /test}.
 *
 * <p>One client is opened before all tests and closed after them.
 *
 * <p>NOTE(review): the tests share the same znode and no execution order is declared here,
 * so e.g. {@code update()} presumably needs {@code create()} to have run first — confirm
 * how these tests are meant to be executed.
 */
@Slf4j
public class ZkClientTest {
    /** Path of the znode the tests operate on. */
    private static final String ZK_NODE = "/test";
    /** Address ({@code host:port}) of the ZooKeeper server. */
    private static final String CONNECT_STRING = "192.168.4.64:2181";
    /** Session timeout passed to the ZooKeeper constructor, in milliseconds. */
    private static final int SESSION_TIMEOUT_MS = 60000;
    /** Upper bound for waiting on the initial connection, in seconds. */
    private static final long CONNECT_TIMEOUT_SECONDS = 30;
    /** Upper bound for waiting on the asynchronous create callback, in seconds. */
    private static final long CALLBACK_TIMEOUT_SECONDS = 5;
    /**
     * Charset used for all znode payloads; explicit so results do not depend on the
     * platform default. Fully qualified to avoid requiring additional imports.
     */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Client shared by all tests; set in {@link #init()}, closed in {@link #end()}. */
    private static ZooKeeper zooKeeper;

    /**
     * Opens the shared client and blocks until it reports {@code SyncConnected}.
     *
     * <p>The constructor returns before the connection is established, so a latch released
     * by the watcher is awaited. The wait is bounded so an unreachable server fails the
     * setup instead of hanging the test run.
     *
     * @throws IllegalStateException if no connection is established in time
     * @throws Exception if the client cannot be created or the wait is interrupted
     */
    @BeforeAll
    static void init() throws Exception {
        CountDownLatch connected = new CountDownLatch(1);
        zooKeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT_MS, watchedEvent -> {
            if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                connected.countDown();
            }
        });
        // State right after construction, before the connection has been confirmed.
        System.out.println(zooKeeper.getState());

        if (!connected.await(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            // Do not leave a half-open client behind.
            zooKeeper.close();
            zooKeeper = null;
            throw new IllegalStateException("Timed out after " + CONNECT_TIMEOUT_SECONDS
                    + "s waiting for ZooKeeper connection to " + CONNECT_STRING);
        }
        System.out.println(zooKeeper.getState());
    }

    /**
     * Closes the shared client, if one was created.
     *
     * @throws InterruptedException if interrupted while closing
     */
    @AfterAll
    static void end() throws InterruptedException {
        // init() may have failed before (or after) assigning the client.
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }

    /**
     * Creates a persistent znode at {@link #ZK_NODE} with the payload {@code "dataSync"}
     * and a completely open ACL, then logs the returned path.
     */
    @Test
    public void create() throws Exception {
        String path = zooKeeper.create(ZK_NODE, "dataSync".getBytes(UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        log.info("created path: {}", path);
    }

    /**
     * Deletes the znode at {@link #ZK_NODE} if it exists, passing the version read by
     * {@code exists} so the delete only succeeds if the node was not modified in between.
     */
    @Test
    public void delete() throws Exception {
        Stat stat = zooKeeper.exists(ZK_NODE, false);
        if (stat != null) {
            zooKeeper.delete(ZK_NODE, stat.getVersion());
            log.info("节点删除成功");
        } else {
            log.info("节点不存在");
        }
    }

    /**
     * Overwrites the data at {@link #ZK_NODE} with {@code "changed!"} and logs the content
     * before and after. The version from the first read guards the write.
     */
    @Test
    public void update() throws Exception {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
        log.info("修改前: {}", decode(data));
        zooKeeper.setData(ZK_NODE, "changed!".getBytes(UTF_8), stat.getVersion());
        byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
        log.info("修改后: {}", decode(dataAfter));
    }

    /**
     * Lists the children of {@code /config} and reads the data at {@link #ZK_NODE},
     * logging both. Both calls pass {@code true} for the watch flag, which registers the
     * client's default watcher on the respective node.
     */
    @Test
    public void query() throws Exception {
        List<String> children = zooKeeper.getChildren("/config", true);
        log.info("{}", children);
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData(ZK_NODE, true, stat);
        log.info("data : {}", decode(data));
    }

    /**
     * Creates the znode at {@link #ZK_NODE} through the asynchronous API and logs the
     * callback arguments.
     *
     * <p>Instead of always sleeping for a fixed time, the test waits on a latch released by
     * the callback, for at most {@link #CALLBACK_TIMEOUT_SECONDS} seconds. A missing
     * callback is logged but, as before, does not fail the test.
     */
    @Test
    public void createAsync() throws Exception {
        CountDownLatch callbackDone = new CountDownLatch(1);
        zooKeeper.create(ZK_NODE, "dataAsync".getBytes(UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                (rc, path, ctx, name) -> {
                    log.info("rc {},path {},ctx {},name {}", rc, path, ctx, name);
                    callbackDone.countDown();
                }, "context");
        if (!callbackDone.await(CALLBACK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            log.warn("async create callback not received within {}s", CALLBACK_TIMEOUT_SECONDS);
        }
    }

    /**
     * Decodes znode data as UTF-8 text.
     *
     * @param data raw znode data; may be {@code null} when the node holds no data
     * @return the decoded text, or {@code null} if {@code data} is {@code null}
     */
    private static String decode(byte[] data) {
        return data == null ? null : new String(data, UTF_8);
    }
}