接口限流实践篇

接口限流

一、常见限流算法

流量控制算法也叫流控算法限流算法,主要是为了解决在面对不确定的不稳定的流量冲击下,能够确保系统的稳定运行,如果系统对流量不进行控制,系统很有可能因为大流量的冲击影响系统的稳定性,情况严重时还会导致系统瘫痪。

常用的流控算法分为以下几种:

  • 固定窗口算法
  • 滑动窗口算法
  • 漏桶算法
  • 令牌桶算法

(1)固定窗口算法

固定窗口算法就是指在固定的时间窗口内按照阈值进行流量控制的算法。

image-20240105152240489

首先维护一个计数器,将单位时间段当作一个窗口,计数器记录这个窗口接收请求的次数。

  • 当次数少于限流阈值,就允许访问,并且计数器 + 1。
  • 当次数大于限流阈值,拒绝访问。
  • 当前时间窗口过去之后,计数器清零。

伪代码实现:

/**
* 固定窗口时间算法
* @return
*/
boolean fixedWindowsTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
if (currentTime - lastRequestTime > windowUnit) { //检查是否在时间窗口内
counter = 0; // 计数器清0
lastRequestTime = currentTime; //开启新的时间窗口
}
if (counter < threshold) { // 小于阀值
counter++; //计数器加1
return true;
}
return false;
}

优点:原理简单,实现容易。

缺点:存在临界问题:如图所示,当8-10s内和10-12s内分别并发500,没有超过阈值;但是如果在8-12s内,则并发数高达1000,已经超过了原先定义的10s内并发不超过500并发。

image-20240105152308466

(2)滑动窗口算法

为了解决固定窗口算法中的临界问题,让限制更加平滑。将固定窗口中分割出多个小时间窗口,分别在每个小的时间窗口中记录访问次数,然后根据时间将窗口往前滑动并删除过期的小的窗口

image-20240105152319437

例如接口限流每分钟处理60个请求,就可以把1分钟分为60个窗口,每隔1秒移动一次,每个窗口只能处理不大于 请求数/窗口数 的请求。如果当前窗口的请求计数总和超过了限制的数量的话就不再处理其他请求。

时间片段划分的越多,滑动窗口的滑动就是越平滑,流量控制的也就越精确。

伪代码实现:

   private int SUB_CYCLE = 10; // 单位时间划分的小周期(单位时间是1分钟,10s一个小格子窗口,一共6个格子)
private int thresholdPerMin = 100; // 每分钟限流请求数
// 计数器, k-为当前窗口的开始时间值秒,value为当前窗口的计数
private final TreeMap<Long, Integer> counters = new TreeMap<>();

/**
* 滑动窗口时间算法实现
*/
boolean slidingWindowsTryAcquire() {
long currentWindowTime = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC) / SUB_CYCLE * SUB_CYCLE; //获取当前时间在哪个小周期窗口
int currentWindowNum = countCurrentWindow(currentWindowTime); //当前窗口总请求数

//超过阀值限流
if (currentWindowNum >= thresholdPerMin) {
return false;
}

//计数器+1
counters.get(currentWindowTime)++;
return true;
}

/**
* 统计当前窗口的请求数
*/
private int countCurrentWindow(long currentWindowTime) {
//计算窗口开始位置
long startTime = currentWindowTime - SUB_CYCLE* (60s/SUB_CYCLE-1);
int count = 0;

//遍历存储的计数器
Iterator<Map.Entry<Long, Integer>> iterator = counters.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Long, Integer> entry = iterator.next();
// 删除无效过期的子窗口计数器
if (entry.getKey() < startTime) {
iterator.remove();
} else {
//累加当前窗口的所有计数器之和
count =count + entry.getValue();
}
}
return count;
}

优点:实现相对简单,且没有固定窗口的临界问题

缺点:无法应对短时间高并发(突刺现象)

(3)漏桶算法

漏桶算法是定义一个有一定容量的桶,如果桶的容量未满,新的请求就被放入桶内,如果桶的容量满了,新的请求就会被丢弃,漏桶算法通过控制输出速率,平滑网络流量,起到消峰填谷的作用。

  • 访问系统请求的流入速率是不确定的
  • 桶的容量表示限制处理的请求数
  • 如果桶已经满了,达到了限流的阈值,则会拒绝请求
  • 系统对请求的处理速率是固定的

image-20240111221309621

漏桶算法由于漏出的速率是固定的,所以在突发流量的情况下,并不能够有效地使用网络资源,这种情况下对于请求的处理就缺乏效率。

伪代码实现:

private long rate; // 每秒处理数(出水率)
private long currentWater; // 当前剩余水量
private long refreshTime; // 最后刷新时间
private long capacity; // 桶容量

/**
* 漏桶算法
* @return
*/
boolean leakybucketLimitTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
long outWater = (currentTime - refreshTime) / 1000 * rate; //流出的水量 =(当前时间-上次刷新时间)* 出水率
long currentWater = Math.max(0, currentWater - outWater); // 当前水量 = 之前的桶内水量-流出的水量
refreshTime = currentTime; // 刷新时间

// 当前剩余水量还是小于桶的容量,则请求放行
if (currentWater < capacity) {
currentWater++;
return true;
}
// 当前剩余水量大于等于桶的容量,限流
return false;
}

优点:可以按照固定的速率处理请求。

缺点:无法应对突发的并发流量,因为处理速率是固定的。

应用:消息中间件采用的是漏桶限流的思想。

(4)令牌桶算法

系统以一个恒定的频率生成令牌(token)放入桶内,当有请求需要被处理时,需要从桶里拿出一个或多个令牌(token),当桶里的令牌不足时,则新的请求将会被拒绝。

与漏桶不同的是,桶里放入的是令牌,而漏桶放入的是请求,当出现突发流量时,只要桶内的令牌足够时请求就可以得到处理的机会(这里说机会,主要还是要看系统的处理能力)。

image-20240111221328615

伪代码实现

private long putTokenRate; // 每秒处理数(放入令牌数量)
private long refreshTime; // 最后刷新时间
private long capacity; // 令牌桶容量
private long currentToken = 0L; // 当前桶内令牌数

/**
* 漏桶算法
* @return
*/
boolean tokenBucketTryAcquire() {

long currentTime = System.currentTimeMillis(); //获取系统当前时间
long generateToken = (currentTime - refreshTime) / 1000 * putTokenRate; //生成的令牌 =(当前时间-上次刷新时间)* 放入令牌的速率
currentToken = Math.min(capacity, generateToken + currentToken); // 当前令牌数量 = 之前的桶内令牌数量+放入的令牌数量
refreshTime = currentTime; // 刷新时间

//桶里面还有令牌,请求正常处理
if (currentToken > 0) {
currentToken--; //令牌数量-1
return true;
}

return false;
}

优点:可以向漏桶那样匀速,也可以像计数器那样突发处理请求。

二、限流设计实现

(1)Redis + Lua实现令牌桶算法

常用依赖

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.12</version>
</dependency>

自定义注解,用于标注限流的方法

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {
String key() default ""; // 限流唯一标识
int time() default 1; // 限流单位时间,默认为1,单位s
int count(); // 单位内时间内限制访问次数
boolean ipLimit() default false; // 是否限制
}

编写lua脚本

lua脚本的逻辑:

  • 首先根据传入的key(接口路径)查询对应的value(令牌数)。
  • 如果为null,说明是第一次访问,初始化令牌桶以及数量,记录初始化时间,返回剩余令牌数。
  • 如果不为null,则继续判断。
    • 判断value令牌数是否大于1。
      • 如果大于1,则value-1,返回剩余令牌数。
      • 如果小于1,则判断补充令牌的时间间隔是否足够。
        • 如果足够,则补充令牌并且更新补充令牌的时间,返回剩余令牌数。
        • 如果不足够,说明超过限流访问次数,返回-1。

(注意:脚本可能逻辑有点问题,待后续修正。)

