接口限流是对某一时间窗口内的请求数进行限制,以保持系统的可用性和稳定性,防止因流量暴增而导致的系统运行缓慢或宕机。此外,接口限流也可以通过限制每个用户或每个接口调用的频率和并发数,来控制对服务资源的访问。
大量正常用户高频访问导致服务器宕机
恶意用户高频访问导致服务器宕机
网页爬虫 ,对于这些情况我们需要对用户的访问进行限流访问
限流发生在流量进来之前,超过的流量进行限制。
熔断是一种应对故障的机制,发生在流量进来之后,如果系统发生故障或者异常,熔断会自动切断请求,防止故障进一步扩展,导致服务雪崩。
削峰是对流量的平滑处理,通过缓慢地增加请求的处理速率来避免系统瞬时过载。
削峰大概就是水库,把流量储存起来,慢慢流,限流大概就是闸口,拒绝超出的流量。
在Java中,可以使用信号量(Semaphore)机制来控制并发数量。信号量是一个计数器,用于限制对共享资源的访问。以下是一个使用信号量机制控制并发数量的示例:
import java.util.concurrent.Semaphore;
public class ConcurrencyControlExample {
private static final int MAX_CONCURRENT_THREADS = 5; // 最大并发线程数
private static Semaphore semaphore = new Semaphore(MAX_CONCURRENT_THREADS);
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new WorkerThread("" + i)).start();
}
}
static class WorkerThread implements Runnable {
private String command;
public WorkerThread(String s) {
this.command = s;
}
@Override
public void run() {
try {
// 获取信号量,如果信号量为0,则当前线程需要等待
semaphore.acquire();
// 执行任务
System.out.println(Thread.currentThread().getName() + "开始处理:" + command);
processCommand();
System.out.println(Thread.currentThread().getName() + "结束处理:" + command);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放信号量,允许其他线程获取信号量并执行任务
semaphore.release();
}
}
private void processCommand() {
try {
Thread.sleep(2000); // 模拟耗时任务
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
在工程实践中,常见的是使用令牌桶算法来实现这种模式,常用的限流算法有两种:漏桶算法和令牌桶算法。
如图所示,令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务,令牌桶算法通过发放令牌,根据令牌的rate频率做请求频率限制,容量限制等。
在Wikipedia上,令牌桶算法是这么描述的:
1、每过1/r秒桶中增加一个令牌。
2、桶中最多存放b个令牌,如果桶满了,新放入的令牌会被丢弃。
3、当一个n字节的数据包到达时,消耗n个令牌,然后发送该数据包。
4、如果桶中可用令牌小于n,则该数据包将被缓存或丢弃。
令牌桶控制的是一个时间窗口内通过的数据量,在API层面我们常说的QPS、TPS,正好是一个时间窗口内的请求量或者事务量,只不过时间窗口限定在1s罢了。以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶的另外一个好处是可以方便的改变速度,一旦需要提高速率,则按需提高放入桶中的令牌的速率。
在我们的工程实践中,通常使用Google开源工具包Guava提供的限流工具类RateLimiter来实现控制速率,该类基于令牌桶算法来完成限流,非常易于使用,而且非常高效。如我们不希望每秒的任务提交超过1个
public static void main(String[] args) {
String start = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
RateLimiter limiter = RateLimiter.create(1.0); // 这里的1表示每秒允许处理的量为1个
for (int i = 1; i <= 10; i++) {
double waitTime = limiter.acquire(i);// 请求RateLimiter, 超过permits会被阻塞
System.out.println("cutTime=" + System.currentTimeMillis() + " call execute:" + i + " waitTime:" + waitTime);
}
String end = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
System.out.println("start time:" + start);
System.out.println("end time:" + end);
}
首先通过RateLimiter.create(1.0);创建一个限流器,参数代表每秒生成的令牌数,通过limiter.acquire(i);来以阻塞的方式获取令牌,令牌桶算法允许一定程度的突发(允许消费未来的令牌),所以可以一次性消费i个令牌;当然也可以通过tryAcquire(int permits, long timeout, TimeUnit unit)来设置等待超时时间的方式获取令牌,如果超timeout为0,则代表非阻塞,获取不到立即返回,支持阻塞或可超时的令牌消费。
从输出来看,RateLimiter支持预消费,比如在acquire(5)时,等待时间是4秒,是上一个获取令牌时预消费了3个两排,固需要等待3*1秒,然后又预消费了5个令牌,以此类推。
ateLimiter通过限制后面请求的等待时间,来支持一定程度的突发请求(预消费),在使用过程中需要注意这一点,Guava有两种限流模式,一种为稳定模式(SmoothBursty:令牌生成速度恒定,平滑突发限流),一种为渐进模式(SmoothWarmingUp:令牌生成速度缓慢提升直到维持在一个稳定值,平滑预热限流) 两种模式实现思路类似,主要区别在等待时间的计算上。
注:RateLimiter控制的是速率,Samephore控制的是并发量。RateLimiter的原理就是令牌桶,它主要由许可发出的速率来定义,如果没有额外的配置,许可证将按每秒许可证规定的固定速度分配,许可将被平滑地分发,若请求超过permitsPerSecond则RateLimiter按照每秒 1/permitsPerSecond 的速率释放许可。注意:RateLimiter适用于单体应用,且RateLimiter不保证公平性访问。
使用上述方式使用RateLimiter的方式不够优雅,自定义注解+AOP的方式实现(适用于单体应用),详细见下面代码:
自定义注解:
import java.lang.annotation.*;
/**
* 自定义注解可以不包含属性,成为一个标识注解
*/
@Inherited
@Documented
@Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimitAspect {
}
自定义切面类
import com.google.common.util.concurrent.RateLimiter;
import com.test.cn.springbootdemo.util.ResultUtil;
import net.sf.json.JSONObject;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@Component
@Scope
@Aspect
public class RateLimitAop {
@Autowired
private HttpServletResponse response;
private RateLimiter rateLimiter = RateLimiter.create(5.0); //比如说,我这里设置"并发数"为5
@Pointcut("@annotation(com.test.cn.springbootdemo.aspect.RateLimitAspect)")
public void serviceLimit() {
}
@Around("serviceLimit()")
public Object around(ProceedingJoinPoint joinPoint) {
Boolean flag = rateLimiter.tryAcquire();
Object obj = null;
try {
if (flag) {
obj = joinPoint.proceed();//这个方法用于执行原来的方法或继续原来的控制流程。
}else{
String result = JSONObject.fromObject(ResultUtil.success1(100, "failure")).toString();
output(response, result);
}
} catch (Throwable e) {
e.printStackTrace();
}
System.out.println("flag=" + flag + ",obj=" + obj);
return obj;
}
public void output(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json;charset=UTF-8");
ServletOutputStream outputStream = null;
try {
outputStream = response.getOutputStream(); //这行代码获取了与当前HTTP响应关联的输出流,并将其赋值给outputStream变量。
outputStream.write(msg.getBytes("UTF-8"));//这部分将转换后的字节数组写入到之前获取的输出流中,这意味着数据将被发送到客户端。
} catch (IOException e) {
e.printStackTrace();
} finally {
outputStream.flush();
outputStream.close();
}
}
}
测试controller
import com.test.cn.springbootdemo.aspect.RateLimitAspect;
import com.test.cn.springbootdemo.util.ResultUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
public class TestController {
@ResponseBody
@RateLimitAspect
@RequestMapping("/test")
public String test(){
return ResultUtil.success1(1001, "success").toString();
}
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
算法实现:
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {
// 计算的起始时间
private static long lastOutTime = System.currentTimeMillis();
// 流出速率 每秒 2 次
private static int leakRate = 2;
// 桶的容量
private static int capacity = 2;
//剩余的水量
private static AtomicInteger water = new AtomicInteger(0);
//返回值说明:
// false 没有被限制到
// true 被限流
public static synchronized boolean isLimit(long taskId, int turn) {
// 如果是空桶,就当前时间作为漏出的时间
if (water.get() == 0) {
lastOutTime = System.currentTimeMillis();
water.addAndGet(1);
return false;
}
// 执行漏水
int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
// 计算剩余水量
int waterLeft = water.get() - waterLeaked;
water.set(Math.max(0, waterLeft));
// 重新更新leakTimeStamp
lastOutTime = System.currentTimeMillis();
// 尝试加水,并且水还未满 ,放行
if ((water.get()) < capacity) {
water.addAndGet(1);
return false;
} else {
// 水满,拒绝加水, 限流
return true;
}
}
//线程池,用于多线程模拟测试
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit() {
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
final int threads = 2;
// 每条线程的执行轮数
final int turns = 20;
// 线程同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() -> {
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean intercepted = isLimit(taskId, j);
if (intercepted) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));
log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
log.info("运行的时长为:" + time);
}
}
分布式限流
自定义注解+拦截器+Redis实现限流 (单体和分布式均适用,全局限流)
自定义注解:
@Inherited
@Documented
@Target({ElementType.FIELD,ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface AccessLimit {
int limit() default 5;
int sec() default 5;
}
拦截器:
public class AccessLimitInterceptor implements HandlerInterceptor {
@Autowired
private RedisTemplate<String, Integer> redisTemplate; //使用RedisTemplate操作redis
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod) {
HandlerMethod handlerMethod = (HandlerMethod) handler;
Method method = handlerMethod.getMethod();
if (!method.isAnnotationPresent(AccessLimit.class)) {
return true;
}
AccessLimit accessLimit = method.getAnnotation(AccessLimit.class);
if (accessLimit == null) {
return true;
}
int limit = accessLimit.limit();
int sec = accessLimit.sec();
String key = IPUtil.getIpAddr(request) + request.getRequestURI();
Integer maxLimit = redisTemplate.opsForValue().get(key);
if (maxLimit == null) {
redisTemplate.opsForValue().set(key, 1, sec, TimeUnit.SECONDS); //set时一定要加过期时间
} else if (maxLimit < limit) {
redisTemplate.opsForValue().set(key, maxLimit + 1, sec, TimeUnit.SECONDS);
} else {
output(response, "请求太频繁!");
return false;
}
}
return true;
}
public void output(HttpServletResponse response, String msg) throws IOException {
response.setContentType("application/json;charset=UTF-8");
ServletOutputStream outputStream = null;
try {
outputStream = response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
} finally {
outputStream.flush();
outputStream.close();
}
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
}
}
controller:
@Controller
@RequestMapping("/activity")
public class AopController {
@ResponseBody
@RequestMapping("/seckill")
@AccessLimit(limit = 4,sec = 10) //加上自定义注解即可
public String test (HttpServletRequest request,@RequestParam(value = "username",required = false) String userName){
//TODO somethings……
return "hello world !";
}
}
配置文件:
/*springmvc的配置文件中加入自定义拦截器*/
<mvc:interceptors>
<mvc:interceptor>
<mvc:mapping path="/**"/>
<bean class="com.pptv.activityapi.controller.pointsmall.AccessLimitInterceptor"/>
</mvc:interceptor>
</mvc:interceptors>