在Java
并发编程中,Semaphore
是一个非常重要的工具类。它位于java.util.concurrent
包中,为我们提供了一种限制对临界资源的访问的机制。你可以将其视为一个同步控制的瑞士军刀,因为它既能够控制对资源的并发访问数量,也能够保证资源的公平访问。
Semaphore
在中文中意为“信号量”,它维护了一个许可集。这些许可可以理解为对某种资源的访问权限。当线程希望访问某个资源时,它必须从Semaphore
中获取一个许可;当线程完成对资源的访问后,它应该释放这个许可,以便其他线程可以使用。
构造函数:
Semaphore(int permits)
: 创建一个具有给定许可数的Semaphore
,但并非公平策略。这意味着等待时间久的线程并不一定会优先获得许可。Semaphore(int permits, boolean fair)
: 创建一个具有给定许可数的Semaphore
,并指定是否使用公平策略。如果fair为true,则等待时间久的线程会优先获得许可。常用方法:
acquire()
: 获取一个许可,如果当前没有可用的许可,则线程会被阻塞,直到有一个许可可用。这个方法可以被中断。acquire(int permits)
: 获取指定数量的许可。如果当前没有足够的许可可用,则线程会被阻塞,直到有足够的许可可用。这个方法同样可以被中断。acquireUninterruptibly()
: 获取一个许可,但如果当前没有可用的许可,则线程会被阻塞,直到有一个许可可用。与acquire()不同的是,这个方法不会被中断。acquireUninterruptibly(int permits)
: 获取指定数量的许可,如果当前没有足够的许可可用,则线程会被阻塞,直到有足够的许可可用。这个方法同样不会被中断。release()
: 释放一个许可,将其返回到Semaphore中,以供其他线程使用。release(int permits)
: 释放指定数量的许可。availablePermits()
: 返回当前Semaphore中可用的许可数。hasQueuedThreads()
: 查询是否有线程正在等待获取许可。getQueueLength()
: 返回正在等待获取许可的线程数。drainPermits()
: 获取并返回当前所有可用的许可,并将可用许可数减少到0。reducePermits(int reduction)
: 减少Semaphore
中可用的许可数。这个方法主要用于在某些情况下动态地减少资源的可用性。使用Semaphore
非常简单。首先,你需要创建一个Semaphore
实例,指定可用的许可数量。然后,线程在需要访问资源时调用acquire()
方法获取许可,访问完成后调用release()
方法释放许可。
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
private static final int MAX_PERMITS = 3;
private static Semaphore semaphore = new Semaphore(MAX_PERMITS);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 获取到许可");
// 模拟资源访问
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " 释放许可");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}).start();
}
}
}
运行结果
Thread-0 获取到许可
Thread-1 获取到许可
Thread-2 获取到许可
Thread-2 释放许可
Thread-3 获取到许可
Thread-0 释放许可
Thread-4 获取到许可
Thread-1 释放许可
Thread-5 获取到许可
Thread-5 释放许可
Thread-3 释放许可
Thread-6 获取到许可
Thread-7 获取到许可
Thread-4 释放许可
Thread-8 获取到许可
Thread-7 释放许可
Thread-6 释放许可
Thread-9 获取到许可
Thread-8 释放许可
Thread-9 释放许可
在上述代码中,我们创建了一个拥有3个许可的Semaphore
。然后我们启动了10个线程,每个线程都试图获取一个许可来访问资源。由于只有3个许可,所以任何时候最多只有3个线程能够同时访问资源。
acquire()
方法时,如果当前没有可用的许可,线程会被阻塞,直到有一个许可可用。acquire()
调用都有一个对应的release()调用。Semaphore
还提供了tryAcquire()
方法,该方法尝试获取一个许可,如果当前没有可用的许可,它会立即返回false,而不是阻塞线程。class LoginQueueUsingSemaphore {
private Semaphore semaphore;
public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}
boolean tryLogin() {
return semaphore.tryAcquire();
}
void logout() {
semaphore.release();
}
int availableSlots() {
return semaphore.availablePermits();
}
}
请注意我们如何使用以下方法:
为了测试我们的登录队列,我们将首先尝试达到限制并检查下一次登录尝试是否会被阻止:
@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}
接下来,我们将查看注销后是否有可用的插槽:
@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();
assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}
TimedSemaphore允许多个许可证作为简单的信号量,但在给定的时间段内,在该时间段之后时间重置并且所有许可证都被释放。
class DelayQueueUsingTimedSemaphore {
private TimedSemaphore semaphore;
DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}
boolean tryAdd() {
return semaphore.tryAcquire();
}
int availableSlots() {
return semaphore.getAvailablePermits();
}
}
当我们使用以一秒为时间段的延迟队列时,在一秒内使用完所有插槽后,应该没有一个可用:
public void givenDelayQueue_whenReachLimit_thenBlocked() {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue
= new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}
但休眠一段时间后,信号量应该重置并释放许可证:
@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();
assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}
Semaphore
是一个强大而灵活的同步工具,它允许我们细粒度地控制对资源的并发访问。通过合理地使用Semaphore
,我们可以确保系统在高并发环境下的稳定性和性能。