redis.replicate_commands();
-- 参数中传递的key
local key = KEYS[1]
-- 令牌桶填充 最小时间间隔
local update_len = tonumber(ARGV[1])
-- 记录 当前key上次更新令牌桶的时间的 key
local key_time = 'ratetokenprefix'..key
-- 获取当前时间(这里的curr_time_arr 中第一个是 秒数,第二个是 秒数后毫秒数),由于我是按秒计算的,这里只要curr_time_arr[1](注意:redis数组下标是从1开始的)
--如果需要获得毫秒数 则为 tonumber(arr[1]*1000 + arr[2])
local curr_time_arr = redis.call('TIME')
-- 当前时间秒数
local nowTime = tonumber(curr_time_arr[1])
-- 从redis中获取当前key 对应的上次更新令牌桶的key 对应的value
local curr_key_time = tonumber(redis.call('get',KEYS[1]) or 0)
-- 获取当前key对应令牌桶中的令牌数
local token_count = tonumber(redis.call('get',KEYS[1]) or -1)
-- 当前令牌桶的容量
local token_size = tonumber(ARGV[2])
-- 令牌桶数量小于0 说明令牌桶没有初始化
if token_count < 0 then
redis.call('set',key_time,nowTime)
redis.call('set',key,token_size -1)
return token_size -1
else
if token_count > 0 then --当前令牌桶中令牌数够用
redis.call('set',key,token_count - 1)
return token_count - 1 --返回剩余令牌数
else --当前令牌桶中令牌数已清空
--判断一下,当前时间秒数 与上次更新时间秒数 的间隔,是否大于规定时间间隔数 (update_len)
if curr_key_time + update_len < nowTime then
redis.call('set',key,token_size -1)
return token_size - 1
else
return -1
end
end
end

Redis配置类,读取lua脚本

@Component
public class CommonConfig {

/**
* 读取限流脚本
* @return
*/
@Bean
public DefaultRedisScript<Long> redisLuaScript() {
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("limit.lua")));
redisScript.setResultType(Long.class);
return redisScript;
}

@Bean
public RedisTemplate<String, Serializable> limitRedisTemplate(LettuceConnectionFactory factory) {
RedisTemplate<String, Serializable> template = new RedisTemplate<>();
template.setKeySerializer(new StringRedisSerializer());
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
template.setConnectionFactory(factory);
return template;
}
}

自定义拦截器拦截带有注解的方法,执行相应逻辑

@Component
public class RateLimitInterceptor implements HandlerInterceptor {
private final Logger LOG = LoggerFactory.getLogger(this.getClass());

@Autowired
private RedisTemplate<String, Serializable> limitRedisTemplate;

@Autowired
private DefaultRedisScript<Long> redisLuaScript;

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
assert handler instanceof HandlerMethod;
HandlerMethod method = (HandlerMethod) handler;
RateLimit rateLimit = method.getMethodAnnotation(RateLimit.class);
//当前方法上有我们自定义的注解
if (rateLimit != null) {
//获得单位时间内限制的访问次数
int count = rateLimit.count();
String key = rateLimit.key();
//获得限流单位时间(单位为s)
int time = rateLimit.time();
boolean ipLimit = rateLimit.ipLimit();
//拼接 redis中的key
StringBuilder sb = new StringBuilder();
sb.append(Constants.RATE_LIMIT_KEY).append(key).append(":");
List<String> keys = Collections.singletonList(sb.toString());
//执行lua脚本
Long execute = limitRedisTemplate.execute(redisLuaScript, keys, time, count);
assert execute != null;
int value = execute.intValue();
System.out.println("执行的value == " + value);
if (-1 == value) {
Result resultModel = Result.fail("接口调用超过限流次数");
response.setStatus(199);
response.setCharacterEncoding("utf-8");
response.setContentType("application/json");
response.getWriter().write(JSONUtil.toJsonStr(resultModel));
response.getWriter().flush();
response.getWriter().close();
LOG.info("当前接口调用超过时间段内限流,key:{}", sb.toString());
return false;
} else {
LOG.info("当前访问时间段内剩余{}次访问次数", execute.toString());
}
}
return true;
}

