👏作者简介:大家好,我是若明天不见,BAT的Java高级开发工程师,CSDN博客专家,后端领域优质创作者
📕系列专栏:多线程及高并发系列
📕其他专栏:微服务框架系列、MySQL系列、Redis系列、Leetcode算法系列、GraphQL系列
📜如果感觉博主的文章还不错的话,请👍点赞收藏关注👍支持一下博主哦??
?时间是条环形跑道,万物终将归零,亦得以圆全完美
多线程及高并发系列
AQS 核心思想:
CLH 锁算法的核心思想是使用一个虚拟的双向链表来表示等待队列,每个等待线程都有一个对应的节点。每个节点都有一个状态标志位,用于表示线程是否已经获得锁。当线程尝试获取锁时,会通过不断自旋的方式检查前驱节点的状态,直到前驱节点释放锁
AQS 核心思想总结:
CAS(Compare and Swap)
操作来确保状态的原子更新。这样可以避免使用锁或其他同步机制,提高并发性能AQS 核心数据结构 CLH 锁的详细解读可见Java AQS 核心数据结构-CLH 锁
AQS 使用了模板方法的设计模式,使用者继承AbstractQueuedSynchronizer
并重写指定的方法,可以定制化同步器的行为,适应不同的并发场景
基本方法如下:
方法名 | 描述 |
---|---|
protected boolean isHeldExclusively() | 该线程是否正在独占资源。只有用到Condition才需要去实现它。 |
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
自定义同步器实现的相关方法,并通过修改State字段来实现多线程的独占模式或者共享模式
AQS 使用int 成员变量state
表示同步状态,通过内置的FIFO 线程等待/等待队列来完成获取资源线程的排队工作。本质上是 CAS + volatile
state
变量由volatile
修饰,用于展示当前临界资源的获锁情况
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;
状态信息 state 可以通过protected final
类型的getState()、setState()和compareAndSetState() 进行操作。并且,这几个方法都是 final 修饰的,在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS 提供了对资源的共享方式的支持,可以通过继承AQS来实现不同类型的共享同步器。AQS支持两种基本的资源共享方式:独占模式(Exclusive mode)和共享模式(Shared mode)
tryAcquire
和tryRelease
方法来实现独占模式的同步逻辑,如ReentrantLock
tryAcquireShared
和tryReleaseShared
方法来实现共享模式的同步逻辑,如CountDownLatch
和Semaphore
AQS并不直接限制资源的共享方式,而是提供了底层的同步原语和模板方法,使开发者能够根据具体需求实现自定义的同步器。通过重写AQS的模板方法,可以实现不同类型的资源共享方式,包括独占模式和共享模式,以满足不同的并发场景和需求
此段修改自AbstractQueuedSynchronizer数据结构,详细源码分析可参考此文
AbstractQueuedSynchronizer
类底层的数据结构是使用CLH(Craig,Landin,and Hagersten)
队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。
AQS是将每条请求共享资源的线程封装成一个CLH
锁队列的一个结点(Node)来实现锁的分配
Sync queue
,即同步队列,是双向链表,包括head
结点和tail
结点,head
结点主要用作后续的调度Condition queue
不是必须的,其是一个单向链表,只有当使用Condition
时,才会存在此单向链表。并且可能会有多个Condition queue
AbstractQueuedSynchronizer
继承自AbstractOwnableSynchronizer
抽象类,并且实现了Serializable接口,可以进行序列化
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable
AbstractOwnableSynchronizer
抽象类的源码:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 版本序列号
private static final long serialVersionUID = 3737899427754241961L;
// 构造方法
protected AbstractOwnableSynchronizer() { }
// 独占模式下的线程
private transient Thread exclusiveOwnerThread;
// 设置独占线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 获取独占线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractOwnableSynchronizer抽象类中,可以设置独占资源线程和获取独占资源线程。分别为setExclusiveOwnerThread
与getExclusiveOwnerThread
方法,这两个方法会被子类调用。
AbstractQueuedSynchronizer
类有两个内部类,分别为Node
类与ConditionObject
类。下面分别做介绍
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造方法
Node() { // Used to establish initial head or SHARED marker
}
// 构造方法
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造方法
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
每个线程被阻塞的线程都会被封装成一个Node结点,放入队列。每个节点包含了一个Thread类型的引用,并且每个节点都存在一个状态,具体状态如下:
结点状态 | 结点状态值 | 含义 |
---|---|---|
CANCELLED | 1 | 当前的线程被取消 |
SIGNAL | -1 | 当前节点的后继节点包含的线程需要运行,需要进行unpark 操作 |
CONDITION | -2 | 当前节点在等待condition ,也就是在condition queue 中 |
PROPAGATE | -3 | 当前场景下后续的acquireShared 能够得以执行 |
- | 0 | 当前节点在sync queue 中,等待获取锁 |
在 AQS 中,
ConditionObject
是用于支持条件变量的内部类。它实现了Condition
接口,并提供了条件等待和通知的功能
ConditionObject
的作用是允许线程在满足特定条件之前等待,并在条件满足时被其他线程通知。它为多线程之间的协调提供了一种机制
public interface Condition {
// 使当前线程进入等待状态,直到接收到信号或被中断。在等待期间,当前线程会释放锁,并加入到条件等待队列中
void await() throws InterruptedException;
// 与await()类似,但是忽略中断的影响,即不会响应中断
void awaitUninterruptibly();
// 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 使当前线程进入等待状态,直到接收到信号、被中断或等待时间超过指定的时间。此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 使当前线程进入等待状态,直到接收到信号、被中断或等待到指定的时间
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在该条件上的线程,使其从等待状态变为可运行状态
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();
}
ConditionObject
的使用通常需要与Lock
接口配合使用,通过调用Lock
的newCondition()
方法获取一个Condition
对象。**在调用条件对象的等待和通知方法之前,需要先获取相应的锁。**通过使用条件变量,线程可以更精确地控制线程的等待和唤醒,以及执行特定的操作序列
AQS(AbstractQueuedSynchronizer)中的acquire
方法是用于获取同步状态(获取锁)的核心方法
tryAcquire
方法尝试获取同步状态tryAcquire
方法返回失败(负值),则调用addWaiter
方法将当前线程包装成一个节点(Node)并加入到等待队列中acquireQueued
方法使线程进入等待状态,直到获取到同步状态或被中断。acquireQueued
方法会不断尝试获取同步状态,如果获取失败,则将节点加入到等待队列中,并使线程阻塞。在等待期间,线程会不断自旋尝试获取同步状态tryAcquire
方法尝试获取同步状态。如果成功获取到同步状态,则调用selfInterrupt方法自我中断,以响应中断的请求acquire方法的伪代码:
public void acquire(int arg) {
if (tryAcquire(arg)) {
return;
}
Node node = addWaiter();
for (;;) {
if (acquireQueued(node, arg)) {
selfInterrupt();
return;
}
}
}
acquireQueued - 等待队列中线程出队列时机流程图
AQS(AbstractQueuedSynchronizer)中的release
方法是用于释放同步状态(释放锁)的核心方法。下面详细介绍了release
方法的代码步骤:
tryRelease
方法尝试释放同步状态。tryRelease
方法是一个抽象方法,由子类实现具体的释放逻辑。如果tryRelease
方法返回 true,表示释放同步状态成功,直接返回。tryRelease
方法返回 false,表示释放同步状态失败,此时会抛出IllegalMonitorStateException
异常,表示当前线程未持有该同步状态unparkSuccessor
方法唤醒等待队列中的后继节点。unparkSuccessor
方法会找到等待队列中的下一个有效节点(即状态不为取消的节点),并调用LockSupport.unpark
方法唤醒该节点对应的线程release方法的伪代码
public void release(int arg) {
if (tryRelease(arg)) {
unparkSuccessor(node);
} else {
throw new IllegalMonitorStateException();
}
}
下述代码中,SimpleLock
类包装了一个内部类Sync
,它继承了AbstractQueuedSynchronizer
。Sync
类重写了tryAcquire
、tryRelease
和isHeldExclusively
方法来实现锁的获取、释放和判断当前线程是否持有锁的逻辑
public class SimpleLock {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// 尝试获取锁,如果成功返回true,否则返回false
return compareAndSetState(0, 1);
}
@Override
protected boolean tryRelease(int arg) {
// 尝试释放锁,如果成功返回true,否则返回false
setState(0);
return true;
}
@Override
protected boolean isHeldExclusively() {
// 判断当前线程是否持有锁
return getState() == 1;
}
final boolean isLocked() {
// 是否已上锁
return getState() != 0;
}
}
}
使用实例:
public class Main {
private static final SimpleLock lock = new SimpleLock();
public static void main(String[] args) {
// 创建两个线程并启动
Thread thread1 = new Thread(() -> {
lock.lock();
try {
// 执行需要互斥的代码块
System.out.println("Thread 1: Lock acquired");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1: Lock released");
}
});
Thread thread2 = new Thread(() -> {
lock.lock();
try {
// 执行需要互斥的代码块
System.out.println("Thread 2: Lock acquired");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 2: Lock released");
}
});
thread1.start();
thread2.start();
}
}
这只是一个简单的示例,实际上,AQS的应用要更加复杂。在实际场景中,你可能需要考虑更多的因素,例如可重入性、公平性等。如果需要在生产环境中使用锁,优先考虑使用Java标准库提供的java.util.concurrent.locks
包中的锁实现,如ReentrantLock
ReentrantLock 提供了可见性的保证,即当一个线程释放锁时,会将对变量的修改刷新到主内存中,以便其他线程获取锁后能够看到最新的值。
如果需要在锁范围外对成员变量的修改对其他线程可见,可以将该变量声明为
volatile
ReentrantLock可以以公平锁(fair lock)模式或非公平锁(nonfair lock)模式运行。这两种模式的区别在于线程获取锁的顺序。
非公平锁尝试获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部;公平锁则
//
static final class NonfairSync extends Sync {
// 版本号
private static final long serialVersionUID = 7316153563782823691L;
// 获得锁
final void lock() {
if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用
// 把当前线程设置独占了锁
setExclusiveOwnerThread(Thread.currentThread());
else // 锁已经被占用,或者set失败
// 以独占模式获取对象,忽略中断
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
ReentrantLock
的加锁方法lock
进行加锁操作Sync
的lock
方法,由于Sync#lock
是抽象方法,根据ReentrantLock
初始化选择的公平锁和非公平锁,执行相关内部类的lock
方法,本质上都会执行AQS的acquire
方法acquire
方法会执行tryAcquire
方法,因此执行了自定义同步器ReentrantLock
中的tryAcquire
方法,而ReentrantLock
根据锁类型不同执行不同的tryAcquire
tryAcquire
是获取锁逻辑,获取失败后,会执行框架 AQS 的后续逻辑,跟ReentrantLock
自定义同步器无关public static void main(String[] args) {
char[] a = "ABCDE".toCharArray();
char[] b = "12345".toCharArray();
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
try {
for (char c : a) {
System.out.print(c);
condition.signal();
condition.await();
}
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t1").start();
new Thread(() -> {
lock.lock();
try {
for (char c : b) {
System.out.print(c);
condition.signal();
condition.await();
}
condition.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "t2").start();
}
参考资料: