信号量(Semaphore)是一种用于控制多个进程对共享资源的访问的同步机制。信号量通常由一个计数器和一组等待队列组成。计数器表示当前可以访问资源的进程数。当一个进程需要访问资源时,它会将计数器减1。如果计数器大于0,则进程可以访问资源;如果计数器等于0,则进程将被阻塞,并加入到等待队列中。当一个进程释放资源时,它会将计数器加1。如果等待队列中存在进程,则会从等待队列中选择一个进程,并将其唤醒。
**Java中的信号量(Semaphore)**是对线程访问资源进行流量控制操作。Semaphore 类是 Java 1.5 引入的,在 java.util.concurrent 包下。Semaphore 类提供了一种线程同步机制,它可以控制多个线程对共享资源的访问,可以实现互斥访问、生产者-消费者问题、读写问题等。
public class Test {
private static Semaphore semaphore = new Semaphore(20); // 每次最大允许20个线程
public static void main(String[] args) {
// 创建100个线程
for (int i = 0; i<100; i++){
new Thread(()->{
try{
semaphore.acquire(); // 请求访问,如果计数器小于1,会阻塞等待
System.out.println(Thread.currentThread().getName()+"----执行前----"+System.currentTimeMillis());
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"++++执行后++++"+System.currentTimeMillis());
semaphore.release(); // 释放资源(计数器+1)
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
}
}
}
由上述例子可以知道,Semaphore的作用就是限制同时访问资源的线程数。其实效果好像Synchronized关键字,只不过Semaphore可以放行多个线程,而Synchronized可以指定锁对象。(想象一下Semaphore的构造参数设置1,是不是也是一个简单的锁了)
什么时候用到Semaphore呢,我觉得一般情况下数据量大的业务方法考虑用Semaphore进行限流。例如批量数据操作,数据导入导出时,考虑数据量过大,多个线程同时操作情况下OOM问题。(其他大佬说考虑数据库连接数问题以及文件句柄资源访问时使用Semaphore限流)
至于具体用法,就像以上简单例子一样,只是将Thread.sleep(2000)
替换成业务代码即可。
这个时候就有小伙伴会问了(其实没有小伙伴会问),这样的话业务代码和技术代码不是混在一起了,怎么将其区分开呢?咳咳~,这个时候就需要使用自定义注解配合AOP实现了。
<!-- 引入aop支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LimitFlow {
int value();
boolean fair() default false;
}
@Aspect
@Component
public class LimitFlowAspect {
ConcurrentHashMap<String,Semaphore> semaphoreMap = new ConcurrentHashMap<String,Semaphore>();
@Pointcut("@annotation(com.example.semaphore_limit.anno.LimitFlow)")
public void getPointcut(){}
@Around("getPointcut()")
public Object before(ProceedingJoinPoint joinPoint) throws Throwable {
// long l1 = System.currentTimeMillis();
// System.out.println(Thread.currentThread().getName()+"--- 执行前 ---"+l1); // 输出执行前时间
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
String semaphoreKey = signature.toString(); // 方法名作为semaphore集合的key
Semaphore semaphore = semaphoreMap.get(semaphoreKey);
if (Objects.isNull(semaphore)){
Method method = signature.getMethod();
LimitFlow annotation = method.getAnnotation(LimitFlow.class); // 默认肯定是被标注的方法,getAnnotation肯定不为null
int value = annotation.value();
boolean fair = annotation.fair();
synchronized (this) { // 同步判断是否含有key,没有则创建新key
if (!semaphoreMap.containsKey(semaphoreKey)) { // 双重检测,线程安全
semaphore = new Semaphore(value,fair);
semaphoreMap.put(semaphoreKey, semaphore); //
}else{
semaphore = semaphoreMap.get(semaphoreKey);
}
}
}
semaphore.acquire();
Object proceed = joinPoint.proceed();
semaphore.release();
// long l2 = System.currentTimeMillis(); // 执行后时间
// System.out.println(Thread.currentThread().getName()+"--- 执行后 ---"+l2+"--- 共计 ---"+(l2-l1)+"ms");
return proceed;
}
}
@RestController
public class TestController {
@LimitFlow(1)
@GetMapping("test")
public String test() throws InterruptedException {
Thread.sleep(5000);
return "success";
}
}
以上就是Semaphore的简单使用介绍,日常开发中Semaphore使用还是比较少的,但是面试题会常常拿来提问。
在限流方面,Google的guava也提供了令牌桶算法做接口限流,感兴趣的可以找相关资料看看。