sync,ReentrantLock是互斥锁,保证一个资源同一时间只允许被一个线程访问
Semaphore(信号量)保证1个或多个资源可以被指定数量的线程同时访问
底层实现是基于AQS去做的。
Semaphore底层也是基于AQS的state属性做一个计数器的维护。state的值就代表当前共享资源的个数。如果一个线程需要获取的1或多个资源,直接查看state的标识的资源个数是否足够,如果足够的,直接对state - 1拿到当前资源。如果资源不够,当前线程就需要挂起等待。知道持有资源的线程释放资源后,会归还给Semaphore中的state属性,挂起的线程就可以被唤醒。
Semaphore也分为公平和非公平的概念。
使用场景:连接池对象就可以基础信号量去实现管理。在一些流量控制上,也可以采用信号量去实现。再比如去迪士尼或者是环球影城,每天接受的人流量是固定的,指定一个具体的人流量,可能接受10000人,每有一个人购票后,就对信号量进行–操作,如果信号量已经达到了0,或者是资源不足,此时就不能买票。
以上面环球影城每日人流量为例子去测试一下。
public static void main(String[] args) throws InterruptedException {
// 今天环球影城还有人个人流量
Semaphore semaphore = new Semaphore(10);
new Thread(() -> {
System.out.println("一家三口要去~~");
try {
semaphore.acquire(3);
System.out.println("一家三口进去了~~~");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("一家三口走了~~~");
semaphore.release(3);
}
}).start();
for (int i = 0; i < 7; i++) {
int j = i;
new Thread(() -> {
System.out.println(j + "大哥来了。");
try {
semaphore.acquire();
System.out.println(j + "大哥进去了~~~");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(j + "大哥走了~~~");
semaphore.release();
}
}).start();
}
Thread.sleep(10);
System.out.println("main大哥来了。");
if (semaphore.tryAcquire()) {
System.out.println("main大哥进来了。");
}else{
System.out.println("资源不够,main大哥进来了。");
}
Thread.sleep(10000);
System.out.println("main大哥又来了。");
if (semaphore.tryAcquire()) {
System.out.println("main大哥进来了。");
semaphore.release();
}else{
System.out.println("资源不够,main大哥进来了。");
}
}
其实Semaphore整体就是对构建Semaphore时,指定的资源数的获取和释放操作
获取资源方式:
归还资源方式:
先查看Semaphore的整体结构,然后基于获取资源,以及归还资源的方式去查看源码
Semaphore内部有3个静态内类。
首先是向上抽取的Sync
其次还有两个Sync的子类NonFairSync以及FairSync两个静态内部类
Sync内部主要提供了一些公共的方法,并且将有参构造传入的资源个数,直接基于AQS提供的setState方法设置了state属性。
NonFairSync以及FairSync区别就是tryAcquireShared方法的实现是不一样。
在构建Semaphore的时候,如果只设置资源个数,默认情况下是非公平。
如果在构建Semaphore,传入了资源个数以及一个boolean时,可以选择非公平还是公平。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
从非公平的acquire方法入手
首先确认默认获取资源数是1个,并且acquire是允许中断线程时,抛出异常的。获取资源的方式,就是直接用state - 需要的资源数,只要资源足够,就CAS的将state做修改。如果没有拿到锁资源,就基于共享锁的方式去将当前线程挂起在AQS双向链表中。如果基于doAcquireSharedInterruptibly拿锁成功,会做一个事情。会执行setHeadAndPropagate方法。一会说
// 信号量的获取资源方法(默认获取一个资源)
public void acquire() throws InterruptedException {
// 跳转到了AQS中提供共享锁的方法
sync.acquireSharedInterruptibly(1);
}
// AQS提供的
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 判断线程的中断标记位,如果已经中断,直接抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 先看非公平的tryAcquireShared实现。
// tryAcquireShared:
// 返回小于0,代表获取资源失败,需要排队。
// 返回大于等于0,代表获取资源成功,直接执行业务代码
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 信号量的非公平获取资源方法
final int nonfairTryAcquireShared(int acquires) {
// 死循环。
for (;;) {
// 获取state的数值,剩余的资源个数
int available = getState();
// 剩余的资源个数 - 需要的资源个数
int remaining = available - acquires;
// 如果-完后,资源个数小于0,直接返回这个负数
if (remaining < 0 ||
// 说明资源足够,基于CAS的方式,将state从原值,改为remaining
compareAndSetState(available, remaining))
return remaining;
}
}
// 获取资源失败,资源不够,当前线程需要挂起等待
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 构建Node节点,线程和共享锁标记,并且到AQS双向链表中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 拿到上一个节点
final Node p = node.predecessor();
// 如果是head.next,就抢一手
if (p == head) {
// 再次基于非公平的方式去获取一次资源
int r = tryAcquireShared(arg);
// 到这,说明拿到了锁资源
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 如果上面没拿到,或者不是head的next节点,将前继节点的状态改为-1,并挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 如果线程中断会抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquire()以及acquire(int)的方式,都是执行acquireSharedInterruptibly方法去尝试获取资源,区别只在于是否传入了需要获取的资源个数。
tryAcquire()以及tryAcquire(int因为这两种方法是直接执行tryAcquire,只使用非公平的实现,只有非公平的情况下,才有可能在有线程排队的时候获取到资源
但是tryAcquire(int,time,unit)这种方法是正常走的AQS提供的acquire。因为这个tryAcquire可以排队一会,即便是公平锁也有可能拿到资源。这里的挂起和acquire挂起的区别仅仅是挂起的时间问题。
还有acquireUninterruptibly()以及acquireUninterruptibly(int)只是在挂起线程后,不会因为线程的中断而去抛出异常
公平与非公平只是差了一个方法的实现tryAcquireShared实现
这个方法的实现中,如果是公平实现,需要先查看AQS中排队的情况
// 信号量公平实现
protected int tryAcquireShared(int acquires) {
// 死循环。
for (;;) {
// 公平实现在走下述逻辑前,先判断队列中排队的情况
// 如果没有排队的节点,直接不走if逻辑
// 如果有排队的节点,发现当前节点处在head.next位置,直接不走if逻辑
if (hasQueuedPredecessors())
return -1;
// 下面这套逻辑和公平实现是一模一样的。
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
因为信号量从头到尾都是共享锁的实现……
释放资源操作,不区分公平和非公平
// 信号量释放资源的方法入口
public void release() {
sync.releaseShared(1);
}
// 释放资源不分公平和非公平,都走AQS的releaseShared
public final boolean releaseShared(int arg) {
// 优先查看tryReleaseShared,这个方法是信号量自行实现的。
if (tryReleaseShared(arg)) {
// 只要释放资源成功,执行doReleaseShared,唤醒AQS中排队的线程,去竞争Semaphore的资源
doReleaseShared();
return true;
}
return false;
}
// 信号量实现的释放资源方法
protected final boolean tryReleaseShared(int releases) {
// 死循环
for (;;) {
// 拿到当前的state
int current = getState();
// 将state + 归还的资源个数,新的state要被设置为next
int next = current + releases;
// 如果归还后的资源个数,小于之前的资源数。
// 避免出现归还资源后,导致next为负数,需要做健壮性判断
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS操作,保证原子性,只会有一个线程成功的就之前的state修改为next
if (compareAndSetState(current, next))
return true;
}
}
为了更好的了解PROPAGATE节点状态的意义,优先从JDK1.5去分析一下释放资源以及排队后获取资源的后置操作
首先查看4个线程获取信号量资源的情况
往下查看释放资源的过程会触发什么问题
首先t1释放资源,做了进一步处理
当线程3获取锁资源后,线程2再次释放资源,因为执行点问题,导致线程4无法被唤醒
====================================JDK1.5实现============================================.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void setHeadAndPropagate(Node node, int propagate) {
setHead(node);
if (propagate > 0 && node.waitStatus != 0) {
Node s = node.next;
if (s == null || s.isShared())
unparkSuccessor(node);
}
}
====================================JDK1.8实现============================================.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
// 拿到head节点
Node h = head;
// 判断AQS中有排队的Node节点
if (h != null && h != tail) {
// 拿到head节点的状态
int ws = h.waitStatus;
// 状态为-1
if (ws == Node.SIGNAL) {
// 将head节点的状态从-1,改为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒后继节点
unparkSuccessor(h);
}
// 发现head状态为0,将head状态从0改为-3,目的是为了往后面传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 没有并发的时候。head节点没变化,正常完成释放排队的线程
if (h == head)
break;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 拿到head
Node h = head;
// 将线程3的Node设置为新的head
setHead(node);
// 如果propagate 大于0,代表还有剩余资源,直接唤醒后续节点,如果不满足,也需要继续往后判断看下是否需要传播
// h == null:看成健壮性判断即可
// 之前的head节点状态为负数,说明并发情况下,可能还有资源,需要继续向后唤醒Node
// 如果当前新head节点的状态为负数,继续释放后续节点
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
// 唤醒当前节点的后继节点
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}