Dubbo Filters - ExecuteLimitFilter & ActiveLimitFilter
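Overview

ExecuteLimitFilter is Dubbo's provider-side limiter: it caps the maximum number of concurrent executions (or thread-pool threads occupied) for each method of each service. ActiveLimitFilter is the consumer-side limiter: it caps the number of concurrent calls (or in-flight requests occupying the connection) that one consumer makes to a given provider method. Both can be configured at the interface level and at the method level.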
Note

The implementations in Dubbo 2.6 and Dubbo 2.7 differ slightly, so the two versions are described separately below.
Dubbo 2.6 Implementation

ExecuteLimitFilter: provider-side limiting
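Configuration

    <!-- Interface level: applies to every method of the service -->
    <dubbo:service interface="com.foo.BarService" executes="N" />

    <!-- Method level: applies to sayHello only -->
    <dubbo:service interface="com.foo.BarService">
        <dubbo:method name="sayHello" executes="N" />
    </dubbo:service>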
Note: if executes is not set, no limit applies by default. A value less than or equal to 0 likewise disables the limit.
Implementation

    @Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
    public class ExecuteLimitFilter implements Filter {

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            Semaphore executesLimit = null;
            boolean acquireResult = false;
            // Read the executes limit for this method; the default 0 means no limit.
            int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
            if (max > 0) {
                RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
                executesLimit = count.getSemaphore(max);
                // Fail fast: reject the call when no permit is available.
                if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                    throw new RpcException("Failed to invoke method " + invocation.getMethodName()
                            + " in provider " + url
                            + ", cause: The service using threads greater than <dubbo:service executes=\""
                            + max + "\" /> limited.");
                }
            }
            long begin = System.currentTimeMillis();
            boolean isSuccess = true;
            // The counters are updated for statistics only; they do not enforce the limit.
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                return result;
            } catch (Throwable t) {
                isSuccess = false;
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                } else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                }
            } finally {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
                // Release the permit only if this call actually acquired one.
                if (acquireResult) {
                    executesLimit.release();
                }
            }
        }
    }
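ExecuteLimitFilter essentially relies on the Semaphore maintained in RpcStatus for concurrency control, and that is how it limits traffic. Note that although the filter also updates the RpcStatus counters, the counters do not enforce the limit; the Semaphore obtained from the method's RpcStatus does. Dubbo 2.7 removes the semaphore and enforces the limit with an atomic counter updated through CAS.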
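The fail-fast semaphore pattern described above can be sketched in isolation. This is a minimal illustration, not Dubbo code: the class SemaphoreLimiter and its method tryRun are hypothetical names, and the sketch returns false on rejection where the filter throws an RpcException.

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper (not part of Dubbo): fail-fast concurrency limiting with a Semaphore.
final class SemaphoreLimiter {
    private final Semaphore permits;

    SemaphoreLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs the task if a permit is free and returns true;
    // returns false immediately (no queuing) when the limit is reached.
    boolean tryRun(Runnable task) {
        if (!permits.tryAcquire()) {
            return false;
        }
        try {
            task.run();
            return true;
        } finally {
            // Reached only after a successful acquire, so the permit count stays balanced.
            permits.release();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Because tryAcquire() never blocks, a rejected caller returns at once, which matches the provider-side behavior of failing the call rather than queuing it.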
ActiveLimitFilter: consumer-side limiting
Configuration

    <!-- Interface level, on the provider or on the consumer -->
    <dubbo:service interface="com.foo.BarService" actives="N" />
    <dubbo:reference interface="com.foo.BarService" actives="N" />

    <!-- Method level, on the provider or on the consumer -->
    <dubbo:service interface="com.foo.BarService">
        <dubbo:method name="sayHello" actives="N" />
    </dubbo:service>
    <dubbo:reference interface="com.foo.BarService">
        <dubbo:method name="sayHello" actives="N" />
    </dubbo:reference>
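Note: if actives is configured on both the provider side and the consumer side, the consumer-side value takes precedence. A value less than or equal to 0 disables the concurrency limit.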
Implementation

    @Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
    public class ActiveLimitFilter implements Filter {

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            if (max > 0) {
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                int active = count.getActive();
                if (active >= max) {
                    synchronized (count) {
                        // Wait for a finished call to notify, up to the request timeout.
                        while ((active = count.getActive()) >= max) {
                            try {
                                count.wait(remain);
                            } catch (InterruptedException e) {
                                // ignored in the original source
                            }
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                            if (remain <= 0) {
                                throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
                                        + invoker.getInterface().getName() + ", method: "
                                        + invocation.getMethodName() + ", elapsed: " + elapsed
                                        + ", timeout: " + timeout + ". concurrent invokes: " + active
                                        + ". max concurrent invoke limit: " + max);
                            }
                        }
                    }
                }
            }
            try {
                long begin = System.currentTimeMillis();
                // Increment the active counter for this call.
                RpcStatus.beginCount(url, methodName);
                try {
                    Result result = invoker.invoke(invocation);
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                    return result;
                } catch (RuntimeException t) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                    throw t;
                }
            } finally {
                // Wake one waiting caller now that a slot has been freed.
                if (max > 0) {
                    synchronized (count) {
                        count.notify();
                    }
                }
            }
        }
    }
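ActiveLimitFilter limits traffic through the RpcStatus.active counter, which RpcStatus.beginCount() increments and RpcStatus.endCount() decrements. In addition, as the consumer-side filter it does not throw as soon as the threshold is reached; it makes full use of the request timeout, waiting up to that long for the concurrency to drop before failing.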
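The "wait within the timeout for a free slot" idea can be sketched with a plain monitor. This is a simplified illustration under stated assumptions, not the filter's code: WaitingLimiter, acquire and release are hypothetical names, and the counter is a plain int guarded by the same monitor used for waiting, whereas the 2.6 filter reads an AtomicInteger and increments it later in beginCount(). The sketch also checks the remaining time before calling wait, since Object.wait(0) blocks until notified rather than returning immediately.

```java
// Hypothetical helper (not part of Dubbo): bounded concurrency with a timed wait.
final class WaitingLimiter {
    private final int max;
    private int active; // guarded by "this"

    WaitingLimiter(int max) {
        this.max = max;
    }

    // Waits up to timeoutMillis for a free slot. Returns true if a slot was taken,
    // false if the deadline passed or the thread was interrupted.
    synchronized boolean acquire(long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (active >= max) {
            long remain = deadline - System.currentTimeMillis();
            if (remain <= 0) {
                return false; // timed out while the limit was still reached
            }
            try {
                wait(remain); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        active++;
        return true;
    }

    // Frees a slot and wakes waiting callers so they can re-check the counter.
    synchronized void release() {
        active--;
        notifyAll();
    }
}
```

A caller would pair acquire with release in a try/finally block, in the same way the filter pairs the wait loop with the notify in its finally block.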
RpcStatus

RpcStatus is the core class behind both ExecuteLimitFilter and ActiveLimitFilter: the former limits through the Semaphore that RpcStatus wraps, the latter through the AtomicInteger active field that RpcStatus maintains. The core of RpcStatus is shown below.
    public class RpcStatus {

        // Per-service statistics, keyed by URL identity string.
        private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS =
                new ConcurrentHashMap<String, RpcStatus>();

        // Per-method statistics: URL identity string -> method name -> status.
        private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS =
                new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

        private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
        private final AtomicInteger active = new AtomicInteger();
        private final AtomicLong total = new AtomicLong();
        private final AtomicInteger failed = new AtomicInteger();
        private final AtomicLong totalElapsed = new AtomicLong();
        private final AtomicLong failedElapsed = new AtomicLong();
        private final AtomicLong maxElapsed = new AtomicLong();
        private final AtomicLong failedMaxElapsed = new AtomicLong();
        private final AtomicLong succeededMaxElapsed = new AtomicLong();

        // Semaphore used by ExecuteLimitFilter, and the permit count it was created with.
        private volatile Semaphore executesLimit;
        private volatile int executesPermits;

        private RpcStatus() {
        }

        public static RpcStatus getStatus(URL url) {
            String uri = url.toIdentityString();
            RpcStatus status = SERVICE_STATISTICS.get(uri);
            if (status == null) {
                SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());
                status = SERVICE_STATISTICS.get(uri);
            }
            return status;
        }

        public static RpcStatus getStatus(URL url, String methodName) {
            String uri = url.toIdentityString();
            ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
            if (map == null) {
                METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
                map = METHOD_STATISTICS.get(uri);
            }
            RpcStatus status = map.get(methodName);
            if (status == null) {
                map.putIfAbsent(methodName, new RpcStatus());
                status = map.get(methodName);
            }
            return status;
        }

        public static void beginCount(URL url, String methodName) {
            beginCount(getStatus(url));
            beginCount(getStatus(url, methodName));
        }

        private static void beginCount(RpcStatus status) {
            status.active.incrementAndGet();
        }

        public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
            endCount(getStatus(url), elapsed, succeeded);
            endCount(getStatus(url, methodName), elapsed, succeeded);
        }

        private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
            status.active.decrementAndGet();
            status.total.incrementAndGet();
            status.totalElapsed.addAndGet(elapsed);
            if (status.maxElapsed.get() < elapsed) {
                status.maxElapsed.set(elapsed);
            }
            if (succeeded) {
                if (status.succeededMaxElapsed.get() < elapsed) {
                    status.succeededMaxElapsed.set(elapsed);
                }
            } else {
                status.failed.incrementAndGet();
                status.failedElapsed.addAndGet(elapsed);
                if (status.failedMaxElapsed.get() < elapsed) {
                    status.failedMaxElapsed.set(elapsed);
                }
            }
        }

        public void set(String key, Object value) {
            values.put(key, value);
        }

        public Object get(String key) {
            return values.get(key);
        }

        public int getActive() {
            return active.get();
        }

        public long getTotal() {
            return total.longValue();
        }

        public long getTotalElapsed() {
            return totalElapsed.get();
        }

        public int getFailed() {
            return failed.get();
        }

        public long getFailedElapsed() {
            return failedElapsed.get();
        }

        public long getSucceeded() {
            return getTotal() - getFailed();
        }

        public long getSucceededElapsed() {
            return getTotalElapsed() - getFailedElapsed();
        }

        public long getSucceededAverageElapsed() {
            long succeeded = getSucceeded();
            if (succeeded == 0) {
                return 0;
            }
            return getSucceededElapsed() / succeeded;
        }

        public long getSucceededMaxElapsed() {
            return succeededMaxElapsed.get();
        }

        // Returns the semaphore for the given limit, recreating it (double-checked
        // under the lock) when the configured limit has changed.
        public Semaphore getSemaphore(int maxThreadNum) {
            if (maxThreadNum <= 0) {
                return null;
            }
            if (executesLimit == null || executesPermits != maxThreadNum) {
                synchronized (this) {
                    if (executesLimit == null || executesPermits != maxThreadNum) {
                        executesLimit = new Semaphore(maxThreadNum);
                        executesPermits = maxThreadNum;
                    }
                }
            }
            return executesLimit;
        }
    }
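The two-level lookup that getStatus(url, methodName) performs (service key first, then method name) can be sketched on its own. This is an illustration only: StatusRegistry and counterFor are hypothetical names, a bare AtomicInteger stands in for RpcStatus, and the sketch uses computeIfAbsent instead of the putIfAbsent-then-get sequence in the listing above, with the same get-or-create result.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Dubbo): one shared counter per (service, method) pair.
final class StatusRegistry {
    private final ConcurrentMap<String, ConcurrentMap<String, AtomicInteger>> byMethod =
            new ConcurrentHashMap<>();

    // Returns the counter for the pair, creating the inner map and the counter on first use.
    // Concurrent callers asking for the same pair always receive the same instance.
    AtomicInteger counterFor(String serviceKey, String methodName) {
        return byMethod
                .computeIfAbsent(serviceKey, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(methodName, k -> new AtomicInteger());
    }
}
```

Sharing one instance per pair is what lets every invocation of the same method see, and be limited by, the same counter.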
Dubbo 2.7 Implementation

ExecuteLimitFilter

Configuration

Same as in Dubbo 2.6.
Implementation

    @Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
    public class ExecuteLimitFilter implements Filter, Filter.Listener {

        private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
            // beginCount increments the active counter only if the limit is not exceeded.
            if (!RpcStatus.beginCount(url, methodName, max)) {
                throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                        "Failed to invoke method " + invocation.getMethodName() + " in provider " + url
                                + ", cause: The service using threads greater than <dubbo:service executes=\""
                                + max + "\" /> limited.");
            }

            // Remember the start time so the callbacks can compute the elapsed time.
            invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
            try {
                return invoker.invoke(invocation);
            } catch (Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                } else {
                    throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
                }
            }
        }

        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            // A rejected call never incremented the counter, so it must not decrement it.
            if (t instanceof RpcException) {
                RpcException rpcException = (RpcException) t;
                if (rpcException.isLimitExceed()) {
                    return;
                }
            }
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            Object beginTime = invocation.get(EXECUTE_LIMIT_FILTER_START_TIME);
            return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0;
        }
    }
ExecuteLimitFilter now limits traffic through the RpcStatus.active counter, which beginCount() increments and endCount() decrements, and drops the semaphore-based approach of Dubbo 2.6.
ActiveLimitFilter

Configuration

Same as in Dubbo 2.6.
Implementation

    @Activate(group = CONSUMER, value = ACTIVES_KEY)
    public class ActiveLimitFilter implements Filter, Filter.Listener {

        private static final String ACTIVELIMIT_FILTER_START_TIME = "activelimit_filter_start_time";

        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
            final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            // Fast path: try to take a slot without locking.
            if (!RpcStatus.beginCount(url, methodName, max)) {
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                synchronized (rpcStatus) {
                    // Retry after each notification until a slot is taken or the timeout expires.
                    while (!RpcStatus.beginCount(url, methodName, max)) {
                        try {
                            rpcStatus.wait(remain);
                        } catch (InterruptedException e) {
                            // ignored in the original source
                        }
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                                    "Waiting concurrent invoke timeout in client-side for service: "
                                            + invoker.getInterface().getName() + ", method: "
                                            + invocation.getMethodName() + ", elapsed: " + elapsed
                                            + ", timeout: " + timeout + ". concurrent invokes: "
                                            + rpcStatus.getActive()
                                            + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }

            invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis());

            return invoker.invoke(invocation);
        }

        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            String methodName = invocation.getMethodName();
            URL url = invoker.getUrl();
            int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);

            RpcStatus.endCount(url, methodName, getElapsed(invocation), true);
            notifyFinish(RpcStatus.getStatus(url, methodName), max);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            String methodName = invocation.getMethodName();
            URL url = invoker.getUrl();
            int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);

            // A call rejected by the limit never incremented the counter.
            if (t instanceof RpcException) {
                RpcException rpcException = (RpcException) t;
                if (rpcException.isLimitExceed()) {
                    return;
                }
            }
            RpcStatus.endCount(url, methodName, getElapsed(invocation), false);
            notifyFinish(RpcStatus.getStatus(url, methodName), max);
        }

        private long getElapsed(Invocation invocation) {
            Object beginTime = invocation.get(ACTIVELIMIT_FILTER_START_TIME);
            return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0;
        }

        // Wake all waiting callers so each can retry beginCount.
        private void notifyFinish(final RpcStatus rpcStatus, int max) {
            if (max > 0) {
                synchronized (rpcStatus) {
                    rpcStatus.notifyAll();
                }
            }
        }
    }
ActiveLimitFilter likewise limits traffic through the RpcStatus.active counter, which beginCount() increments and endCount() decrements.
RpcStatus

As in 2.6, RpcStatus is the core of both the provider-side and the consumer-side limiters. Its 2.7 implementation is shown below.
    public class RpcStatus {

        private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS =
                new ConcurrentHashMap<String, RpcStatus>();

        private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS =
                new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

        private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
        private final AtomicInteger active = new AtomicInteger();
        private final AtomicLong total = new AtomicLong();
        private final AtomicInteger failed = new AtomicInteger();
        private final AtomicLong totalElapsed = new AtomicLong();
        private final AtomicLong failedElapsed = new AtomicLong();
        private final AtomicLong maxElapsed = new AtomicLong();
        private final AtomicLong failedMaxElapsed = new AtomicLong();
        private final AtomicLong succeededMaxElapsed = new AtomicLong();

        private RpcStatus() {
        }

        public static RpcStatus getStatus(URL url) {
            String uri = url.toIdentityString();
            return SERVICE_STATISTICS.computeIfAbsent(uri, key -> new RpcStatus());
        }

        public static void removeStatus(URL url) {
            String uri = url.toIdentityString();
            SERVICE_STATISTICS.remove(uri);
        }

        public static RpcStatus getStatus(URL url, String methodName) {
            String uri = url.toIdentityString();
            ConcurrentMap<String, RpcStatus> map =
                    METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
            return map.computeIfAbsent(methodName, k -> new RpcStatus());
        }

        public static void removeStatus(URL url, String methodName) {
            String uri = url.toIdentityString();
            ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
            if (map != null) {
                map.remove(methodName);
            }
        }

        public static void beginCount(URL url, String methodName) {
            beginCount(url, methodName, Integer.MAX_VALUE);
        }

        // Returns true and increments the counters if the method is below its limit.
        public static boolean beginCount(URL url, String methodName, int max) {
            // A non-positive limit means "unlimited".
            max = (max <= 0) ? Integer.MAX_VALUE : max;
            RpcStatus appStatus = getStatus(url);
            RpcStatus methodStatus = getStatus(url, methodName);
            // Guard against overflow of i + 1 below.
            if (methodStatus.active.get() == Integer.MAX_VALUE) {
                return false;
            }
            // CAS loop: re-read and retry until the increment succeeds or the limit is hit.
            for (int i; ; ) {
                i = methodStatus.active.get();
                if (i + 1 > max) {
                    return false;
                }
                if (methodStatus.active.compareAndSet(i, i + 1)) {
                    break;
                }
            }
            appStatus.active.incrementAndGet();
            return true;
        }

        public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
            endCount(getStatus(url), elapsed, succeeded);
            endCount(getStatus(url, methodName), elapsed, succeeded);
        }

        private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
            status.active.decrementAndGet();
            status.total.incrementAndGet();
            status.totalElapsed.addAndGet(elapsed);
            if (status.maxElapsed.get() < elapsed) {
                status.maxElapsed.set(elapsed);
            }
            if (succeeded) {
                if (status.succeededMaxElapsed.get() < elapsed) {
                    status.succeededMaxElapsed.set(elapsed);
                }
            } else {
                status.failed.incrementAndGet();
                status.failedElapsed.addAndGet(elapsed);
                if (status.failedMaxElapsed.get() < elapsed) {
                    status.failedMaxElapsed.set(elapsed);
                }
            }
        }

        public int getActive() {
            return active.get();
        }
    }
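The admission check in beginCount(url, methodName, max) boils down to a compare-and-set loop on one atomic counter. A minimal standalone sketch follows; CasLimiter, tryBegin and end are hypothetical names, not Dubbo API. It compares with current >= limit rather than i + 1 > max, which is an assumption of this sketch that sidesteps the integer overflow the original guards against with a separate check.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Dubbo): lock-free bounded counter using CAS.
final class CasLimiter {
    private final AtomicInteger active = new AtomicInteger();

    // Atomically increments the counter if it is below max; a non-positive max means unlimited.
    boolean tryBegin(int max) {
        int limit = (max <= 0) ? Integer.MAX_VALUE : max;
        for (;;) {
            int current = active.get();
            if (current >= limit) {
                return false; // limit reached, counter left unchanged
            }
            // Succeeds only if no other thread changed the counter since it was read;
            // otherwise loop and re-check against the fresh value.
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    void end() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

Because the check and the increment happen in one atomic step, two threads racing for the last slot cannot both succeed, which a separate "read the counter, then increment" sequence does not guarantee.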
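Summary

ExecuteLimitFilter and ActiveLimitFilter are the provider-side and consumer-side limiters respectively; which side a filter applies to depends only on the group it is activated for. In Dubbo 2.7 the two implementations share almost the same logic. In Dubbo 2.6 the provider-side limiter relies on a semaphore, while the consumer-side limiter already relies on an atomic counter.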