限流(rate limiter)

发布时间:2023年12月26日

项目中业务提出需求,要求对商品的立即购买接口进行限流。

经过百度及调研,决定在拦截器加限流。拦截器相关讲解见上几篇博客springmv中的拦截器

springmvc限流

RateLimiter

最终的限流代码如下(从百度借鉴,已经找不到出处):

/**
 * spring请求限流器
 * 可对全局请求或url表达式请求进行限流
 * 内部使用spring DispatcherServlet的匹配器PatternsRequestCondition
 * 进行url的匹配。
 *
 * @author ValleyChen
 * @version 1.0.0
 * @time 2017/1/21
 */
public class RateLimitInterceptor implements HandlerInterceptor {

    private RateLimiter globalRateLimiter;

    private Properties urlProperties;

    private Map<PatternsRequestCondition, RateLimiter> urlRateMap;

    private UrlPathHelper urlPathHelper;

    private int globalRate;

    private List<URLLimitMapping> limitMappings;

    public RateLimitInterceptor() {
        this(0);
    }

    /**
     * @param globalRate 全局请求限制qps
     */
    public RateLimitInterceptor(int globalRate) {
        urlPathHelper = new UrlPathHelper();
        this.globalRate = globalRate;
        if (globalRate > 0)
            globalRateLimiter = RateLimiter.create(globalRate);
    }

    /**
     * @param globalRate 全局请求限制qps
     */
    public void setGlobalRate(int globalRate) {
        this.globalRate = globalRate;
        if (globalRate > 0)
            globalRateLimiter = RateLimiter.create(globalRate);
    }

    /**
     * url限流
     *
     * @param urlProperties url表达式->qps propertis
     */
    public void setUrlProperties(Properties urlProperties) {
        if (urlRateMap == null)
            urlRateMap = new ConcurrentHashMap();
        fillRateMap(urlProperties, urlRateMap);
        this.urlProperties = urlProperties;
    }

    /**
     * url限流
     *
     * @param limitMappings url表达式 array->qps mapping
     */
    public void setLimitMappings(List<URLLimitMapping> limitMappings) {
        if (urlRateMap == null)
            urlRateMap = new ConcurrentHashMap();
        fillRateMap(limitMappings, urlRateMap);
        this.limitMappings = limitMappings;
    }

    /**
     * 将url表达转换为PatternsRequestCondition,并生成对应RateLimiter
     * 保存
     *
     * @param controllerProperties
     * @param map
     */
    private void fillRateMap(Properties controllerProperties, Map<PatternsRequestCondition, RateLimiter> map) {
        if (controllerProperties != null) {
            for (String key : controllerProperties.stringPropertyNames()) {
                String value = controllerProperties.getProperty(key);
                if (value.matches("[0-9]*[1-9][0-9]*")) {
                    map.put(new PatternsRequestCondition(key), RateLimiter.create(Double.valueOf(value)));
                } else {
                    LoggerUtil.log(key + " 的值" + value + " 不是一个合法的限制");
                }
            }
        }
    }

