🌈🌈🌈🌈🌈🌈🌈🌈
欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术
的推送
发送 资料
可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景
、中间件系列笔记
和编程高频电子书
!
文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁
先看使用示范:
public class ReadWriteLockDemo {
public static void main(String[] args) throws Exception {
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
5000, // 客户端跟 zk 进行心跳,如果心跳断开超过 5 秒,绘画就会断开
3000, // 连接 zk 的超时时间
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(
client,
"/locks/lock");
// 写锁
InterProcessMutex writeLock = readWriteLock.writeLock();
writeLock.acquire();
writeLock.release();
// 读锁
InterProcessMutex readLock = readWriteLock.readLock();
readLock.acquire();
readLock.release();
}
}
Curator 如何实现已有写锁之后无法加读锁?
读写锁的特性就是,读锁之间不互斥,而读锁和写锁之间互斥
读锁
时,如果排在第一个写锁前,就直接加锁,否则,监听第一个写锁写锁
时,如果是第一个则直接加锁,否则对前一个节点添加监听其实在 Curator 中添加读写锁的时候,会去我们指定的目录 /locks/lock
中去创建读锁和写锁的节点,读锁是以 __READ__
开头,写锁以 __WRIT__
开头,在尝试加锁时,会将该目录下的所有节点排序,去判断是否 符合获取锁的要求
,拿下边举个例子:
读锁
没办法获取锁,需要等待第 3 个 写锁
释放后,才可以拿到锁读锁
需要等待前边两个 写锁
释放才可以拿到Curator 读写锁针对 羊群效应
的优化:
首先说一下 羊群效应
是什么:
就比如多个机器对同一个节点进行加锁,加锁顺序为:[锁,锁,锁,锁],如果后边所有的锁都对第一把锁进行监听,当第一把锁释放了之后,会去通知后边所有机器可以去竞争锁了,这样存在一个问题就是如果有几十个节点竞争同一把分布式锁,那么[锁,锁,锁…],几十个节点都去监听第一把锁,当第一把锁释放后,需要去通知很多节点,这会导致 网络瞬时流量
很高,这就是 羊群效应
Curator 是怎么解决羊群效应的呢?
普通互斥锁
来说,节点排序之后为:[锁,锁,锁,锁],每把锁都去 监听自己的上一个节点
,即第二把锁监听第一把锁,第三把锁监听第二把锁,从而避免羊群效应读写锁
来说,节点排序之后为:[读锁,写锁,读锁,读锁],每把读锁都去 监听第一个写锁
,每把写锁都监听前一个节点,这样避免羊群效应Curator 读写锁目前 仍然存在羊群效应
以及优化思路:
假如节点排序之后为:[写锁,写锁,读锁,读锁,读锁,写锁,读锁,读锁]
如果所有读锁都去 监听第一个写锁
,那么第一个写锁释放后会去通知大量的读锁
优化思路
:让读锁都去监听离自己最近的一个写锁,如中间的 3 个读锁去监听第 2 个写锁,最后的 2 个读锁去监听第 6 个写锁
面试中如何聊分布式锁?
在面试中
,和面试官聊分布式锁,可以从 Curator 源码实现思路出发,以及存在问题
最后总结一下,Curator 获取读写锁的思路也很简单
读锁
,就去判断前边是否有写锁,如果有写锁,就 监听
这把写锁并且 阻塞
等待写锁释放,否则可以直接获取锁写锁
,判断前边是否有读锁,如果有读锁,就 监听
这把读锁并且 阻塞
等待读锁释放,否则可以直接获取锁获取读锁和写锁都是在指定的目录中创建 临时顺序节点
那么存在的问题就是 羊群效应
,再说一下 Curator 如何解决羊群效应以及仍然存在羊群效应,如何再进一步优化!
这里由于 ZooKeeper 版本使用的 3.4.5,因此 Curator 框架的版本使用的 2.4.2,虽然版本有些变动,但是建立连接这些代码变动不大
为什么要学习 Curator 客户端框架源码呢?
因为在 ZooKeeper 中其实是提供了一套 原生的客户端框架
,基本的功能都具有但是使用起来比较复杂
Curator 就是对 zk 原生的客户端工具进行了封装,再向外提供更好用的 API,以及更加强大的功能如 Leader 选举
如果只是光会使用 Curator,那么当去看一些开源项目的时候,他们其实使用的是 zk 的原生客户端工具,那可能看起来就比较困难了
另一方面是通过研究源码,可以了解到一些高阶的功能如 分布式锁
是如何通过最底层的 zk 操作来实现的,并且在实际生产环境中,如果碰到了问题,可以很快定位到问题所在
接下来从 Curator 客户端如何与 zk 建立连接进行源码分析
创建 Curator 客户端连接工具的代码如下:
// 创建一个 CuratorFramework 实例
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181", // ZooKeeper 服务器地址
5000, // 客户端跟 zk 进行心跳,如果心跳断开超过 5 秒,绘画就会断开
3000, // 连接 zk 的超时时间
new ExponentialBackoffRetry(1000, 3) // 重试策略,1秒后重试,最多重试3次
);
// 启动客户端
client.start();
点进去这个 newClient
方法,这里其实用到了 构造器模式
,通过 builder()
先去创建了一个 Builder
的构造器对象,再通过这个构造器对象去创建一个客户端实例
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
{
// 使用了构造器模式
return builder().
connectString(connectString).
sessionTimeoutMs(sessionTimeoutMs).
connectionTimeoutMs(connectionTimeoutMs).
retryPolicy(retryPolicy).
build();
}
也就是通过 build()
最后去创建实例对象,在 build() 方法中,其实就是去创建了 CuratorFrameworkImpl
这个实例对象
public CuratorFramework build()
{
// 创建 CuratorFrameworkImpl 对象
return new CuratorFrameworkImpl(this);
}
Curator 其实就是基于 ZooKeeper 原生的 API 进行封装的,我们可以找一下 封装的原生客户端工具
到底在哪里
点击进去 CuratorFrameworkImpl
构造方法中,先去创建了 ZookeeperFactory 这个对象
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
// 创建 ZookeeperFactory 对象
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
new Watcher()
{
// ... 太长省略
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
// 构造一些对象 ...
}
那么在这个 makeZookeeperFactory
方法中其实就是封装了 ZooKeeper 中原生的客户端对象
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
{
@Override
// 这里就创建了原生的 ZooKeeper 对象
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
AuthInfo auth = authInfo.get();
if ( auth != null )
{
zooKeeper.addAuthInfo(auth.scheme, auth.auth);
}
return zooKeeper;
}
};
}
到此,CuratorFramework
实例对象就创建完成了,接下来通过 start
方法,来启动客户端
// 启动客户端
client.start();
进入 start 方法
@Override
public void start()
{
// ...
try
{
// 启动一个线程,通过 while 循环不断去 eventQueue 中取事件,这个事件就是客户端跟 zk 之间发生了网络连接变化的事件,并且去逐个调用监听器中的方法 stateChanged
connectionStateManager.start();
client.start();
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
// 再启动一个线程
executorService.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
// 负责处理后台操作队列中的任务,会尝试去操作队列中获取任务:OperationAndData,再去执行任务,包括创建节点、删除节点等等,并且如果客户端与 zk 的连接断开,在这里还会尝试重连操作
backgroundOperationsLoop();
return null;
}
}
);
}
catch ( Exception e )
{
handleBackgroundOperationException(null, e);
}
}
进入到 backgroundOperationsLoop
方法
private void backgroundOperationsLoop()
{
while ( !Thread.interrupted() )
{
OperationAndData<?> operationAndData;
try
{
// 取出操作任务
operationAndData = backgroundOperations.take();
// ...
}
// ...
// 执行任务
performBackgroundOperation(operationAndData);
}
}
进入到执行任务的 performBackgroundOperation
方法,在这里如果正常连接,就执行取出来的操作任务,否则就去尝试重新连接
private void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
if ( client.isConnected() )
{
// 如果客户端正常连接 zk,就执行操作
operationAndData.callPerformBackgroundOperation();
}
else
{
// 否则,在这里会进行重连
client.getZooKeeper();
// 如果连接超时,就抛出异常
if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
{
throw new CuratorConnectionLossException();
}
// 如果还没有
operationAndData.sleepFor(1, TimeUnit.SECONDS);
// 如果没有超时,则推入到 forcedSleepOperations 强制睡眠后等待重连
queueOperation(operationAndData);
}
}
catch ( Throwable e )
{
// 【连接丢失】异常的处理
if ( e instanceof CuratorConnectionLossException )
{
//
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null);
if ( checkBackgroundRetry(operationAndData, event) )
{
// 推送到 backgroundOperations 队列尝试重连
queueOperation(operationAndData);
}
else
{
logError("Background retry gave up", e);
}
}
else
{
handleBackgroundOperationException(operationAndData, e);
}
}
}
为什么 zk 中不能采用相对路径来查找节点呢?
由 zk 底层节点存储为了高性能的设计造成,因为 zk 的应用场景主要是直接定位 znode 节点,那么最适合的数据模型就是 散列表
,因此在 zk 底层实现的时候,使用到了 hashtable
,使用节点的 完整路径
来作为 key,因此无法通过相对路径来查找对应的节点
经典面试问题:为什么要使用分布式锁不用数据库的行锁呢?
还是拿电商场景举例,业务系统多机部署,多个系统同时收到对同一个数据更新的请求,而这些数据可能在数据库、缓存集群、ES 集群都同时存储了一份,如果只使用数据库的 行锁
只能保证对数据库的操作是并发安全的,但是对其他缓存操作还是会出现问题,因此多个系统一定要去使用分布式锁,流程如下: