在Java并发编程中,我们经常听到AQS(AbstractQueuedSynchronizer)这个概念,它是Java中锁的核心之一。本文将深入介绍AQS,通过提出一系列问题,带你深入了解AQS的定义、实现、资源获取方式以及应用场景。
带着问题阅读
- 什么是AQS? 为什么它是锁核心?
- AQS是如何实现的?
- AQS定义了什么样的资源获取方式?
AbstractQueuedSynchronizer
提供了一套可用于实现锁同步机制的框架,不夸张地说,AQS
是JUC
同步框架的基石。AQS
通过一个FIFO
队列维护线程同步状态,实现类只需要继承该类,并重写指定方法即可实现一套线程同步机制。
AQS
根据资源互斥级别提供了独占和共享两种资源访问模式;同时其定义Condition
结构提供了wait/signal
等待唤醒机制。在JUC
中,诸如ReentrantLock
、CountDownLatch
等都基于AQS
实现。
AQS原理(独占式)
AQS有三个重要的字段,分别是: head 头节点、tail 尾节点、state 同步状态
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 同步状态
private volatile int state;
// todo volatile 是保证内存的可进行
// ......
}
AQS内部使用一个volatile int state
来表示同步状态,该状态用于标识同步资源的可用数量或者被占用的情况。例如:独占锁中,state
通常表示锁的持有次数,可以为0表示未被占用,大于0表示已被占用。对于共享锁,state
通常表示共享资源的数量。
AQS本身就是个双向链表,head和tail 就是保存的等待队列的头尾结点。
当某个线程获取资源失败时,会被构建成节点加入AQS中
static final class Node {
// 节点状态
volatile int waitStatus;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 当前节点所代表的线程
volatile Thread thread;
// 等待队列使用时的后继节点指针
Node nextWaiter;
}
以下是waitStatus节点状态的取值及各个状态的含义:
CANCELLED
(1):表示节点因为超时或者被中断而被取消。当一个线程在等待队列中等待的时候,如果发生了超时或者线程被中断,节点的状态会被设置为 CANCELLED
。SIGNAL
(-1):表示后继节点会被阻塞,当前节点需要唤醒后继节点。在独占锁的等待队列中,前一个持有锁的线程释放锁时,会将后继节点的 waitStatus
设置为 SIGNAL
,以通知后继节点可以尝试获取锁了。CONDITION
(-2):表示节点在等待队列中,等待条件变量。当一个节点通过 Condition
进入等待队列时,其 waitStatus
会被设置为 CONDITION
。PROPAGATE
(-3):用于共享模式。表示释放锁时,如果发现有后继节点需要被唤醒,会将后继节点的 waitStatus
设置为 PROPAGATE
。0
:表示初始状态,或者表示节点在共享模式下,后继节点在释放锁时需要被唤醒。有关共享模式,下文会介绍。
AQS中还有另外一个内部类ConditionObject
用于实现等待队列/条件队列。
// 实现Condition接口
public class ConditionObject implements Condition, java.io.Serializable {
// 条件队列的头节点
private transient Node firstWaiter;
// 条件队列的尾节点
private transient Node lastWaiter;
// 创建条件队列的节点
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点被取消,清理掉取消节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// transient修饰的变量在对象被序列化时会被忽略,不会被写入到序列化的数据流中。
// ......
}
这里主要分析 ConditionObject
中的一些关键部分:
firstWaiter
和 lastWaiter
: 这两个字段分别表示条件队列的头节点和尾节点。当线程调用 Condition.await()
进入等待状态时,会创建一个节点,并加入到条件队列中。firstWaiter
指向队列的头部,而 lastWaiter
指向队列的尾部。addConditionWaiter()
方法: 这个方法用于在条件队列中创建一个节点。在创建节点之前,会检查队列尾部是否存在已经取消的节点,如果有的话会清理掉这些取消节点。然后创建一个新的节点,并将其加入到队列的尾部。这个方法在 Condition.await()
中调用,用于创建等待节点。tryAcquire(int); // 尝试获取独占锁,可获取返回true,否则false
tryRelease(int); // 尝试释放独占锁,可释放返回true,否则false
tryAcquireShared(int); // 尝试以共享方式获取锁,失败返回负数,只能获取一次返回0,否则返回个数
tryReleaseShared(int); // 尝试释放共享锁,可获取返回true,否则false
isHeldExclusively(); // 判断线程是否独占资源
如实现类只需实现独占锁/共享锁功能,可只实现tryAcquire/tryRelease
或tryAcquireShared/tryReleaseShared
。虽然实现tryAcquire/tryRelease
可自行设定逻辑,但建议使用state
方法对state
变量进行操作以实现同步类。
以下是实现一个独占锁示例(与上述原理图对应)
public class CustomExclusiveLock extends AbstractQueuedSynchronizer {
// 实现AQS中的抽象方法
@Override
protected boolean isHeldExclusively() {
// 判断当前线程是否持有独占锁
return getExclusiveOwnerThread() == Thread.currentThread();
}
@Override
protected boolean tryAcquire(int arg) {
// 尝试获取独占锁,返回 true 表示获取成功
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalArgumentException("Lock not held");
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 提供外部调用的加锁方法
public void lock() {
acquire(1);
}
// 提供外部调用的解锁方法
public void unlock() {
release(1);
}
public static void main(String[] args) {
CustomExclusiveLock lock = new CustomExclusiveLock();
// 线程1获取锁
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 1 acquired the lock");
Thread.sleep(2000); // 模拟一些业务处理
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("Thread 1 released the lock");
}
}).start();
// 线程2尝试获取锁(但会被阻塞)
new Thread(() -> {
lock.lock();
try {
System.out.println("Thread 2 acquired the lock");
} finally {
lock.unlock();
System.out.println("Thread 2 released the lock");
}
}).start();
}
}
Connected to the target VM, address: '127.0.0.1:49699', transport: 'socket'
Thread 1 acquired the lock
Thread 1 released the lock
Thread 2 acquired the lock
Thread 2 released the lock
Disconnected from the target VM, address: '127.0.0.1:49699', transport: 'socket'
AQS中可以分为独占、共享模式,其中这两种模式下还可以支持响应中断、纳秒级别超时
独占模式可以理解为同一时间只有一个线程能够获取同步状态
共享模式可以理解为可以有多个线程能够获取同步状态,方法中常用shared
标识
方法中常用acquire
标识获取同步状态,release
标识释放同步状态
独占式实际就是时刻上只允许一个线程独占该资源,多线程竞争情况下也只能有一个线程获取同步状态成功
下面是独占式关键源码
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// ...
/**
* 尝试获取独占锁
*
* @param arg 获取锁所需的参数
* @return 是否成功获取锁
*/
protected abstract boolean tryAcquire(int arg);
/**
* 尝试释放独占锁
*
* @param arg 释放锁所需的参数
* @return 是否成功释放锁
*/
protected abstract boolean tryRelease(int arg);
/**
* 获取独占锁,如果获取失败则进入等待队列
*
* @param arg 获取锁所需的参数
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ...
/**
* 尝试释放独占锁,并唤醒后继节点
*
* @param arg 释放锁所需的参数
* @return 是否成功释放锁
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// ...
}
// addWaiter(Node.EXCLUSIVE) 构建独占式节点,并用CAS+失败重试的方式加入AQS的末尾
private Node addWaiter(Node mode) {
//构建节点
Node node = new Node(Thread.currentThread(), mode);
//获取当前的尾节点,获取当前等待队列的尾节点,用于后续将新节点添加到尾部。
Node pred = tail;
if (pred != null) {
// 将新节点的前驱设置为当前尾节点,建立节点间的双向链表关系
node.prev = pred;
//compareAndSetTail 通过 CAS(Compare And Swap)操作尝试将新节点设置为新的尾节点。如果成成功,表示新节点已成功添加到队列尾部。如果失败说明在这个过程中有其他线程修改了尾节点,需要重新进入 enq 流程。
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾节点为空或则CAS失败执行enq
enq(node);
return node;
}
// enq方法主要以自旋(中途不会进入等待模式)去CAS设置尾节点,如果AQS中没有节点则头尾节点为同一节点
private Node enq(final Node node) {
//失败重试
for (;;) {
// 获取当前尾节点(Node t = tail;)
Node t = tail;
//没有尾节点 则CAS设置头节点(头尾节点为一个节点),否则CAS设置尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 通过 CAS 操作尝试将新节点设置为新的尾节点。如果成功,表示新节点已成功添加到队列尾部;如果失败,说明在这个过程中有其他线程修改了尾节点,需要重新进行循环。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在这段代码中:
tryAcquire
和 tryRelease
是需要具体的子类去实现的,用于定制独占锁的获取和释放逻辑。acquire
方法是获取独占锁的核心方法,它首先尝试通过 tryAcquire
获取锁,如果失败则将线程加入等待队列,最终调用 acquireQueued
进入等待状态。release
方法用于释放锁,它先调用 tryRelease
尝试释放锁,成功后再唤醒后继节点。以下是
acquire
方法的执行过程
享式就是允许多个线程同时获取一定的资源,比如信号量、读锁就是用共享式实现的,其实共享式与独占式流程类似,只是尝试获取同步状态的实现不同,我们用个获取同步状态的方法来说明
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
// 用于尝试获取共享资源。如果返回值为负数,表示获取失败;如果为零或正数,表示获取成功,返回的值表示成功获取的资源数量。
protected int tryAcquireShared(int arg) {
// 子类实现获取共享资源的逻辑
// 成功获取资源返回正数或零,失败返回负数
throw new UnsupportedOperationException();
}
// 用于释放共享资源。如果返回值为 true,表示释放成功;如果为 false,表示释放失败。
protected boolean tryReleaseShared(int arg) {
// 子类实现释放共享资源的逻辑
// 成功释放返回true,失败返回false
throw new UnsupportedOperationException();
}
// 用于获取共享资源,如果获取失败则将当前线程加入等待队列。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 获取失败,加入等待队列
doAcquireShared(arg);
}
// 用于释放共享资源,并唤醒等待队列中的线程。如果释放成功,则返回 true;如果释放失败,则返回 false。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 释放成功,唤醒等待队列中的线程
doReleaseShared();
return true;
}
return false;
}
// 当 acquireShared 方法中的 tryAcquireShared 返回负数时,会调用此方法将线程加入等待队列。
private void doAcquireShared(int arg) {
//添加共享式节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 获取前驱节点
final Node p = node.predecessor();
// 如果前驱节点为头节点 并且 获取同步状态成功 设置头节点
if (p == head && tryAcquireShared(arg) >= 0) {
setHeadAndPropagate(node, arg);
p.next = null; // help GC
failed = false;
return;
}
//获取失败进入会等待的自旋
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 当 releaseShared 方法中的 tryReleaseShared 返回 true 时,会调用此方法唤醒等待队列中的线程。
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果头节点的状态为 SIGNAL,表示有等待线程,唤醒一个
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// ......
}
AQS如何保证原子性、可见性:
?AQS使用头尾节点来实现双向队列,提供同步状态和获取/释放同步状态的模板方法来实现阻塞(同步)队列,并且这些字段使用volatile修饰,保证可见性与读取的场景配合,不需要保证原子性,在写的场景下常用CAS保证原子性。AQS充当阻塞队列,Condition充当它的等待队列来实现等待/通知模式,AQS的内部类ConditionObject在await时会加入Condition末尾并释放同步状态进入等待队列,在被唤醒后自旋(失败会进入等待)获取同步状态;在single时会CAS的将condition头节点并加入AQS尾部再去唤醒(因为一个AQS可能对应多个Condition因此要CAS保证原子性)
AQS获取资源的方式:
AQS分为独占式和共享式,使用独占式时只允许一个线程获取同步状态,使用共享式时则允许多个线程获取同步状态;其中还提供响应中断、等待超时的类似方法