    /**
     * 将url表达转换为PatternsRequestCondition,并生成对应RateLimiter
     * 保存
     *
     * @param limitMappings
     * @param map
     */
    private void fillRateMap(List<URLLimitMapping> limitMappings, Map<PatternsRequestCondition, RateLimiter> map) {
        if (limitMappings != null) {
            for (URLLimitMapping mapping : limitMappings) {
                map.put(new PatternsRequestCondition(mapping.getUrls()), RateLimiter.create(Double.valueOf(mapping.getRate())));
            }
        }
    }




    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {


        if (urlRateMap != null) {
            String lookupPath = urlPathHelper.getLookupPathForRequest(request);
            for (PatternsRequestCondition patternsRequestCondition : urlRateMap.keySet()) {
                //使用spring DispatcherServlet的匹配器PatternsRequestCondition进行匹配
                List<String> matches = patternsRequestCondition.getMatchingPatterns(lookupPath);
                if (!matches.isEmpty()) {
                    if (urlRateMap.get(patternsRequestCondition).tryAcquire()) {
                        LoggerUtil.log(lookupPath + " 请求匹配到" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器");
                    } else {
                        LoggerUtil.log(lookupPath + " 请求超过" + Joiner.on(",").join(patternsRequestCondition.getPatterns()) + "限流器速率");
                        //return false;
                        throw new ResponseException(500,"busy");
                    }

                }
            }
        }

        //全局限流
        if (globalRateLimiter != null) {
            if (!globalRateLimiter.tryAcquire()) {
                return false;
            }

        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {

    }
}


package com.chen.limit.model;

/**
 * @author ValleyChen
 * @version 1.0.0
 * @time 2017/1/22
 */
public class URLLimitMapping {

    private String[] urls;
    private int rate=0;

    public URLLimitMapping() {
    }

    public URLLimitMapping(String[] urls, int rate) {
        this.urls = urls;
        this.rate = rate;
    }

    public String[] getUrls() {
        return urls;
    }

    public void setUrls(String[] urls) {
        this.urls = urls;
    }

    public int getRate() {
        return rate;
    }

    public void setRate(int rate) {
        this.rate = rate;
    }
}

 

public class RequestLimitInterceptor implements HandlerInterceptor ,BeanPostProcessor{

    private Logger logger = LoggerFactory.getLogger(RequestLimitInterceptor.class);


    private Integer globalRateLimiter = 100;

    private Map<PatternsRequestCondition, RateLimiter> urlRateMap;

    private Properties urlProperties;

    private UrlPathHelper urlPathHelper = new UrlPathHelper();


    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        if (urlRateMap != null) {
            String lookupPath = urlPathHelper.getLookupPathForRequest(request);
            for (PatternsRequestCondition patternsRequestCondition : urlRateMap.keySet()) {
                //使用spring DispatcherServlet的匹配器PatternsRequestCondition进行匹配
                List<String> matches = patternsRequestCondition.getMatchingPatterns(lookupPath);
                if (!matches.isEmpty()) {
                    if (urlRateMap.get(patternsRequestCondition).tryAcquire(1000, TimeUnit.MILLISECONDS)) {
                        logger.info(" 请求'{}'匹配到mathes {} ,成功获取令牌,进入请求。" ,lookupPath ,Joiner.on(",").join(patternsRequestCondition.getPatterns()) );
                    } else {
                        logger.info( " 请求'{}'匹配到mathes {},超过限流速率,获取令牌失败。" ,lookupPath ,Joiner.on(",").join(patternsRequestCondition.getPatterns()));
                        return false;
                    }

                }
            }
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {

    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {

    }

    /**
     * 限流的 URL与限流值的K/V 值
     *
     * @param urlProperties
     */
    public void setUrlProperties(Properties urlProperties) {
        this.urlProperties = urlProperties;
    }


    public void setGlobalRateLimiter(Integer globalRateLimiter) {
        this.globalRateLimiter = globalRateLimiter;
    }

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if(RequestMappingHandlerMapping.class.isAssignableFrom(bean.getClass())){
            if(urlRateMap==null){
                urlRateMap = new ConcurrentHashMap<>();
            }
            logger.info("we get all the controllers's methods and assign it to urlRateMap");
            RequestMappingHandlerMapping requestMappingHandlerMapping = (RequestMappingHandlerMapping)bean;
            Map<RequestMappingInfo, HandlerMethod> handlerMethods = requestMappingHandlerMapping.getHandlerMethods();
            for (RequestMappingInfo rmi : handlerMethods.keySet()) {
                PatternsRequestCondition pc = rmi.getPatternsCondition();
                urlRateMap.put(pc,RateLimiter.create(globalRateLimiter));
            }
            if(urlProperties!=null){
                for(String urlPatterns :urlProperties.stringPropertyNames()){
                    String limit = urlProperties.getProperty(urlPatterns);
                    if(!limit.matches("^-?\\d+$"))
                        logger.error("the value {} for url patterns {} is not a number ,please check it ",limit,urlPatterns);
                    urlRateMap.put(new PatternsRequestCondition(urlPatterns), RateLimiter.create(Integer.parseInt(limit)));
                }
            }
        }
        return bean;
    }
}
应用
拦截器配置,可以统一配置所有请求的上限,也可以单独对某个 url配置,该拦截器是基于 SpringMvcRequestMappingHandlerMapping获取url 进行操作。
<bean id="requestLimitInterceptor" class="cn.fraudmetrix.creditcloud.app.intercepters.RequestLimitInterceptor">
        <property name="globalRateLimiter" value="100" />
        <property name="urlProperties">
            <props>
                <prop key="/creditcloud/test">100</prop>
            </props>
        </property>
    </bean>

    <!--拦截器配置-->
    <mvc:interceptors>
        <ref bean="requestLimitInterceptor" />
    </mvc:interceptors>

压测结果:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

原理分析

逐行拆解Guava限流器RateLimiter

谈一谈我的理解:
采用令牌桶算法,想象有一个固定容量的桶,以稳定的速率生成令牌放在桶中。当有请求时,需要获取令牌才能通过。因为桶的容量固定,令牌满了就不生产了。桶中有充足的令牌时,突发的流量可以直接获取令牌通过。当令牌取光后,桶空了,后面的请求就得等生成令牌后才能通过,这种情况下请求通过的速率就变得稳定了。

关键词:令牌生成速率固定,能取到令牌即可通过。

令牌桶的优点是可用处理突发流量。

作者用令牌桶的方式卖包子的例子进行解释:

包子铺每天早上7点准时开门,师傅开始制作这60屉小笼包【RateLimiter rateLimiter = RateLimiter.create(60)】。顾客蜂拥而至排队,第一个顾客先付一屉小笼包的钱【rateLimiter.acquire()】,等一分钟后小笼包出锅,便可以进屋吃包子了。这时伙计会和顾客说:“您本次等待了1分钟,久等了。”【rateLimiter.acquire()返回的值】。之后是第二个顾客,付钱,等一分钟,进屋吃包子。于是,早上大家就这样一直稳定有序的进屋吃包子。这事来了个火急火燎的上班族,说:“我着急吃饭,不想等,有现成的包子吗?”【rateLimiter.tryAcquire()】伙计看了一眼后厨说:“没有现成的包子,需要等会哦。”一看还要等,第三个顾客就走了。
早高峰过去后,买包子的人少了,师傅做完60个包子后,就休息了。
中午12点,隔壁科技园的程序员们下班了,顾客蜂拥而至。因为已经有了60个库存,大家不用等,付了钱就可以直接进屋吃包子,屋子里一下就涌入了60个顾客。看到小笼包都买光了,后厨师傅又赶紧忙了起来。
这时来了个大胃王,大嗓门喊道:“我想要30屉包子。”【rateLimiter.acquire(30)】伙计一愣,心想,这一下也做不出这么多包子呀。但是本着客户至上的原则,他对大胃王说:“好的先生,不过我们的包子都是现做的,您先吃着,我们蒸好了陆陆续续给您端上来可以吗?”大胃王说:“哈哈,好!好饭不怕晚!”于是交钱进门吃包子了。这时候又来了个顾客A,说我想要一屉包子。伙计心里一算,现在是一点,等刚才那个大胃王要的包子上齐了,才能给这位顾客上包子,所以需要等到1:30才能给这位顾客吃上,好惨一顾客。于是说:“不好意思这位顾客,小笼包现在没有了,您需要等一段时间。”顾客A说:“好吧,我交了钱等着吧。”于是过了半个小时,后厨做出来了第31屉包子,顾客A就满足的进屋吃包子了。

因此,令牌桶算法有两个参数限制:
1.桶大小(最大令牌数限制)
2.补给率(匀速生成速率)
稍后,博主又介绍了:
两种获取令牌的方法–
acquire是直接获取令牌,如果还没有到达接受下一个请求的时间点,就继续等待。而tryAcquire则如其名,我先试试,如果在我允许的时间范围内获取不到就不等了,如果能获取到那我就等着。

两种实现方式–
SmoothBursty:平滑突发限流,以稳定的速率生成令牌。
SmoothWarmingUp:平滑预热限流,随着请求量的增加,令牌生成速率会缓慢提升直到一个稳定的速率。

Dubbo服务端限流

并发控制
限制com.foo.BarService的每个方法,服务端并发执行(或占用线程池线程数)不能超过10个:

<dubbo:service interface="com.foo.BarService" executes="10" />

限制com.foo.BarService的sayHello方法,服务器并发执行(或占用线程池线程数)不能超过10个。

<dubbo:service interface="com.foo.BarService">
    <dubbo:method name="sayHello" executes="10" />
</dubbo:service>

actives限流
该限流方式与前两种不同,其可以设置在提供端,也可以设置在消费者端。可以设置为接口级别,也可以设置为方法级别。
根据消费者与提供者建立的连接类型,其意义也不同。
长连接: 表示当前的长连接最多可以处理的请求个数。与长连接的数量没有问题。
短连接:表示当前服务可以同时处理的短连接数量。
类级别

<dubbo:service interface="com.foo.BarService" actives="10" />
<dubbo:reference interface="com.foo.BarService" actives="10" />

方法级别

<dubbo:reference interface="com.foo.BarService">
    <dubbo:method name="sayHello" actives="10" />
</dubbo:service>
<dubbo:reference interface="com.foo.BarService">
    <dubbo:method name="sayHello" actives="10" />
</dubbo:service>

Nginx的API限流(http):

API限流(http)
Nginx
对于Nginx接入层限流可以使用Nginx自带了两个模块:
连接数限流模块ngx_http_limit_conn_module和漏桶算法实现的请求限流模块ngx_http_limit_req_module。

文章来源:https://blog.csdn.net/xunboang0059/article/details/132542721
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。