// 获取用户ip,可以开启ip限制
public static String getIpAddr(HttpServletRequest request) {
String ipAddress = null;
try {
ipAddress = request.getHeader("x-forwarded-for");
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getHeader("Proxy-Client-IP");
}
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getHeader("WL-Proxy-Client-IP");
}
if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
ipAddress = request.getRemoteAddr();
}
// 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
// "***.***.***.***".length()
if (ipAddress != null && ipAddress.length() > 15) {
// = 15
if (ipAddress.indexOf(",") > 0) {
ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
}
}
} catch (Exception e) {
ipAddress = "";
}
return ipAddress;
}
}

注册拦截器实现

@Configuration
@EnableWebMvc
public class WebConfig extends WebMvcConfigurerAdapter {

@Autowired
private RateLimitInterceptor rateLimitInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(rateLimitInterceptor);
super.addInterceptors(registry);
}
}

一个测试案例

@RestController
@RequestMapping("/")
public class TestController {
// 限流规则,1秒内只允许同一个接口访问5次
@RateLimit(key = "testGet", time = 1, count = 5, ipLimit = false)
@RequestMapping("/get")
public Result testGet() {
return Result.success();
}
}

其他,常量以及统一结果返回类

public class Constants {
public static final String RATE_LIMIT_KEY = "rateLimit:";
}
@Data
@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
public class Result {
private Integer code;
private String message;
private Boolean success;
private Object data;
...
}

(2)使用google的guava实现令牌桶算法

引入依赖

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

自定义限流注解

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface Limit {
String key() default ""; // 限流接口的唯一标识
double permitsPerSecond () ; // 最多的访问限制次数
long timeout(); // 获取令牌最大等待时间
TimeUnit timeunit() default TimeUnit.MILLISECONDS; // 获取令牌最大等待时间,单位(例:分钟/秒/毫秒) 默认:毫秒
String msg() default "系统繁忙,请稍后再试."; // 得不到令牌的提示语
}

使用AOP切面拦截限流注解

@Slf4j
@Aspect
@Component
public class LimitAop {
/**
* 不同的接口,不同的流量控制
* map的key为 Limiter.key
*/
private final Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();

@Around("@annotation(com.example.redislimit.annotation.Limit)")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
//拿limit的注解
Limit limit = method.getAnnotation(Limit.class);
if (limit != null) {
//key作用:不同的接口,不同的流量控制
String key = limit.key();
RateLimiter rateLimiter = null;
//验证缓存是否有命中key
if (!limitMap.containsKey(key)) {
// 创建令牌桶
rateLimiter = RateLimiter.create(limit.permitsPerSecond());
limitMap.put(key, rateLimiter);
log.info("新建了令牌桶={},容量={}", key, limit.permitsPerSecond());
}
rateLimiter = limitMap.get(key);
// 拿令牌
boolean acquire = rateLimiter.tryAcquire(limit.timeout(), limit.timeunit());
// 拿不到命令,直接返回异常提示
if (!acquire) {
log.debug("令牌桶={},获取令牌失败", key);
this.responseFail(limit.msg());
return null;
}
}
return joinPoint.proceed();
}

/**
* 直接向前端抛出异常
*
* @param msg 提示信息
*/
private void responseFail(String msg) {
HttpServletResponse response = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
Result resultData = Result.fail(msg);
writeJson(response, resultData);
}

private void writeJson(HttpServletResponse response, Result result) {
response.setContentType("application/json;charset=utf-8");
response.setHeader("Access-Control-Allow-Origin", "*");
response.setHeader("Access-Control-Allow-Method", "POST,GET");
ServletOutputStream out = null;
try {
//输出Json
out = response.getOutputStream();
out.write(new ObjectMapper().writeValueAsString(result).getBytes(StandardCharsets.UTF_8));
out.flush();
out.close();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
}

使用限流注解进行测试

@Limit(key = "get", permitsPerSecond = 1, timeout = 500, timeunit = TimeUnit.MICROSECONDS)
@RequestMapping("/get")
public Result testGet() {
return Result.success();
}