更好的阅读体验 \huge{\color{red}{更好的阅读体验}} 更好的阅读体验
ZooKeeper 是 Apache 软件基金会的一个软件项目,它为大型分布式计算提供开源的分布式配置服务、同步服务和命名注册,在架构上,通过冗余服务实现高可用性(CP)。
ZooKeeper 的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的原语集,并以一系列简单易用的接口提供给用户使用。
ZooKeeper 本身是一个树形目录服务(名称空间),非常类似于标准文件系统,key-value
的形式存储。名称 key
由斜线 /
分
割的一系列路径元素,例如:/node
,ZooKeeper 名称空间中的每个节点都是由一个路径来标识的。
注意:
key
(完整路径,名称)是唯一的,即同一级节点 key
名称是唯一的value
和对应的状态属性,其中属性可能有多个节点类型:
节点操作的基础命令:
ls
:查看某个路径下目录列表,可选参数 -s 返回状态信息, -w 监听节点变化,-R 递归查看某路径下目录列表create
:创建节点并赋值,可选参数和节点的类型相照应,注意临时节点不能创建子节点set
:修改节点存储的数据get
:获取节点数据和状态信息,可选参数 -s 返回状态信息, -w 返回数据并对对节点进行事件监听stat
:查看节点状态信息,也可选 -w 参数delete/deleteall
:删除某节点,如果某节点不为空,则不能用 delete
命令删除注意:-w 监听节点只能生效一次,在节点信息变化后返回变化信息并失效
ZooKeeper 实现简单的分布式锁:
但是上述方式存在问题——羊群效应:
为了解决上面产生的问题,我们给出更为完善的方案:
基于上述解决方案,我们再将临时顺序节点的创建进行细分,分为分为读锁节点和写锁节点:
这样就基于 ZooKeeper 实现了共享锁和排他锁,在使用时,我们一般利用 Curator 客户端实现:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class DistributedLockDemo {
// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
private final String lockPath = "/distributed-lock";
// ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
// 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
private String connectString;
// Curator 客户端重试策略
private RetryPolicy retry;
// Curator 客户端对象
private CuratorFramework client;
// client2 用户模拟其他客户端
private CuratorFramework client2;
// 初始化资源
@Before
public void init() throws Exception {
// 设置 ZooKeeper 服务地址为本机的 2181 端口
connectString = "192.168.200.168:2181";
// 重试策略
// 初始休眠时间为 1000ms, 最大重试次数为 3
retry = new ExponentialBackoffRetry(1000, 3);
// 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
// 创建会话
client.start();
client2.start();
}
// 释放资源
@After
public void close() {
CloseableUtils.closeQuietly(client);
}
@Test
public void sharedLock() throws Exception {
// 创建共享锁
final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
// lock2 用于模拟其他客户端
final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock.acquire();
System.out.println("======== client1 get lock ========");
// 测试锁重入
Thread.sleep(5 * 1000);
lock.release();
System.out.println("======== client1 release lock ========");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
// 获取锁对象
try {
lock2.acquire();
System.out.println("======== client2 get lock ========");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("======== client2 release lock ========");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(20 * 1000);
}
}
InterProcessMutex
:分布式可重入排它锁(可重入可以借助 LocalMap
存计数器)InterProcessSemaphoreMutex
:分布式排它锁InterProcessMultiLock
:将多个锁作为单个实体管理的容器InterProcessReadWriteLock
:分布式读写锁对于搭建 ZooKeeper 集群的节点往往采用奇数个:
综上可知,搭建集群所需的最少节点配置为 3,如果是 4 台,则发生脑裂时会造成没有 leader 节点的错误。
ZooKeeper 采用的是基于 Paxos 算法的 ZAB 协议,这里先提一下 Paxos 算法:
ZAB 协议在 Paxos 算法基础上进行了扩展,全称为原子消息广播协议(ZooKeeper Atomic Broadcast):
我们查看 ZooKeeper 的源码,在 FastLeaderElection.java
中:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug(
"id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
newId,
curId,
Long.toHexString(newZxid),
Long.toHexString(curZxid));
if (self.getQuorumVerifier().getWeight(newId) == 0) {
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
/*
* 对应上面代码的解释(两个节点之间使用比较的方法来决定选票给谁,三种比较规则)
* 1- 比较 epoche(zxid高32bit):
* 如果其他节点的epoche比自己的大,选举 epoch大的节点(理由:epoch 表示年代【投票次数越多,数据越新】,epoch越大表示数据越新)
* 代码:(newEpoch > curEpoch);
* 2- 比较 zxid,:
* 如果纪元相同,就比较两个节点的zxid的大小,选举 zxid大的节点(理由:zxid 表示节点所提交事务最大的id,zxid越大代表该节点的数据越完整)
* 代码:(newEpoch == curEpoch) && (newZxid > curZxid);
* 3- 比较 serviceId:
* 如果 epoch和zxid都相等,就比较服务的serverId,选举 serviceId大的节点(理由: serviceId 表示机器性能,他是在配置zookeeper集群时确定的,所以我们配置zookeeper集群的时候可以把服务性能更高的集群的serverId设置大些,让性能好的机器担任leader角色)
* 代码 :(newEpoch == curEpoch) && ((newZxid == curZxid) && (newId > curId))。
*/
return ((newEpoch > curEpoch)
|| ((newEpoch == curEpoch)
&& ((newZxid > curZxid)
|| ((newZxid == curZxid)
&& (newId > curId)))));
}
读请求:
写请求-Leader:
写请求-Follower: