Semaphore 是 Java 中用于控制对共享资源的访问的同步工具之一。它维护了一定数量的许可,线程在访问共享资源之前必须先获得许可。如果许可数不为零,则线程将获得许可,并将许可数减一;如果许可数为零,则线程将阻塞,直到有其他线程释放许可。Semaphore 可以用于控制同时访问某个特定资源的线程数量。
主要特点和用途:
许可控制: Semaphore 是一个许可控制工具,它维护了一定数量的许可,线程在执行任务之前必须获取许可。许可的数量决定了同时允许多少个线程访问共享资源。
阻塞和释放: 当线程调用 acquire() 方法时,如果有可用的许可,则线程会获得许可,继续执行任务;如果没有可用的许可,则线程会被阻塞,直到有许可为止。而调用 release() 方法可以释放许可,使得其他被阻塞的线程有机会获得许可。
可中断: acquire() 方法支持中断,即在等待许可的过程中,如果线程被中断,会抛出 InterruptedException。
多许可获取: acquire(int permits) 方法可以一次性获取多个许可,而 release(int permits) 方法可以释放多个许可。
非公平性: Semaphore 默认是非公平的,即线程获取许可的顺序是不确定的。可以通过构造方法指定是否使用公平性。
基本用法:
Semaphore 的主要构造方法如下:
// 只指定许可数的构造方法
new Semaphore(3); // 允许3个线程同时访问
// 指定许可数和是否使用公平性的构造方法
new Semaphore(3, true); // 使用公平性
资源控制: 用于控制同时访问某个特定资源的线程数量。通过合理配置许可数,可以避免多个线程同时访问共享资源导致的竞争问题。
限流: 在高并发的系统中,可以使用 Semaphore 来限制同时执行某个操作的线程数量,防止系统资源被耗尽。
线程池管理: Semaphore 可以用于实现自定义的线程池管理策略,控制线程的启动和执行。
任务调度: 在一些场景下,需要等待多个任务都完成后才能继续执行其他操作,Semaphore 可以用于等待多个任务的完成。
并发控制: 用于协调多个线程的并发执行,确保它们在执行某个任务时能够合理地协同工作。
// 内部同步器,Semaphore的核心实现基于这个同步器
private final Sync sync;
// 构造函数,初始化Semaphore,可指定公平或非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
// 内部同步器,Semaphore的核心实现基于这个同步器
private final Sync sync;
// 构造函数,初始化Semaphore,可指定公平或非公平模式
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 获取许可,如果没有可用许可,则阻塞直到有可用许可
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 尝试获取许可,如果没有可用许可,则立即返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 释放许可,增加可用许可的数量
public void release() {
sync.releaseShared(1);
}
// 获取当前可用许可的数量
public int availablePermits() {
return sync.getPermits();
}
// 减少可用许可的数量
protected void reducePermits(int reductions) {
if (reductions < 0) throw new IllegalArgumentException();
sync.reducePermits(reductions);
}
// 用于公平模式的同步器实现
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
// 尝试以公平方式获取许可
protected int tryAcquireShared(int acquires) {
// 公平模式下,检查是否有前驱线程在等待
if (hasQueuedPredecessors()) {
return -1; // 有前驱线程,获取失败
}
return nonfairTryAcquireShared(acquires);
}
}
// 非公平模式的同步器实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// 尝试以非公平方式获取许可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
// 抽象同步器类,继承自AbstractQueuedSynchronizer
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits); // 初始时设置许可数量
}
// 获取当前可用许可的数量
final int getPermits() {
return getState();
}
// 尝试以非公平方式获取许可的实现
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// 如果可用许可足够,则尝试原子更新状态
if (remaining < 0 || compareAndSetState(available, remaining)) {
return remaining;
}
}
}
// 释放许可,增加可用许可的数量
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// 溢出检查
if (next < current) throw new Error("Maximum permit count exceeded");
// 尝试原子更新状态
if (compareAndSetState(current, next)) {
return true;
}
}
}
// 减少可用许可的数量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
// 下溢检查
if (next > current) throw new Error("Permit count underflow");
// 尝试原子更新状态
if (compareAndSetState(current, next)) {
return;
}
}
}
// 获取并返回所有可用许可,原子地将状态设置为零
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0)) {
return current;
}
}
}
}
}
Semaphore 可以用于限制同时访问的线程数量,这在资源池管理中很有用,以确保不会超过资源池允许的最大并发数。
import java.util.concurrent.*;
public class ResourcePool {
private final Semaphore semaphore;
public ResourcePool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
}
public Resource acquireResource() throws InterruptedException {
semaphore.acquire();
return new Resource();
}
public void releaseResource(Resource resource) {
// 释放资源并释放许可
resource.close();
semaphore.release();
}
private static class Resource {
public void close() {
// 释放资源的具体操作
}
}
}
在多线程环境中,对数据库连接的并发访问需要进行控制,以防止连接池被过度使用。
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Semaphore;
public class DatabaseConnectionPool {
private final Semaphore semaphore;
private final Connection[] connections;
public DatabaseConnectionPool(int poolSize) throws SQLException {
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
for (int i = 0; i < poolSize; i++) {
connections[i] = createConnection();
}
}
public Connection acquireConnection() throws InterruptedException, SQLException {
semaphore.acquire();
return getNextAvailableConnection();
}
public void releaseConnection(Connection connection) {
if (markConnectionAsUnused(connection)) {
semaphore.release();
}
}
private Connection getNextAvailableConnection() {
// 获取下一个可用连接的逻辑
return null;
}
private boolean markConnectionAsUnused(Connection connection) {
// 标记连接为未使用的逻辑
return true;
}
private Connection createConnection() throws SQLException {
// 创建数据库连接的逻辑
return null;
}
}
控制并发执行任务的数量,以防止系统资源过度消耗。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class ConcurrentTaskController {
private static final int MAX_CONCURRENT_TASKS = 5;
private final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_TASKS);
private final ExecutorService executorService = Executors.newCachedThreadPool();
public void executeTask(Runnable task) {
try {
semaphore.acquire();
executorService.submit(() -> {
try {
task.run();
} finally {
semaphore.release();
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}