Dubbo过滤器 - TpsLimitFilter
概述 TpsLimitFilter 是服务提供端对 TPS 限流的实现。该过滤器的限流是基于令牌的,本质上是计数器限流的实现方式,即一个时间段内只分配 N 个令牌,每个请求过来都会消耗一个令牌,耗完即止,后面再来的请求都会被拒绝。计数器算法简单粗暴,易于实现。但是缺点也是很大的,容易造成前一个时间段非常忙碌,下一时间段又非常空闲。
配置 1 2 3 4 <dubbo:parameter key ="tps" value ="100" /> <dubbo:parameter key ="tps.interval" value ="1000" />
将以上的配置项添加到 <dubbo:provider/>
或 <dubbo:service/>
或 <dubbo:protocol/>
中开启即可。
注意,目前 Dubbo Filter 的 SPI 配置文件中并没有配置 TpsLimitFilter ,如果需要使用则配置:
1 2 # com.alibaba.dubbo.rpc.Filter 文件 tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter
TpsLimitFilter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Activate (group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)public class TpsLimitFilter implements Filter { private final TPSLimiter tpsLimiter = new DefaultTPSLimiter(); @Override public Result invoke (Invoker<?> invoker, Invocation invocation) throws RpcException { if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) { throw new RpcException( "Failed to invoke service " + invoker.getInterface().getName() + "." + invocation.getMethodName() + " because exceed max service tps." ); } return invoker.invoke(invocation); } }
TPSLimiter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public interface TPSLimiter { boolean isAllowable (URL url, Invocation invocation) ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class DefaultTPSLimiter implements TPSLimiter { private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>(); @Override public boolean isAllowable (URL url, Invocation invocation) { int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1 ); long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL); String serviceKey = url.getServiceKey(); if (rate > 0 ) { StatItem statItem = stats.get(serviceKey); if (statItem == null ) { stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval)); statItem = stats.get(serviceKey); } return statItem.isAllowable(); } else { StatItem statItem = stats.get(serviceKey); if (statItem != null ) { stats.remove(serviceKey); } } return true ; } }
TPSLimiter 接口中的核心是 isAllowable() 方法,在 DefaultTPSLimiter 实现中,使用 ConcurrentHashMap 为每个服务健维护了一个相应的 StatItem 对象。在 isAllowable() 方法实现中,会从 URL 中读取 tps 参数值(默认为 -1,即没有限流),对于需要限流的请求,会从 stats 集合中获取(或创建)相应 StatItem 对象,然后调用 StatItem 对象的isAllowable() 方法判断是否被限流。
StatItem 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 class StatItem { private String name; private long lastResetTime; private long interval; private AtomicInteger token; private int rate; StatItem(String name, int rate, long interval) { this .name = name; this .rate = rate; this .interval = interval; this .lastResetTime = System.currentTimeMillis(); this .token = new AtomicInteger(rate); } public boolean isAllowable () { long now = System.currentTimeMillis(); if (now > lastResetTime + interval) { token.set(rate); lastResetTime = now; } int value = token.get(); boolean flag = false ; while (value > 0 && !flag) { flag = token.compareAndSet(value, value - 1 ); value = token.get(); } return flag; } long getLastResetTime () { return lastResetTime; } int getToken () { return token.get(); } }
StatItem 包装了令牌刷新的时间间隔、每次发放的令牌数等属性。它的核心是 isAllowable 方法,这也是整个 TPS 限流算法的核心。
它的主要逻辑如下:
判断上次发放令牌的时间点到现在是否超过令牌刷新的时间间隔,如果超过就重新发送令牌,之前没用完的不会叠加,而是重新设置令牌数。
通过 CAS 递减令牌,减掉后令牌数如果小于 0 则会触发限流。
小结 TpsLimitFilter 中的限流算法是基于计数器,注意和令牌桶算法的区别。常见的限流算法有计数器、令牌桶、漏桶等。