并发编程-25 高并发处理手段之消息队列思路 + 应用拆分思路 + 应用限流思路
Spring Boot - 利用Resilience4j-RateLimiter进行流量控制和服务降级
限流算法是一种在分布式系统中广泛使用的技术,用于控制对系统资源的访问速率,以保护系统免受恶意攻击或突发流量导致的过载。
在实际的业务场景中,接口限流策略的应用非常广泛,以下是一些典型的场景:
简单计数器是一种最基础的限流算法,它的实现原理相对直观。
简单计数器的工作原理如下:
package com.artisan.counter;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CounterRateLimit {
/**
* 请求数量
*
* @return
*/
int maxRequest();
/**
* 时间窗口, 单位秒
*
* @return
*/
int timeWindow();
}
package com.artisan.counter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Aspect
@Component
public class CounterRateLimitAspect {
// 存储每个方法对应的请求次数
private Map<String, AtomicInteger> REQUEST_COUNT = new ConcurrentHashMap<>();
// 存储每个方法的时间戳
private Map<String, Long> REQUEST_TIMESTAMP = new ConcurrentHashMap<>();
/**
*
* @param joinPoint
* @return
* @throws Throwable
*/
@Around("@annotation(com.artisan.counter.CounterRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取注解信息
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
CounterRateLimit annotation = method.getAnnotation(CounterRateLimit.class);
// 获取注解的参数
int maxRequest = annotation.maxRequest();
long timeWindowInMillis = TimeUnit.SECONDS.toMillis(annotation.timeWindow());
// 获取方法名
String methodName = method.toString();
// 初始化计数器和时间戳
AtomicInteger count = REQUEST_COUNT.computeIfAbsent(methodName, x -> new AtomicInteger(0));
long startTime = REQUEST_TIMESTAMP.computeIfAbsent(methodName, x -> System.currentTimeMillis());
// 获取当前时间
long currentTimeMillis = System.currentTimeMillis();
// 判断: 如果当前时间超出时间窗口,则重置
if (currentTimeMillis - startTime > timeWindowInMillis) {
count.set(0);
REQUEST_TIMESTAMP.put(methodName, currentTimeMillis);
}
// 原子的增加计数器并检查其值
if (count.incrementAndGet() > maxRequest) {
// 如果超出最大请求次数,递减计数器,并报错
count.decrementAndGet();
throw new RuntimeException("Too many requests, please try again later.");
}
// 方法原执行
return joinPoint.proceed();
}
}
package com.artisan.counter;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@RestController
public class CounterRateLimitController {
/**
* 一秒一次
*
* @return
*/
@GetMapping("/counter")
@CounterRateLimit(maxRequest = 2, timeWindow = 2)
public ResponseEntity counter() {
System.out.println("Request Coming Coming....");
return ResponseEntity.ok("Artisan OK");
}
}
启动项目, 访问接口 http://localhost:8080/counter
简单计数器算法的优点是实现简单,但缺点也很明显:
边界问题:由于计数器是在时间窗口结束时重置的,如果系统的请求量非常大,可能会出现时间窗口临界点的问题,即在窗口即将结束时请求量激增,而在窗口开始时请求量较少,导致系统资源不能被有效利用。
突增流量处理能力不足:无法有效处理突增的流量,因为它的限制是固定的,不能根据系统的实时负载进行调整。
为了解决简单计数器的这些问题,可以采用更为复杂的限流算法,如滑动窗口计数器、漏桶算法、令牌桶算法等。这些算法能够更加平滑和有效地控制请求速率,提高系统的稳定性和可靠性。
在示例中,有一个使用了 @CounterRateLimit 注解的 counter 方法。根据注解的参数,这个方法在2秒钟的时间窗口内只能被调用2次。 如果在 2 秒内有更多的调用,那么这些额外的调用将被限流,并返回错误信息
假设1min一个时间段,每个时间段内最多100个请求。有一种极端情况,当10:00:58这个时刻100个请求一起过来,到达阈值;当10:01:02这个时刻100个请求又一起过来,到达阈值。这种情况就会导致在短短的4s内已经处理完了200个请求,而其他所有的时间都在限流中。
滑动窗口算法是实现限流的一种常用方法,它通过维护一个时间窗口来控制单位时间内请求的数量,从而保护系统免受突发流量或恶意攻击的影响。其核心原理是统计时间窗口内的请求次数,并根据预设的阈值来决定是否允许新的请求通过。
从图上可以看到时间创建是一种滑动的方式前进, 滑动窗口限流策略能够显著减少临界问题的影响,但并不能完全消除它。滑动窗口通过跟踪和限制在一个连续的时间窗口内的请求来工作。与简单的计数器方法不同,它不是在窗口结束时突然重置计数器,而是根据时间的推移逐渐地移除窗口中的旧请求,添加新的请求。
举个例子:假设时间窗口为10s,请求限制为3,第一次请求在10:00:00发起,第二次在10:00:05发起,第三次10:00:11发起,那么计数器策略的下一个窗口开始时间是10:00:11,而滑动窗口是10:00:05。所以这也是滑动窗口为什么可以减少临界问题的影响,但并不能完全消除它的原因。
package com.artisan.slidingwindow;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface SlidingWindowRateLimit {
/**
* 请求数量
*
* @return
*/
int maxRequest();
/**
* 时间窗口, 单位秒
*
* @return
*/
int timeWindow();
}
package com.artisan.slidingwindow;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Aspect
@Component
public class SlidingWindowRateLimitAspect {
/**
* 使用 ConcurrentHashMap 保存每个方法的请求时间戳队列
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Long>> REQUEST_TIMES_MAP = new ConcurrentHashMap<>();
@Around("@annotation(com.artisan.slidingwindow.SlidingWindowRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
SlidingWindowRateLimit rateLimit = method.getAnnotation(SlidingWindowRateLimit.class);
// 允许的最大请求数
int requests = rateLimit.maxRequest();
// 滑动窗口的大小(秒)
int timeWindow = rateLimit.timeWindow();
// 获取方法名称字符串
String methodName = method.toString();
// 如果不存在当前方法的请求时间戳队列,则初始化一个新的队列
ConcurrentLinkedQueue<Long> requestTimes = REQUEST_TIMES_MAP.computeIfAbsent(methodName,
k -> new ConcurrentLinkedQueue<>());
// 当前时间
long currentTime = System.currentTimeMillis();
// 计算时间窗口的开始时间戳
long thresholdTime = currentTime - TimeUnit.SECONDS.toMillis(timeWindow);
// 这一段代码是滑动窗口限流算法中的关键部分,其功能是移除当前滑动窗口之前的请求时间戳。这样做是为了确保窗口内只保留最近时间段内的请求记录。
// requestTimes.isEmpty() 是检查队列是否为空的条件。如果队列为空,则意味着没有任何请求记录,不需要进行移除操作。
// requestTimes.peek() < thresholdTime 是检查队列头部的时间戳是否早于滑动窗口的开始时间。如果是,说明这个时间戳已经不在当前的时间窗口内,应当被移除。
while (!requestTimes.isEmpty() && requestTimes.peek() < thresholdTime) {
// 移除队列头部的过期时间戳
requestTimes.poll();
}
// 检查当前时间窗口内的请求次数是否超过限制
if (requestTimes.size() < requests) {
// 未超过限制,记录当前请求时间
requestTimes.add(currentTime);
return joinPoint.proceed();
} else {
// 超过限制,抛出限流异常
throw new RuntimeException("Too many requests, please try again later.");
}
}
}
package com.artisan.slidingwindow;
import com.artisan.leakybucket.LeakyBucketRateLimit;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@RestController
public class SlidingWindowController {
@GetMapping("/slidingWindow")
@SlidingWindowRateLimit(maxRequest = 2, timeWindow = 2)
public ResponseEntity slidingWindow() {
return ResponseEntity.ok("artisan slidingWindow ");
}
}
滑动窗口算法的优点是它能够比较平滑地控制流量,允许一定程度的突发流量,同时又能够限制平均流量。相比于固定窗口算法,滑动窗口算法能够更精确地控制单位时间内的请求量,因为它考虑了时间窗口内请求的分布情况,而不仅仅是在窗口的开始和结束时刻的请求量。
滑动窗口算法的变种有很多,如基于令牌桶和漏桶的算法,这些算法在滑动窗口的基础上增加了更为复杂的令牌生成和消耗机制,以实现更精细的流量控制。
在Leaky Bucket算法中,容器有一个固定的容量,类似于漏桶的容量。数据以固定的速率进入容器,如果容器满了,则多余的数据会溢出。容器中的数据会以恒定的速率从底部流出,类似于漏桶中的水滴。如果容器中的数据不足以满足流出速率,则会等待直到有足够的数据可供流出。这样就实现了对数据流的平滑控制。
package com.artisan.leakybucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface LeakyBucketRateLimit {
/**
* 桶的容量
*
* @return
*/
int capacity();
/**
* 漏斗的速率,单位通常是秒
*
* @return
*/
int leakRate();
}
package com.artisan.leakybucket;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Aspect
@Component
public class LeakyBucketRateLimitAspect {
@Around("@annotation(com.artisan.leakybucket.LeakyBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
LeakyBucketRateLimit leakyBucketRateLimit = method.getAnnotation(LeakyBucketRateLimit.class);
int capacity = leakyBucketRateLimit.capacity();
int leakRate = leakyBucketRateLimit.leakRate();
// 方法签名作为唯一标识
String methodKey = method.toString();
LeakyBucketLimiter limiter = LeakyBucketLimiter.createLimiter(methodKey, capacity, leakRate);
if (!limiter.tryAcquire()) {
// 超过限制,抛出限流异常
throw new RuntimeException("Too many requests, please try again later.");
}
return joinPoint.proceed();
}
}
package com.artisan.leakybucket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
public class LeakyBucketLimiter {
/**
* 桶的容量
*/
private final int capacity;
/**
* 漏桶的漏出速率,单位时间内漏出水的数量
*/
private final int leakRate;
/**
* 当前桶中的水量
*/
private volatile int water = 0;
/**
* 上次漏水的时间
*/
private volatile long lastLeakTime = System.currentTimeMillis();
/**
* 漏桶容器
*/
private static final ConcurrentHashMap<String, LeakyBucketLimiter> LIMITER_MAP = new ConcurrentHashMap<>();
/**
* 静态工厂方法,确保相同的方法使用相同的漏桶实例
*
* @param methodKey 方法名
* @param capacity
* @param leakRate
* @return
*/
public static LeakyBucketLimiter createLimiter(String methodKey, int capacity, int leakRate) {
return LIMITER_MAP.computeIfAbsent(methodKey, k -> new LeakyBucketLimiter(capacity, leakRate));
}
private LeakyBucketLimiter(int capacity, int leakRate) {
this.capacity = capacity;
this.leakRate = leakRate;
}
/**
* 尝试获取许可(try to acquire a permit),如果获取成功返回true,否则返回false
*
* @return
*/
public boolean tryAcquire() {
long currentTime = System.currentTimeMillis();
synchronized (this) {
// 计算上次漏水到当前时间的时间间隔
long leakDuration = currentTime - lastLeakTime;
// 如果时间间隔大于等于1秒,表示漏桶已经漏出一定数量的水
if (leakDuration >= TimeUnit.SECONDS.toMillis(1)) {
// 计算漏出的水量
long leakQuantity = leakDuration / TimeUnit.SECONDS.toMillis(1) * leakRate;
// 漏桶漏出水后,更新桶中的水量,但不能低于0
water = (int) Math.max(0, water - leakQuantity);
lastLeakTime = currentTime;
}
// 判断桶中的水量是否小于容量,如果是则可以继续添加水(相当于获取到令牌)
if (water < capacity) {
water++;
return true;
}
}
// 如果桶满,则获取令牌失败
return false;
}
}
package com.artisan.leakybucket;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@RestController
public class LeakBucketController {
/**
*
*
* @return
*/
@GetMapping("/leakyBucket")
@LeakyBucketRateLimit(capacity = 10, leakRate = 2)
public ResponseEntity leakyBucket() {
return ResponseEntity.ok("leakyBucket test ok!");
}
}
漏桶算法和令牌桶算法最明显的区别是令牌桶算法允许流量一定程度的突发。因为默认的令牌桶算法,取走token是不需要耗费时间的,也就是说,假设桶内有100个token时,那么可以瞬间允许100个请求通过。
令牌桶算法由于实现简单,且允许某些流量的突发,对用户友好,所以被业界采用地较多。当然我们需要具体情况具体分析,只有最合适的算法,没有最优的算法。
使用Guava自带的RateLimiter实现
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
</dependency>
令牌桶算法通过一个形象的比喻来描述:想象有一个桶,桶里装有一定数量的令牌。系统会以固定的速率向桶中添加令牌,而每个数据包在发送前都需要从桶中获取一个令牌。如果桶中有足够的令牌,数据包就可以立即发送;如果桶中没有令牌,那么数据包就需要等待,直到桶中有足够的令牌为止。
关键参数:
算法流程:
package com.artisan.tokenbucket;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface TokenBucketRateLimit {
/**
* 产生令牌的速率(xx 个/秒)
*
* @return
*/
double permitsPerSecond();
}
package com.artisan.tokenbucket;
import com.google.common.util.concurrent.RateLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@Aspect
@Component
public class TokenBucketRateLimitAspect {
// 使用ConcurrentHashMap来存储每个方法的限流器
private final ConcurrentHashMap<String, RateLimiter> limiters = new ConcurrentHashMap<>();
// 环绕通知,用于在方法执行前后添加限流逻辑
@Around("@annotation(com.artisan.tokenbucket.TokenBucketRateLimit)")
public Object rateLimit(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取方法签名,用于获取方法信息
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
// 根据方法签名获取方法对象
Method method = signature.getMethod();
// 从方法对象中获取限流注解
TokenBucketRateLimit rateLimit = method.getAnnotation(TokenBucketRateLimit.class);
// 获取注解中定义的每秒令牌数
double permitsPerSecond = rateLimit.permitsPerSecond();
// 获取方法名,作为限流器的唯一标识
String methodName = method.toString();
// 如果限流器缓存中没有该方法的限流器,则创建一个新的
RateLimiter rateLimiter = limiters.computeIfAbsent(methodName, k -> RateLimiter.create(permitsPerSecond));
// 尝试获取令牌,如果可以获取,则继续执行方法
if (rateLimiter.tryAcquire()) {
return joinPoint.proceed();
} else {
// 如果无法获取令牌,则抛出异常,告知用户请求过于频繁
throw new RuntimeException("Too many requests, please try again later.");
}
}
}
package com.artisan.tokenbucket;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 小工匠
* @version 1.0
* @mark: show me the code , change the world
*/
@RestController
public class TokenBucketController {
@GetMapping("/tokenBucket")
@TokenBucketRateLimit(permitsPerSecond = 0.5)
public ResponseEntity tokenBucket() {
return ResponseEntity.ok("artisan token bucket");
}
}
在实施接口限流策略时,应根据具体的业务场景和系统需求,选择合适的限流算法和实现方式,同时注意限流策略对用户体验的影响,做到既能保护系统稳定运行,又不会对合法用户造成过多的困扰。