JAVA并发编程-AQS底层实现原理及应用(一)

发布时间:2024年01月13日
前言

Java中的大部分同步类(CountDownLatch,Semaphore,CyclicBarrier、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。本篇不讲述包含共享锁和Condition Queue的部分。

一、开发一个自定义Lock锁
public class SelfLock implements Lock {


    private static class Sync extends AbstractQueuedSynchronizer{
        //加锁
        public boolean tryAcquire(int acquire){
            //设置state 0没有线程持有锁,1有线程持有锁
            if(compareAndSetState(0,acquire)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //释放锁
        public boolean tryRelease(int release){
            if(getState() == 0){
                throw new IllegalMonitorStateException();
            }
            setState(release);
            return true;
        }

        //创建condition
        Condition newCondition(){
            return new ConditionObject();
        }

        public Boolean isLocked() {
            return getState() == 1;
        }
    }
    public static final Sync sync = new Sync();
    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        //模板方法
        return sync.tryAcquire(1);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    public void unlock() {
        sync.release(0);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public Boolean isLocked(){
        return sync.isLocked();
    }

    public Boolean hasQueuedThreads(){
        return sync.hasQueuedThreads();
    }
}
二、从AQS实现看ReentrantLock原理及应用
2.1、ReentrantLock的特性
ReentrantLock特性Synchronized
锁实现机制基于AQS监视器模式
灵活性可以超时获取锁、响应中断获取锁
释放锁形式显示使用unlock自动释放
锁类型公平锁、非公平锁非公平锁
可重入性可重入可重入
条件队列可关联多个条件队列可关联一个

代码示例

//Synchronized的使用方式
// 用于代码块
synchronized (this) {}
//用于类
synchronized (object.class) {}
// 用于对象
synchronized (object) {}
//.用于方法
public synchronized void test () {}
// 4.可重入性
for (int i = 0; i < 100; i++) {
	synchronized (this) {}
}
//ReentrantLock的使用方式
public void test () throw Exception {
	// 1.初始化选择公平锁、非公平锁,此时实现公平锁
	ReentrantLock lock = new ReentrantLock(true);
	// 2.可用于代码块
	lock.lock();

	try {
		// 3.支持多种加锁方式,超时获取锁
		if(lock.tryLock(100, TimeUnit.MILLISECONDS)){ }
		//4.在锁可用时获取锁,但如果当前线程被中断,则会抛出InterruptedException异常
		if(lock.lockInterruptibly()){ }
	} finally {
		// 5.手动释放锁
		lock.unlock()
	}

}
2.2、ReentrantLock与AQS的关联

非公平锁加锁流程

// 非公平锁
static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
		}
  ...
}
  • 通过cas设置状态成功则获取锁成功,当前线程独占这把锁

  • 通过cas设置状态成功则获取锁失败,走acquire方法

  • 某个线程获取锁失败的后续流程是什么呢?有以下两种可能:
    (1)将当前线程获锁结果设置为失败,获取锁流程结束。这种设计会极大降低系统的并发度,并不满足我们实际的需求。所以就需要下面这种流程,也就是AQS框架的处理流程。
    (2)存在某种排队等候机制,线程继续等待,仍然保留获取锁的可能,获取锁流程仍在继续。

  • 对于问题1的第二种情况,既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

  • 处于排队等候机制中的线程,什么时候可以有机会获取锁呢?

  • 如果处于排队等候机制中的线程一直无法获取锁,还是需要一直等待吗,还是有别的策略来解决这一问题?

对于上边提到的问题,其实在ReentrantLock类源码中都无法解答,而这些问题的答案,都是位于Acquire方法所在的类AbstractQueuedSynchronizer中。

三、AQS
3.1、原理概览

AQS核心思想是如果被请求的共享资源空闲,那么就将当前线程设置为工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制保证锁分配。这种机制依赖CLH队列实现,将暂时获取不到锁的线程放到队列中。
CLH是单向链表,AQS里面实现的是CLH的变种双向链表。实现原理如图
CLH变种
AQS使用volatile修饰int类型的state变量,通过cas完成对state的修改
观察node类

static final class Node {
        //表示线程以共享的模式等待锁
        static final Node SHARED = new Node();
        //表示线程正在以独占的方式等待锁
        static final Node EXCLUSIVE = null;
		//取消获取锁
        static final int CANCELLED =  1;
        //表示线程已经准备好了
        static final int SIGNAL    = -1;
        //表示节点在等待队列中,节点线程等待唤醒
        static final int CONDITION = -2;
        //当前线程处在SHARED情况下,该字段才会使用,此处不涉及
        static final int PROPAGATE = -3;
        //节点状态
        volatile int waitStatus;
        //前驱指针
        volatile Node prev;
        //后继指针
        volatile Node next;
		//当前节点线程
		volatile Thread thread;
		//指向下一个处于CONDITION状态的节点,此处不涉及
        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
  • 加锁过程
  • 非公平锁与AQS之间方法的关联之处,以非公平锁为例,加锁过程。
    加锁执行流程
  • 非公平锁与AQS之间方法的关联之处,以非公平锁为例,释放锁过程。
    释放锁流程
2.3、通过ReentrantLock理解AQS

ReentrantLock中公平锁和非公平锁在底层是相同的,这里以非公平锁为例进行分析。

static final class NonfairSync extends Sync {
	...
	final void lock() {
		if (compareAndSetState(0, 1))
			setExclusiveOwnerThread(Thread.currentThread());
		else
			acquire(1);
	}
  ...
}

看一下这个Acquire

public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

再看一下tryAcquire方法:

protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

可以看出AQS中并没有对tryAcquire具体实现,具体实现需要实现类自己完成,这里以ReentrantLock为例,如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。

2.3.1、线程加入等待队列
  • 加入等待队列的时机?
    当执行acquire方法tryAcquire失败时,就会调用addWaiter加入到等待队列
  • 如何入队列?
    会执行addWaiter(Node.EXCLUSIVE)加入等待队列
private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

主要流程如下:
1.通过当线程和线程独占模式创建新节点
2.如果尾节点不是null,node的pre指向旧tail,通过cas设置新的尾节点是node,旧的tail.next指向node
3.如果tail是null或者cas失败(Pred指针和Tail指向的位置不同(说明被别的线程已经修改))了,走enq
4.如果CLH还没初始化则初始化一个头节点,头结点时空参构造,后面的步骤和之前一样,只不过这里是死循环
演示获取锁CLH图示

1.当没有线程获取到锁时,线程A获取锁成功。

2.线程B、C申请锁,但是锁被线程A占有,依次在队列中往后排队即可。

在公平锁加锁流程中判断是否存在有效结点的方法

public final boolean hasQueuedPredecessors() {
	// The correctness of this depends on head being initialized
	// before tail and on head.next being accurate if the current
	// thread is first in queue.
	Node t = tail; // Read fields in reverse initialization order
	Node h = head;
	Node s;
	return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}

如何理解h != t && ((s = h.next) == null || s.thread != Thread.currentThread()),双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。h != t && h.next是null,可能出现在node正在入队的时候,因为节点入队不是原子操作,出现下图①,②的情况,存在有效节点。h != t && h.next不是null,第一个有效节点不是当前线程,存在有效节点。

if (t == null) { // Must initialize
	if (compareAndSetHead(new Node()))
		tail = head;
} else {
	① node.prev = t;
	if (compareAndSetTail(t, node)) {
		② t.next = node;
		return t;
	}
}
  • 出队列时机?
public final void acquire(int arg) {
	if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

上文解释了addWaiter方法,这个方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。
总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

final boolean acquireQueued(final Node node, int arg) {
	// 标记是否成功拿到资源
	boolean failed = true;
	try {
		// 标记等待过程中是否中断过
		boolean interrupted = false;
		// 开始自旋,要么获取锁,要么中断
		for (;;) {
			// 获取当前节点的前驱节点
			final Node p = node.predecessor();
			// 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
			if (p == head && tryAcquire(arg)) {
				// 获取锁成功,头指针移动到当前node,将当前节点设置为虚节点
				setHead(node);
				p.next = null; // help GC
				failed = false;
				return interrupted;
			}
			// 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析
			if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
				interrupted = true;
		}
	} finally {
		if (failed)
			cancelAcquire(node);
	}
}
private void setHead(Node node) {
	head = node;
	node.thread = null;
	node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	// 获取头结点的节点状态
	int ws = pred.waitStatus;
	// 说明头结点处于唤醒状态
	if (ws == Node.SIGNAL)
		return true; 
	// 通过枚举值我们知道waitStatus>0是取消状态
	if (ws > 0) {
		do {
			// 循环向前查找取消节点,把取消节点从队列中剔除
			node.prev = pred = pred.prev;
		} while (pred.waitStatus > 0);
		pred.next = node;
	} else {
		// 设置前任节点等待状态为SIGNAL
		compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
	}
	return false;
}

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

方法的流程

从上图可以看出,跳出当前循环的条件是当“前置节点是头结点,且当前线程获取锁成功”。为了防止因死循环导致CPU资源被浪费,我们会判断前置节点的状态来决定是否要将当前线程挂起,具体挂起流程用流程图表示如下(shouldParkAfterFailedAcquire流程):
流程图

文章来源:https://blog.csdn.net/qq_51059003/article/details/135219486
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。