📕前置笔记:【AQS核心概念与核心类】
AQS是JUC的基石,很多JUC的相关类底层都是通过AQS框架实现的。PS:以下是Java11下的源码。
Lock接口的实现类,基本都是通过聚合了一个【队列同步器AQS】的子类来完成对线程访问的控制,这里以Lock接口的实现类ReentrantLock为例分析AQS源码。
Sync、NonfairSync、FairSync三个内部类的关系:
Lock对外暴露lock和unlock两个操作API,中间是NonfairSync非公平锁和FairSync公平锁这两个内部类,实际上操作的则是sync这个抽象内部类,而sync继承了AQS类。
以下从lock方法开始,分析AQS源码:
//常见的操作:
Lock lock = new ReentrantLock();
lock.lock();
try{
//do Something
} finally{
lock.unlock();
}
点击跟进lock方法源码:
跟进acquire ==> tryAcquire,这里是模板模式:
继续跟进看其在NonfairSync、FairSync里的实现:
可以看到公平锁和非公平锁对lock方法的实现,区别是if条件中多了一个调用,判断有无前置节点(队列中有没线程在等待)。返回true,即当前线程前面有排队线程,false即当前线程就是队列中的头,或者队列为空。公平锁在加锁前调用hasQueuedPredecessors方法判断等待队列中是否存在有效的Node节点:
有了这个判断:
简言之,非公平锁的lock方法,直接是CAS抢,抢到了直接干活儿,抢不到再acquire排队。
从上面对lock方法的大致分析可以看到:不管公平锁非公平锁,底层都是AQS类的acquire方法,传入1。acquire方法细分为三个步骤:
if体的selfInterrupt方法没啥好说的:
以下分别对三个步骤里的方法做详细分析。
写个简单的示意Demo,做为下面分析多线程出队入队的背景案例:A、B、C三个顾客去银行办理业务,A先到,此时窗口空无一人,他优先获得办理业务的机会。(类比A线程先抢到对象锁),但A顾客办理业务耗时长达20分钟,期间B、C线程也进来抢锁了:
public class CodeAnalyse {
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock(); //非公平锁
//A顾客,耗时20min
new Thread(() -> {
lock.lock();
try {
System.out.println("----come in A");
TimeUnit.MINUTES.sleep(20);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "A").start();
//第二个顾客,B顾客,一看A在办理,只能去候客区(进入AQS队列),等待A办理完后再尝试抢锁
new Thread(() -> {
lock.lock();
try{
System.out.println("----come in B");
}finally {
lock.unlock();
}
},"B").start();
//第三个顾客,C顾客,一看A在办理,只能去候客区(进入AQS队列),等待A办理完后再尝试抢锁,在FIFO队列,C前面是B
new Thread(() -> {
lock.lock();
try{
System.out.println("----come in C");
}finally {
lock.unlock();
}
},"C").start();
//后续顾客D、E、F....以此类推
}
}
画个示意图,初始状态:
从lock到acquire方法,接下来看acquire方法的第一个步骤tryAcquire方法的源码。如Demo案例,A线程先来,此时没线程,AQS类的volatile变量 state = 0,A抢占成功:
接着B线程lock,走到tryAcquire,判断c != 0,往下拱,当前线程current为B,OwnerThread为A,也不相等,返回false
false取反,继续往下判断条件,就到了addWaiter
因此,tryAcquire方法是尝试抢锁,看下state位:
addWaiter方法,看名字就知道:为当前线程以给定模式创建Node并入队。接着上面看,当前addWaiter方法的传参是Node.EXCLUSIVE,即线程要以独占的方式等待锁,相反的,Node.SHARED表示线程以共享的模式等待锁。开始第一轮循环,AQS队列为空,尾指针等于null,不满足条件,走initializeSyncQueue:
注意现在的B线程所对应的Node B并不是头节点,而是一个新new的Node,这个Node也称虚拟节点或者哨兵节点,作用是占位,Node的两个属性:Thread = null ,waitStatus = 0。最后把头节点设置为尾节点
队列示意图:
此时,再回addWaiter方法进行下一次循环,尾节点tail不再为null,设置我最终要返回Node B的前置节点为oldTail,oldTail = tail,也即刚才的那个虚拟节点。然后CAS尾节点为Node B,返回Node B对象:
如下图示意:
此时,NodeB(线程B)入队成功。同理,C线程来,NodeC入队也一样,不同的是,尾节点不再为null,不用重复刚来时的初始化步骤initializeSyncQueue了:
总之就是来回再倒一个双向链表的prev和next指针,以及头尾指针的关系,最终实现入队。
双向链表中,第一个节点为虚节点(也叫哨兵节点),其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是从第二个节点开始的。
上面的addWaiter方法返回Node B 后,再回到acquire方法,该如何实现入队后的通知唤醒操作?
传入addWaiter返回的Node B节点与args = 1。进入acquireQueued方法,一轮循环,获得传入节点的前置节点,Node B的前置,即虚拟节点,判断p是头节点没问题,继续往下拱,到前面的tryAcquire,即再尝试抢一次,假设A线程仍还在办理业务,那这次tryAcquire自然也返回false,到了shouldParkAfterFailedAcquire(方法名含义:在抢锁失败后是否Park等待唤醒):
传入前置节点和Node B节点:ws = 0,Node.SINGLE= -1,不相等,直接到最后一个else分支,把前置节点的waitStatus属性改为-1,并返回false
当前状态示意图:
退回刚开始acquireQueued方法的for(;;)循环,同理,A还在继续占着锁,仍然进到shouldParkAfterFailedAcquire方法:
此时,前置节点的的waitStatus是-1了,返回true,到了parkAndCheckInterrupt
方法:
parkAndCheckInterrupt
方法使用了LockSupport,且被线程如果是被中断的,就返回true。LockSupport.park阻塞线程B,直到被unpark发放permit许可,到此,B就稳稳当当的停留在队列中了,等着后面的唤醒通知机制。
同理,Node C进来,node C的前置节点为node B,不是head,都不用tryAcquire尝试抢锁了,直接执行两次shouldParkAfterFailedAcquire,把它前置节点node B的waitStatus从0改为-1,还是到这儿,把B的waitStatus由默认的0改为-1,返回false。再到parkAndCheckInterrupt
,稳当地在队里等着被唤醒。
到此,再看上篇的waitStatus这个属性,这个-1的含义就理解透彻了,即Node所在的线程已经安静坐着等释放锁了。
再整理梳理下上面acquireQueued方法内部调用的两个重要方法的源码:
如果前置节点的waitStatus是SIGNAL状态,即shouldParkAfterFailedAcquire返回true,就执行parkAndCheckInterrupt挂起当前线程。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
到此,lock方法 ? acquire方法下的三个方法整体流程结束:
总结就是,它们分别在做尝试抢 ? 抢不到,入队 ? 帮助Node坐稳椅子(LockSupport.park() 挂起),等待锁释放后FIFO唤醒
以上,B、C在队列等,A线程终于忙完了unlock,state = 0了,AQS队列钟排队的线程也该出队了。接下来看unlock方法涉及的AQS的源码:
跟进release方法:
抛出不支持的操作异常,模板设计模式:
往下看在子类ReentrantLock的实现,看条件判断里的tryRelease方法:state = 1,传入的release = 1,int c = 1 -1 = 0,下面这个自然也一般不出现,调用方法准备释放锁的线程即线程A,Thread.currentThrad为A,ownerThread也是A
进入下面的if分支,设置free为true,改抢到的线程owenerThread为null,坑位腾出来,把state设置为0,返回true:
再回到调用点,tryRelease条件上面返回了true,进入,头节点给h,h成了虚拟节点,其不为null,且waitStatus在上面B入队时就改成了-1,条件成立,进入方法里unparkSuccessor
传入h,即虚拟头节点,waitStatus为-1,<0,CAS重新设回0:
Node s是h的下一个节点,也就是Node B ,不等于null,但其waitStatus是 -1(Node C入队时改的),不大于0,因此走unpark这儿,唤醒B:
回调用点,release方法返回true,即unlock成功:
再说刚才unparkSuccessor里的unpark B线程,退回上面lock下的源码里的Locksupport,这儿还LockSupport等待被放发permit后唤醒着呢:
B被unpark了,不是中断 了,这里返回false:
继续for(;;)进行下一轮循环,B的前置节点为虚拟节点,符合,继续tryAcquire抢占锁:
为什么B排队了,现在轮到它了却还要尝试抢锁?因为非公平锁,B被唤醒了,此时也有可能刚好又来了线程X,而X插队抢到了。
假设此时B去执行tryAcquire,return true,抢占成功,再回调用处:
看下setHead方法,此时Node node为Node B,Node p是虚拟节点,B为头节点,B的thread改为null,因为B抢到锁了,这把Node的椅子腾出来,Node B的前置指针改为null
示意图:
此时,p.next再设置为null,再没有任何变量指向虚拟节点的对象Node,需要回收,因此,才help GC ,返回false
acquire方法判断条件为false,不用自我中止,执行结束,lock方法执行结束,线程B终于抢锁成功,加冕!
此时,原本的虚拟节点仙逝了,而抢到锁的B线程所在的Node成了新一代的虚拟节点
到此,A彻底退出江湖,B抢到锁办理业务中…
主流程中,还有一个cancelAcquire(异常情况,等一半不想等了):
跟进:
跟进,看到cancelAcquire方法,某Node中途不排队了,取消抢锁:
如图:
如果现在走人的是队尾节点Node 5,那就node4.next = null,队尾指针tail指向node4就行:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//node要走了,线程属性设置为空
node.thread = null;
// 取前置节点,后面做判断
Node pred = node.prev;
//waitStatus只有取值1时,才能大于0,1即获取锁的请求取消
//即前置节点也要走,属于下面的情况,这里先说只有node5走的情况
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//设置node 5的状态值为cancle
node.waitStatus = Node.CANCELLED;
//如果是尾指针,CAS把前一个节点设置为队尾,并把pred.next置为null
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
//.....略
如果走人的是中间节点,比如Node4,又如果一走就连着走了一片,比如node3也要走:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
//node要走了,线程属性设置为空
node.thread = null;
// 取前置节点,后面做判断
Node pred = node.prev;
//waitStatus只有取值1时,才能大于0,1即获取锁的请求取消
//即前置节点也要走,现在就是这个情况
while (pred.waitStatus > 0)
//while当前置节点的状态大于0,即是取消状态,就一直循环
//如此就找到了非取消状态的前置节点,并把指针指向赋值改过来
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
//如果是尾指针,这里走else分支
if (node == tail && compareAndSetTail(node, pred)) {
pred.compareAndSetNext(predNext, null);
} else {
//
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
pred.compareAndSetNext(predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}