接口限流
一、常见限流算法
流量控制算法也叫流控算法
、限流算法
,主要是为了解决在面对不确定的不稳定的流量冲击下,能够确保系统的稳定运行,如果系统对流量不进行控制,系统很有可能因为大流量的冲击影响系统的稳定性,情况严重时还会导致系统瘫痪。
常用的流控算法分为以下几种:
(1)固定窗口算法
固定窗口算法就是指在固定的时间窗口内按照阈值进行流量控制的算法。
首先维护一个计数器,将单位时间段当作一个窗口,计数器记录这个窗口接收请求的次数。
- 当次数少于限流阈值,就允许访问,并且计数器 + 1。
- 当次数大于限流阈值,拒绝访问。
- 当前时间窗口过去之后,计数器清零。
伪代码实现:
boolean fixedWindowsTryAcquire() { long currentTime = System.currentTimeMillis(); if (currentTime - lastRequestTime > windowUnit) { counter = 0; lastRequestTime = currentTime; } if (counter < threshold) { counter++; return true; } return false; }
|
优点:原理简单,实现容易。
缺点:存在临界问题:如图所示,当8-10s内和10-12s内分别并发500,没有超过阈值;但是如果在8-12s内,则并发数高达1000,已经超过了原先定义的10s内并发不超过500并发。
(2)滑动窗口算法
为了解决固定窗口算法中的临界问题,让限制更加平滑。将固定窗口中分割出多个小时间窗口,分别在每个小的时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小的窗口。
例如接口限流每分钟处理60个请求,就可以把1分钟分为60个窗口,每隔1秒移动一次,每个窗口只能处理不大于 请求数/窗口数 的请求。如果当前窗口的请求计数总和超过了限制的数量的话就不再处理其他请求。
时间片段划分的越多,滑动窗口的滑动就是越平滑,流量控制的也就越精确。
伪代码实现:
private int SUB_CYCLE = 10; private int thresholdPerMin = 100;
private final TreeMap<Long, Integer> counters = new TreeMap<>();
boolean slidingWindowsTryAcquire() { long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / SUB_CYCLE * SUB_CYCLE; int currentWindowNum = countCurrentWindow(currentWindowTime);
if (currentWindowNum >= thresholdPerMin) { return false; }
counters.get(currentWindowTime)++; return true; }
private int countCurrentWindow(long currentWindowTime) { long startTime = currentWindowTime - SUB_CYCLE* (60s/SUB_CYCLE-1); int count = 0;
Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Long, Integer> entry = iterator.next(); if (entry.getKey() < startTime) { iterator.remove(); } else { count =count + entry.getValue(); } } return count; }
|
优点:实现相对简单,且没有固定窗口的临界问题
缺点:无法应对短时间高并发(突刺现象)
(3)漏桶算法
漏桶算法是定义一个有一定容量的桶,如果桶的容量未满,新的请求就被放入桶内,如果桶的容量满了,新的请求就会被丢弃,漏桶算法通过控制输出速率,平滑网络流量,起到消峰填谷的作用。
- 访问系统请求的流入速率是不确定的
- 桶的容量表示限制处理的请求数
- 如果桶已经满了,达到了限流的阈值,则会拒绝请求
- 系统对请求的处理速率是固定的
漏桶算法由于漏出的速率是固定的,所以在突发流量的情况下,并不能够有效地使用网络资源,这种情况下对于请求的处理就缺乏效率。
伪代码实现:
private long rate; private long currentWater; private long refreshTime; private long capacity;
boolean leakybucketLimitTryAcquire() { long currentTime = System.currentTimeMillis(); long outWater = (currentTime - refreshTime) / 1000 * rate; long currentWater = Math.max(0, currentWater - outWater); refreshTime = currentTime;
if (currentWater < capacity) { currentWater++; return true; } return false; }
|
优点:可以按照固定的速率处理请求。
缺点:无法应对突发的并发流量,因为处理速率是固定的。
应用:消息中间件采用的是漏桶限流的思想。
(4)令牌桶算法
系统以一个恒定的频率生成令牌(token)放入桶内,当有请求需要被处理时,需要从桶里拿出一个或多个令牌(token),当桶里的令牌不足时,则新的请求将会被拒绝。
与漏桶不同的是,桶里放入的是令牌,而漏桶放入的是请求,当出现突发流量时,只要桶内的令牌足够时请求就可以得到处理的机会(这里说机会,主要还是要看系统的处理能力)。
伪代码实现
private long putTokenRate; private long refreshTime; private long capacity; private long currentToken = 0L;
boolean tokenBucketTryAcquire() {
long currentTime = System.currentTimeMillis(); long generateToken = (currentTime - refreshTime) / 1000 * putTokenRate; currentToken = Math.min(capacity, generateToken + currentToken); refreshTime = currentTime; if (currentToken > 0) { currentToken--; return true; } return false; }
|
优点:可以向漏桶那样匀速,也可以像计数器那样突发处理请求。
二、限流设计实现
(1)Redis + Lua实现令牌桶算法
常用依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.12</version> </dependency>
|
自定义注解,用于标注限流的方法
@Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) public @interface RateLimit { String key() default ""; int time() default 1; int count(); boolean ipLimit() default false; }
|
编写lua脚本
lua脚本的逻辑:
- 首先根据传入的key(接口路径)查询对应的value(令牌数)。
- 如果为null,说明是第一次访问,初始化令牌桶以及数量,记录初始化时间,返回剩余令牌数。
- 如果不为null,则继续判断。
- 判断value令牌数是否大于1。
- 如果大于1,则value-1,返回剩余令牌数。
- 如果小于1,则判断补充令牌的时间间隔是否足够。
- 如果足够,则补充令牌并且更新补充令牌的时间,返回剩余令牌数。
- 如果不足够,说明超过限流访问次数,返回-1。
(注意:脚本可能逻辑有点问题,待后续修正。)
redis.replicate_commands();
local key = KEYS[1]
local update_len = tonumber(ARGV[1])
local key_time = 'ratetokenprefix'..key
local curr_time_arr = redis.call('TIME')
local nowTime = tonumber(curr_time_arr[1])
local curr_key_time = tonumber(redis.call('get',KEYS[1]) or 0)
local token_count = tonumber(redis.call('get',KEYS[1]) or -1)
local token_size = tonumber(ARGV[2])
if token_count < 0 then redis.call('set',key_time,nowTime) redis.call('set',key,token_size -1) return token_size -1 else if token_count > 0 then redis.call('set',key,token_count - 1) return token_count - 1 else if curr_key_time + update_len < nowTime then redis.call('set',key,token_size -1) return token_size - 1 else return -1 end end end
|
Redis配置类,读取lua脚本
@Component public class CommonConfig {
@Bean public DefaultRedisScript<Long> redisLuaScript() { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua"))); redisScript.setResultType(Long.class); return redisScript; }
@Bean public RedisTemplate<String, Serializable> limitRedisTemplate(LettuceConnectionFactory factory) { RedisTemplate<String, Serializable> template = new RedisTemplate<>(); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setConnectionFactory(factory); return template; } }
|
自定义拦截器拦截带有注解的方法,执行相应逻辑
@Component public class RateLimitInterceptor implements HandlerInterceptor { private final Logger LOG = LoggerFactory.getLogger(this.getClass());
@Autowired private RedisTemplate<String, Serializable> limitRedisTemplate;
@Autowired private DefaultRedisScript<Long> redisLuaScript;
@Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { assert handler instanceof HandlerMethod; HandlerMethod method = (HandlerMethod) handler; RateLimit rateLimit = method.getMethodAnnotation(RateLimit.class); if (rateLimit != null) { int count = rateLimit.count(); String key = rateLimit.key(); int time = rateLimit.time(); boolean ipLimit = rateLimit.ipLimit(); StringBuilder sb = new StringBuilder(); sb.append(Constants.RATE_LIMIT_KEY).append(key).append(":"); List<String> keys = Collections.singletonList(sb.toString()); Long execute = limitRedisTemplate.execute(redisLuaScript, keys, time, count); assert execute != null; int value = execute.intValue(); System.out.println("执行的value == " + value); if (-1 == value) { Result resultModel = Result.fail("接口调用超过限流次数"); response.setStatus(199); response.setCharacterEncoding("utf-8"); response.setContentType("application/json"); response.getWriter().write(JSONUtil.toJsonStr(resultModel)); response.getWriter().flush(); response.getWriter().close(); LOG.info("当前接口调用超过时间段内限流,key:{}", sb.toString()); return false; } else { LOG.info("当前访问时间段内剩余{}次访问次数", execute.toString()); } } return true; } public static String getIpAddr(HttpServletRequest request) { String ipAddress = null; try { ipAddress = request.getHeader("x-forwarded-for"); if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("WL-Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getRemoteAddr(); } if (ipAddress != null && ipAddress.length() > 15) { if (ipAddress.indexOf(",") > 0) { ipAddress = ipAddress.substring(0, ipAddress.indexOf(",")); } } } catch (Exception e) { ipAddress = ""; } return ipAddress; } }
|
注册拦截器实现
@Configuration @EnableWebMvc public class WebConfig extends WebMvcConfigurerAdapter {
@Autowired private RateLimitInterceptor rateLimitInterceptor;
@Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(rateLimitInterceptor); super.addInterceptors(registry); } }
|
一个测试案例
@RestController @RequestMapping("/") public class TestController { @RateLimit(key = "testGet", time = 1, count = 5, ipLimit = false) @RequestMapping("/get") public Result testGet() { return Result.success(); } }
|
其他,常量以及统一结果返回类
public class Constants { public static final String RATE_LIMIT_KEY = "rateLimit:"; }
|
@Data @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public class Result { private Integer code; private String message; private Boolean success; private Object data; ... }
|
(2)使用google的guava实现令牌桶算法
引入依赖
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency>
|
自定义限流注解
@Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) @Documented public @interface Limit { String key() default ""; double permitsPerSecond () ; long timeout(); TimeUnit timeunit() default TimeUnit.MILLISECONDS; String msg() default "系统繁忙,请稍后再试."; }
|
使用AOP切面拦截限流注解
@Slf4j @Aspect @Component public class LimitAop {
private final Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();
@Around("@annotation(com.example.redislimit.annotation.Limit)") public Object around(ProceedingJoinPoint joinPoint) throws Throwable { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); Limit limit = method.getAnnotation(Limit.class); if (limit != null) { String key = limit.key(); RateLimiter rateLimiter = null; if (!limitMap.containsKey(key)) { rateLimiter = RateLimiter.create(limit.permitsPerSecond()); limitMap.put(key, rateLimiter); log.info("新建了令牌桶={},容量={}", key, limit.permitsPerSecond()); } rateLimiter = limitMap.get(key); boolean acquire = rateLimiter.tryAcquire(limit.timeout(), limit.timeunit()); if (!acquire) { log.debug("令牌桶={},获取令牌失败", key); this.responseFail(limit.msg()); return null; } } return joinPoint.proceed(); }
private void responseFail(String msg) { HttpServletResponse response = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse(); Result resultData = Result.fail(msg); writeJson(response, resultData); }
private void writeJson(HttpServletResponse response, Result result) { response.setContentType("application/json;charset=utf-8"); response.setHeader("Access-Control-Allow-Origin", "*"); response.setHeader("Access-Control-Allow-Method", "POST,GET"); ServletOutputStream out = null; try { out = response.getOutputStream(); out.write(new ObjectMapper().writeValueAsString(result).getBytes(StandardCharsets.UTF_8)); out.flush(); out.close(); } catch (Exception e) { throw new RuntimeException(e.getMessage()); } } }
|
使用限流注解进行测试
@Limit(key = "get", permitsPerSecond = 1, timeout = 500, timeunit = TimeUnit.MICROSECONDS) @RequestMapping("/get") public Result testGet() { return Result.success(); }
|