Semaphore字面意思是信号量。主要用于控制有限的资源的访问数量。比如:公共厕所有5个蹲位,但有10个人要上厕所、仓库容量有限,达到容量后无法再存储货物,除非有出货。再比如我们的池技术(连接池、对象池等),池的大小有限,但使用者很多,我们需要不断的进行获取连接和释放连接。
Semaphore 内部同样有个静态内部类 Sync,同样实现了 AQS,同样有公平(FairSync)和非公平两个实现 (NonfairSync)
/**
*
* @param permits 资源数量
* 按蹲位的示例来说,者就是蹲位的数量,默认使用非公平策略
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
*
* 获取一个资源
* 当资源已经不够时,当前申请的线程会阻塞
* 此方式线程可以被打断,被打断后抛出 InterruptedException
*/
public void acquire() throws InterruptedException
/**
*
* 获取 permits 个资源
* 当资源已经不够时,当前申请的线程会阻塞
* 此方式线程可以被打断,被打断后抛出 InterruptedException
*/
public void acquire(int permits) throws InterruptedException
/**
*
* 获取一个资源
* 当资源已经不够时,当前申请的线程会阻塞
* 此方式线程就算被打断,它依然会等待获取资源,只是获取资源的时间和不打断获取资源的时间有所变化
*/
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)
/**
*
* 立即获取一个资源
* 当资源足够时,立即返回 true
* 当资源已经不够时,立即返回 false
*/
public boolean tryAcquire()
/**
*
* 立即获取一个资源
* 当资源足够时,立即返回 true
* 当资源已经不够时,等待对应到时间,还没有资源才会立即返回 false
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
/**
*
* 释放出一个资源
*/
public void release()
/**
*
* 释放出 permits 个资源
*/
public void release(int permits)
/**
* 获取当前可用资源个数
*/
public int availablePermits()
/**
* 获取所有可用资源个数
*/
public int drainPermits()
/**
* 是否是公平锁
*/
public boolean isFair()
/**
* 是否有线程正在等待获取资源
*/
public final boolean hasQueuedThreads()
/**
* 正在等待获取资源的线程数量
*/
public final int getQueueLength()
比如公共厕所一共有5个蹲位,当前有10个人来上厕所
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@Slf4j
public class SemaphoreTest {
public static void main(String[] args) {
// 公共厕所 5 个蹲位
Semaphore toilet = new Semaphore(5);
// 10 个人上厕所
for (int i = 0; i < 10; i++) {
new Thread(()->{
log.debug("想上厕所");
if(toilet.availablePermits() == 0){
log.debug("没位置,等一下");
}
try{
toilet.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("抢到位置了....");
try {
// 模拟上厕所时间
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("上完了");
toilet.release();
},"第"+i+"个人").start();
}
}
}
Semaphore 在实际应用中其实使用得并不多,因为当资源较少的时候,可能造成大量线程阻塞。而且我们使用线程池的方式也可以达到这些效果,而且线程池的使用更简单
倒数计数器。用于某个线程等待 CountDownLatch 的计数为 0 才开始执行。实际应用中的例子就是:某个线程的执行需要等待其他一个或多个并行线程执行完之后才能开始。
/**
*
* @Param count 其实数值
* 如果 count < 0,则抛出 IllegalArgumentException 异常
*/
public CountDownLatch(int count)
CountDownLatch 其实实现比较简单,也只提供了如下一个构造函数
/**
* 初始化 CountDownLatch 后调用该方法,用于等待倒数计数为 0
* 如果计数一直不为 0,则一直等待。除非线程被打断,打断后抛出 InterruptedException 异常
* 底层是AQS acquire 操作(获取锁)
*/
public void await() throws InterruptedException
/**
* 初始化 CountDownLatch 后调用该方法,用于等待倒数计数为 0
* 如果计数一直不为 0,则等待最长 timeout 时间。如果时间到了,计数还是不为 0 ,则返回 false,否则返回 true
* 除非线程被打断,打断后抛出 InterruptedException 异常
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
/**
* 调用一次计数减一,如果计数已经为0,则不会有什么效果
* 底层是 AQS release 方法(释放锁)
*/
public void countDown()
/**
* 返回当前计数(注意:此方法只在调试和测试时调用)
* 不要用作业务判断,计数为0,所有 await 的线程都会取消阻塞
*/
public long getCount()
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CountDownLatchTest {
public static void main(String[] args) {
// 运动员个数
int num = 8;
CountDownLatch runner = new CountDownLatch(num);
for (int i = 0; i < num; i++) {
new Thread(()->{
log.debug("开始执行");
long time = 9000 + new Random().nextInt(5000);
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("执行完毕,用时:{}",time);
runner.countDown();
},"线程"+i).start();
}
try {
// 阻塞等待其他线程执行完毕,main 线程才会执行
runner.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("等待其他线程执行完毕");
}
}
CyclicBarrier翻译过来就是:循环屏障。各个线程之间相互等待,等最后一个线程准备完毕,一起执行。
CyclicBarrier 内部使用 ReentrantLock 实现。
/**
* @Param parties 参与的线程数(计数)
*/
public CyclicBarrier(int parties)
/**
* @Param barrierAction 表示当参与的线程都执行完成后,要执行的任务
*/
public CyclicBarrier(int parties, Runnable barrierAction)
// 获取当前的CyclicBarrier一共有多少线程参数与
public int getParties()
// 等待(如果有线程在等待中,调用 reset 方法,会抛出 BrokenBarrierException)
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException
// 判断barrier是否已经损坏
public boolean isBroken()
// 重置barrier,重复使用前调用
public void reset()
// 获取当前正在等待该barrier的线程数
public int getNumberWaiting()
田径比赛中,最后一个运动员准备好之后,发令起跑,最先到达终点的为第一名
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@Slf4j
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
int num = 8;
CyclicBarrier barrier = new CyclicBarrier(num,()->{
// 执行这个任务的是最后一个准备的线程
log.debug("都准备好了,开跑");
});
for (int i = 0; i < num; i++) {
new Thread(()->{
try {
barrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
// 模拟到达时间(一般百米在9到14秒以内)
long time = 9000 + new Random().nextInt(5000);
try {
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.debug("到达终点,用时:{}",time);
},"运动员"+i).start();
}
}
}