Dubbo过滤器 - TpsLimitFilter

概述

TpsLimitFilter 是服务提供端对 TPS 限流的实现。该过滤器的限流是基于令牌的,本质上是计数器限流的实现方式,即一个时间段内只分配 N 个令牌,每个请求过来都会消耗一个令牌,耗完即止,后面再来的请求都会被拒绝。计数器算法简单粗暴,易于实现。但是缺点也是很大的,容易造成前一个时间段非常忙碌,下一时间段又非常空闲。

配置

1
2
3
4
<!-- 每次发放 100 个令牌 -->
<dubbo:parameter key="tps" value="100" />
<!-- 令牌刷新的间隔是 1s,如果不配置则默认 60s -->
<dubbo:parameter key="tps.interval" value="1000" />

将以上的配置项添加到 <dubbo:provider/><dubbo:service/><dubbo:protocol/> 中开启即可。

注意,目前 Dubbo Filter 的 SPI 配置文件中并没有配置 TpsLimitFilter ,如果需要使用则配置:

1
2
# com.alibaba.dubbo.rpc.Filter 文件
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

TpsLimitFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
/**
* 限流器
*/
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 根据tps限流规则判断是否限制此次调用,如果是就抛出异常。目前使用 TPSLimiter作为限流器的实现类
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
"Failed to invoke service " +
invoker.getInterface().getName() +
"." +
invocation.getMethodName() +
" because exceed max service tps.");
}
return invoker.invoke(invocation);
}
}

TPSLimiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface TPSLimiter {

/**
* 根据 tps 限流规则判断是否限制此次调用
* <p>
* judge if the current invocation is allowed by TPS rule
*
* @param url url
* @param invocation invocation
* @return true allow the current invocation, otherwise, return false
*/
boolean isAllowable(URL url, Invocation invocation);

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 默认TPS限制器实现类,以服务为纬度
*/
public class DefaultTPSLimiter implements TPSLimiter {
/**
* StatItem 集合,即缓存每个接口的令牌数
* key: 服务键,interface + group + version
* value: StatItem
*/
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();

/**
* 是否触发限流
*
* @param url url
* @param invocation invocation
* @return
*/
@Override
public boolean isAllowable(URL url, Invocation invocation) {
// 获得 tps 配置项,即令牌数
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
// 获得 tps.interval 周期配置项,默认60 秒,即令牌刷新时间间隔
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL);
// 获得服务键
String serviceKey = url.getServiceKey();

// 如果设置了令牌数,则开始限流处理
if (rate > 0) {
// 获取服务键对应的 StatItem 对象
StatItem statItem = stats.get(serviceKey);
// 不存在,则进行创建
if (statItem == null) {
stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
// 根据 tps 限流规则判断是否限制此次调用
return statItem.isAllowable();

// 不进行限流
} else {
// 移除当前服务键关联的 StatItem
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}

TPSLimiter 接口中的核心是 isAllowable() 方法,在 DefaultTPSLimiter 实现中,使用 ConcurrentHashMap 为每个服务健维护了一个相应的 StatItem 对象。在 isAllowable() 方法实现中,会从 URL 中读取 tps 参数值(默认为 -1,即没有限流),对于需要限流的请求,会从 stats 集合中获取(或创建)相应 StatItem 对象,然后调用 StatItem 对象的isAllowable() 方法判断是否被限流。

StatItem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class StatItem {

/**
* 统计名,目前使用服务键
*/
private String name;

/**
* 最后重置时间
*/
private long lastResetTime;
/**
* 令牌刷新时间间隔,即重置 token 值的时间周期,这样就实现了在 interval 时间段内能够通过 rate 个请求的效果。
*/
private long interval;

/**
* 令牌数,初始值为 rate 值,每通过一个请求 token 递减一,当减为 0 时,不再通过任何请求,实现限流的作用。
*/
private AtomicInteger token;

/**
* 一段时间内能通过的 TPS 上限
*/
private int rate;

/**
* 构造方法
*
* @param name 服务键
* @param rate 限制大小
* @param interval 限制周期
*/
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
// 记录时间戳
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}

/**
* 限流规则判断是否限制此次调用
*
* @return
*/
public boolean isAllowable() {
// 周期性重置token
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
// 记录最近一次重置token的时间戳
lastResetTime = now;
}

// CAS,直到获得一个令牌,或者没有足够的令牌才结束
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}

// 是否允许访问 【取决是否能够拿到令牌】
return flag;
}

long getLastResetTime() {
return lastResetTime;
}

int getToken() {
return token.get();
}
}

StatItem 包装了令牌刷新的时间间隔、每次发放的令牌数等属性。它的核心是 isAllowable 方法,这也是整个 TPS 限流算法的核心。

它的主要逻辑如下:

  1. 判断上次发放令牌的时间点到现在是否超过令牌刷新的时间间隔,如果超过就重新发送令牌,之前没用完的不会叠加,而是重新设置令牌数。
  2. 通过 CAS 递减令牌,减掉后令牌数如果小于 0 则会触发限流。

小结

TpsLimitFilter 中的限流算法是基于计数器,注意和令牌桶算法的区别。常见的限流算法有计数器、令牌桶、漏桶等。