Dubbo过滤器 - ExecuteLimitFilter & ActiveLimitFilter

概述

ExecuteLimitFilter 是 Dubbo 在服务提供端限流的实现,用于限制每个服务中每个方法的最大并发数(或占用线程池线程数)。ActiveLimitFilter 是 Dubbo 在消费端的限流实现,用于限制一个消费者对一个服务端方法的并发调用量(或占用连接的请求数)。它们都支持接口级别和方法级别的配置。

说明

Dubbo 在 2.6 和 2.7 版本的实现中有些许差异,下面我们分别对不同版本的实现进行说明。

Dubbo 2.6 实现

ExecuteLimitFilter

服务端限流

配置方式

1
2
3
4
5
6
7
<!-- 接口级别配置,每个方法的并发执行数(或占用线程池线程数)不能超过 N 个 -->
<dubbo:service interface="com.foo.BarService" executes="N"/>

<!-- 方法级别配置,sayHello方法的并发执行数(或占用线程池线程数)不能超过 N 个 -->
<dubbo:service interface="com.foo.BarService">
<dubbo:method name="sayHello" executes="N">
</dubbo:service>

注意,如果不设置,则默认不做限制,如果设置了小于等于0,那么同样是不做任何限制。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获得URL
URL url = invoker.getUrl();
// 获得方法名
String methodName = invocation.getMethodName();

// 信号量
Semaphore executesLimit = null;
// 是否获得信号量
boolean acquireResult = false;

// 获得服务提供方当前方法最大可并发请求数
int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);

// 最大可并发请求数大于0
if (max > 0) {
// 基于 服务URL + 方法纬度,创建/获取 RpcStatus 计数器
RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
// 创建/获取 RpcStatus 计数器对应的信号量
executesLimit = count.getSemaphore(max);

// 尝试获取信号量,获取失败则抛出异常
if (executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
}
}

long begin = System.currentTimeMillis();
boolean isSuccess = true;

// 计数器 +1
RpcStatus.beginCount(url, methodName);
try {
// 服务调用
Result result = invoker.invoke(invocation);
return result;
} catch (Throwable t) {
// 标记失败
isSuccess = false;
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
} finally {
// 计数器-1 [调用失败/成功,看isSuccess的值]
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);

// 释放信号量
if (acquireResult) {
executesLimit.release();
}
}
}
}

ExecuteLimitFilter 本质上是利用 RpcStatus 中维护的 Semaphore 进行并发控制,进而达到限流的目的。需要说明的是,ExecuteLimitFilter 虽然使用到了计数器,但是起到限流作用的并不是它,而是计数器对应的信号量 Semaphore 。在 Dubbo 2.7 版本中移除了信号量的实现,使用计数器的原子类操作和CAS机制实现限流。

ActiveLimitFilter

客户端限流

配置方式

1
2
3
4
5
6
7
8
9
10
11
<!-- 接口级别配置,每个方法在每个客户端的并发调用数(占用连接的请求数)不能超过 N 个 -->
<dubbo:service interface="com.foo.BarService" actives="N"/> <!--在服务端配置-->
<dubbo:reference interface="com.foo.BarService" actives="N"/> <!--在客户端配置-->

<!-- 方法级别配置,sayHello方法在每个客户端的并发调用数(占用连接的请求数)不能超过 N 个 -->
<dubbo:service interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="N"><!--在服务端配置-->
</dubbo:service>
<dubbo:reference interface="com.foo.BarService">
<dubbo:method name="sayHello" actives="N"><!--在服务端配置-->
</dubbo:reference>

注意,如果服务端侧和消费端侧都配置了 actives,则消费端侧优先。如果设置了 actives 小于等于 0,则不做并发限制。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获取URL
URL url = invoker.getUrl();
// 获取方法名
String methodName = invocation.getMethodName();
// 获取当前方法在当前客户端的最大调用量
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 基于服务URL + 方法纬度, 获得 RpcStatus 对象
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
/**
* 获得超时时间 [注意:这里的超时值不占用调用服务的超时时间] ,是用来控制等待请求释放资源的时间,防止等待时间太久。
* 在极端情况下,调用服务的时间几乎是 2 * timeout
*/
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;

// 获取当前并发度
int active = count.getActive();

// 如果达到限流阈值,和服务提供者不一样,并不是直接抛出异常,而是先等待直到超时以等待并发度降低,因为请求是允许有超时时间的。
if (active >= max) {
// 并发控制
synchronized (count) {
/**
*
* 循环获取当前并发数,如果大于限流阈值则等待
* 会有两种结果:
* 1 某个Invoker在调用结束后,并发把计数器原子-1并唤醒等待线程,会有一个等待状态的线程被唤醒并继续执行逻辑
* 2 wait等待超时都没有被唤醒,此时抛出异常
*/
while ((active = count.getActive()) >= max) {
try {
// 等待,直到超时,或者被唤醒
count.wait(remain);
} catch (InterruptedException e) {
}

// 是否超时,超时则抛出异常
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}

try {
long begin = System.currentTimeMillis();
// 开始计数,并发原子数 + 1
RpcStatus.beginCount(url, methodName);
try {
// 调用服务
Result result = invoker.invoke(invocation);
// 结束计数(调用成功),并发原子数 - 1
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
// 结束计数(调用失败),并发原子数 -1
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
// 唤醒等待的相同服务的相同方法的请求
if (max > 0) {
synchronized (count) {
count.notify();
}
}
}
}
}

ActiveLimitFilter 依赖 RpcStatus 的 beginCount() 方法和 endCount() 方法来实现 RpcStatus.active 字段的增减来达到限流的目的。 此外,做为消费端的限流过滤器,达到限流的阈值时并不是直接抛出异常,而是充分利用请求超时时间,允许在请求超时时间内等待并发度降低。

RpcStatus

RpcStatus 做为 ExecuteLimitFilterActiveLimitFilter 实现限流的核心类,前者限流使用 RpcStatus 封装的信号量 Semaphore ,后者限流使用 RpcStatus 维护的原子类型的 AtomicInteger active 属性。下面我们对 RpcStatus 的核心代码实现进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
public class RpcStatus {
/**
* 服务状态信息
* key: URL
* value: RpcStatus 计数器
*/
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
/**
* 服务每个方法的状态信息
* key1: URL
* key2: 方法名
* RpcStatus 计数器
*/
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();

private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
/**
* 当前并发度
*
* @see com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
*/
private final AtomicInteger active = new AtomicInteger();
/**
* 总调用次数
*/
private final AtomicLong total = new AtomicLong();
/**
* 总调用失败次数
*/
private final AtomicInteger failed = new AtomicInteger();
/**
* 总调用时长,单位: 毫秒
*/
private final AtomicLong totalElapsed = new AtomicLong();
/**
* 总调用失败时长,单位:毫秒
*/
private final AtomicLong failedElapsed = new AtomicLong();
/**
* 所有调用中最长的耗时,单位:毫秒
*/
private final AtomicLong maxElapsed = new AtomicLong();
/**
* 所有失败调用中最长的耗时,单位:毫秒
*/
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
* 所有成功调用中最长的耗时,单位:毫秒
*/
private final AtomicLong succeededMaxElapsed = new AtomicLong();
/**
* Semaphore used to control concurrency limit set by `executes`
* <p>
* 服务执行信号量【包含服务执行信号量大小】
*
* @see com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
*/
private volatile Semaphore executesLimit;
/**
* 服务执行信号量大小
*/
private volatile int executesPermits;

private RpcStatus() {
}

/**
* 根据服务URL为纬度的获得RpcStatus
*
* @param url
* @return status
*/
public static RpcStatus getStatus(URL url) {
// URL的字符串
String uri = url.toIdentityString();
// 是否存在
RpcStatus status = SERVICE_STATISTICS.get(uri);

// 不存在则创建,并且放入缓存中
if (status == null) {
SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());
status = SERVICE_STATISTICS.get(uri);
}
return status;
}


/**
* 根据 服务URL + 方法 获得RpcStatus
*
* @param url
* @param methodName
* @return status
*/
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
// 获得方法集合
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}

/**
* 服务调用开始计数
*
* @param url URL 对象
* @param methodName 方法名
*/
public static void beginCount(URL url, String methodName) {
// SERVICE_STATISTICS -> 基于服务URL的计数
beginCount(getStatus(url));
// METHOD_STATISTICS -> 基于服务URL + 方法的计数
beginCount(getStatus(url, methodName));
}

/**
* 计数 - 调用中的次数
*
* @param status
*/
private static void beginCount(RpcStatus status) {
status.active.incrementAndGet();
}

/**
* 服务调用结束的计数
*
* @param url URL对象
* @param methodName 方法名
* @param elapsed 时长,毫秒
* @param succeeded 是否成功
*/
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
// SERVICE_STATISTICS -> 基于服务URL的计数
endCount(getStatus(url), elapsed, succeeded);
// METHOD_STATISTICS -> 基于服务URL + 方法的计数
endCount(getStatus(url, methodName), elapsed, succeeded);
}

/**
* 结束计数
*
* @param status
* @param elapsed
* @param succeeded
*/
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
// 调用的次数要递减
status.active.decrementAndGet();
// 总调用次数递增
status.total.incrementAndGet();
// 总调用时长递增
status.totalElapsed.addAndGet(elapsed);
// 更新最大调用时长
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}

// 是否调用成功
if (succeeded) {
// 更新最大成功调用时长
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
// 调用失败次数递增
status.failed.incrementAndGet();
// 总调用失败时长
status.failedElapsed.addAndGet(elapsed);

// 更新最大失败调用时长
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}

/**
* set value.
*
* @param key
* @param value
*/
public void set(String key, Object value) {
values.put(key, value);
}

/**
* get value.
*
* @param key
* @return value
*/
public Object get(String key) {
return values.get(key);
}

/**
* get active.
*
* @return active
*/
public int getActive() {
return active.get();
}

/**
* get total.
*
* @return total
*/
public long getTotal() {
return total.longValue();
}

/**
* get total elapsed.
*
* @return total elapsed
*/
public long getTotalElapsed() {
return totalElapsed.get();
}

/**
* get failed.
*
* @return failed
*/
public int getFailed() {
return failed.get();
}

/**
* get failed elapsed.
*
* @return failed elapsed
*/
public long getFailedElapsed() {
return failedElapsed.get();
}

/**
* get succeeded.
*
* @return succeeded
*/
public long getSucceeded() {
return getTotal() - getFailed();
}

/**
* get succeeded elapsed.
*
* @return succeeded elapsed
*/
public long getSucceededElapsed() {
return getTotalElapsed() - getFailedElapsed();
}

/**
* get succeeded average elapsed.
*
* @return succeeded average elapsed
*/
public long getSucceededAverageElapsed() {
long succeeded = getSucceeded();
if (succeeded == 0) {
return 0;
}
return getSucceededElapsed() / succeeded;
}

/**
* get succeeded max elapsed.
*
* @return succeeded max elapsed.
*/
public long getSucceededMaxElapsed() {
return succeededMaxElapsed.get();
}

/**
* 获取信号量
* <p>
* Get the semaphore for thread number. Semaphore's permits is decided by {@link Constants#EXECUTES_KEY}
*
* @param maxThreadNum value of {@link Constants#EXECUTES_KEY}
* @return thread number semaphore
*/
public Semaphore getSemaphore(int maxThreadNum) {
if (maxThreadNum <= 0) {
return null;
}

// 若信号量不存在,或者信号量大小改变,则创建新的信号量
if (executesLimit == null || executesPermits != maxThreadNum) {
synchronized (this) {
if (executesLimit == null || executesPermits != maxThreadNum) {
// 创建信号量
executesLimit = new Semaphore(maxThreadNum);
executesPermits = maxThreadNum;
}
}
}

// 返回信号量
return executesLimit;
}
}

Dubbo 2.7 实现

ExecuteLimitFilter

配置方式

配置方式同 Dubbo 2.6 版本实现。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
@Activate(group = CommonConstants.PROVIDER, value = EXECUTES_KEY)
public class ExecuteLimitFilter implements Filter, Filter.Listener {

private static final String EXECUTE_LIMIT_FILTER_START_TIME = "execute_limit_filter_start_time";

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 获取相关参数,为下面的逻辑做准备
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);

// 尝试增加active的值,当并发度达到executes配置指定的阈值,则直接抛出异常
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}

// 设置限流的开始时间,用于调用完成或调用异常后的消耗时间计算
invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}

/**
* 调用结束 - Filter.Listener 接口实现
*
* @param appResponse
* @param invoker
* @param invocation
*/
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
// 减小 active 的值,同时完成对一次调用的统计
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}

/**
* 调用失败 - Filter.Listener 接口实现
*
* @param t
* @param invoker
* @param invocation
*/
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (t instanceof RpcException) {
RpcException rpcException = (RpcException) t;
if (rpcException.isLimitExceed()) {
return;
}
}
// 减小 active 的值,同时完成对一次调用的统计
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
}

private long getElapsed(Invocation invocation) {
Object beginTime = invocation.get(EXECUTE_LIMIT_FILTER_START_TIME);
return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0;
}
}

ExecuteLimitFilter 依赖 RpcStatus 的 beginCount() 方法和 endCount() 方法来实现 RpcStatus.active 字段的增减来达到限流的目的,放弃了 Dubbo 2.6 的信号量限流实现。

ActiveLimitFilter

配置方式

配置方式同 Dubbo 2.6 版本实现。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
@Activate(group = CONSUMER, value = ACTIVES_KEY)
public class ActiveLimitFilter implements Filter, Filter.Listener {

private static final String ACTIVELIMIT_FILTER_START_TIME = "activelimit_filter_start_time";

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取最大并发度
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
// 获取服务方法的 RpcStatus
final RpcStatus rpcStatus = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());

// 尝试增加active的值,当并发度达到 actives 配置指定的阈值时,则根据超时时间进行等待重新获取
if (!RpcStatus.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
// 加锁
synchronized (rpcStatus) {
// 再次尝试并发度加一
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
// 当前线程阻塞,等待并发度降低
rpcStatus.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
// 超时了则抛出限流异常
if (remain <= 0) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Waiting concurrent invoke timeout in client-side for service: " +
invoker.getInterface().getName() + ", method: " + invocation.getMethodName() +
", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " +
rpcStatus.getActive() + ". max concurrent invoke limit: " + max);
}
}
}
}

// 设置调用服务前的时间戳
invocation.put(ACTIVELIMIT_FILTER_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}

/**
* 调用完成-Filter.Listener 接口实现
*
* @param appResponse
* @param invoker
* @param invocation
*/
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
String methodName = invocation.getMethodName();
URL url = invoker.getUrl();
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);
// 减小 active 的值,同时完成对一次调用的统计
RpcStatus.endCount(url, methodName, getElapsed(invocation), true);
// 调用 notifyFinish() 方法唤醒阻塞在对应 RpcStatus 对象上的线程
notifyFinish(RpcStatus.getStatus(url, methodName), max);
}

/**
* 调用失败 - Filter.Listener 接口实现
*
* @param t
* @param invoker
* @param invocation
*/
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
String methodName = invocation.getMethodName();
URL url = invoker.getUrl();
int max = invoker.getUrl().getMethodParameter(methodName, ACTIVES_KEY, 0);

if (t instanceof RpcException) {
RpcException rpcException = (RpcException) t;
// 限流异常不处理
if (rpcException.isLimitExceed()) {
return;
}
}
// 减小 active 的值,同时完成对一次调用的统计
RpcStatus.endCount(url, methodName, getElapsed(invocation), false);
// 调用 notifyFinish() 方法唤醒阻塞在对应 RpcStatus 对象上的线程(所有阻塞等待的线程)
notifyFinish(RpcStatus.getStatus(url, methodName), max);
}

private long getElapsed(Invocation invocation) {
Object beginTime = invocation.get(ACTIVELIMIT_FILTER_START_TIME);
return beginTime != null ? System.currentTimeMillis() - (Long) beginTime : 0;
}

/**
* 唤醒因限流导致阻塞等待的线程
*
* @param rpcStatus
* @param max
*/
private void notifyFinish(final RpcStatus rpcStatus, int max) {
if (max > 0) {
synchronized (rpcStatus) {
rpcStatus.notifyAll();
}
}
}
}

ActiveLimitFilter 同样依赖 RpcStatus 的 beginCount() 方法和 endCount() 方法来实现 RpcStatus.active 字段的增减来达到限流的目的。

RpcStatus

同样地,RpcStatus 是服务端和消费端实现限流的核心实现,下面对该对象进行说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
public class RpcStatus {

/**
* 服务状态信息
* key: URL
* value: RpcStatus
*/
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
/**
* 服务每个方法的状态信息
* key1: URL
* key2: 方法名
* value: RpcStatus
*/
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
/**
* 当前并发度
*/
private final AtomicInteger active = new AtomicInteger();
/**
* 调用的总数
*/
private final AtomicLong total = new AtomicLong();
/**
* 失败的调用数
*/
private final AtomicInteger failed = new AtomicInteger();
/**
* 所有调用的总耗时
*/
private final AtomicLong totalElapsed = new AtomicLong();
/**
* 所有失败调用的总耗时
*/
private final AtomicLong failedElapsed = new AtomicLong();
/**
* 所有调用中最长的耗时
*/
private final AtomicLong maxElapsed = new AtomicLong();
/**
* 所有失败调用中最长的耗时
*/
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
* 所有成功调用中最长的耗时
*/
private final AtomicLong succeededMaxElapsed = new AtomicLong();

private RpcStatus() {
}

/**
* @param url
* @return status
*/
public static RpcStatus getStatus(URL url) {
String uri = url.toIdentityString();
return SERVICE_STATISTICS.computeIfAbsent(uri, key -> new RpcStatus());
}

/**
* @param url
*/
public static void removeStatus(URL url) {
String uri = url.toIdentityString();
SERVICE_STATISTICS.remove(uri);
}

/**
* @param url
* @param methodName
* @return status
*/
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
return map.computeIfAbsent(methodName, k -> new RpcStatus());
}

/**
* @param url
*/
public static void removeStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map != null) {
map.remove(methodName);
}
}

public static void beginCount(URL url, String methodName) {
beginCount(url, methodName, Integer.MAX_VALUE);
}

/**
* 在远程调用开始之前执行,其中会获取 服务和服务方法 对应的 RpcStatus 对象,然后分别将它们的 active 字段+1
*
* @param url
* @param methodName
* @param max
* @return
*/
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
// 获取服务对应的 RpcStatus
RpcStatus appStatus = getStatus(url);
// 获取服务方法对应的 RpcStatus
RpcStatus methodStatus = getStatus(url, methodName);
// 是否需要限流
if (methodStatus.active.get() == Integer.MAX_VALUE) {
return false;
}
// 自旋 + CAS 更新服务方法的并发度
for (int i; ; ) {
i = methodStatus.active.get();
// 并发度超过 max 上限,直接返回 false
if (i + 1 > max) {
return false;
}
if (methodStatus.active.compareAndSet(i, i + 1)) {
break;
}
}
// 服务的并发度+1
appStatus.active.incrementAndGet();
return true;
}

/**
* 会对服务和服务方法两个维度的 RpcStatus 中的所有字段进行更新,完成统计.
*
* @param url
* @param elapsed
* @param succeeded
*/
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}

private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
// 降低并发度
status.active.decrementAndGet();
// 调用总次数增加
status.total.incrementAndGet();
// 调用总耗时增加
status.totalElapsed.addAndGet(elapsed);
// 更新最大耗时
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
// 如果此次调用成功,则会更新成功调用的最大耗时
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
// 如果此次调用失败,则会更新失败调用的最大耗时
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}

/**
* get active.
*
* @return active
*/
public int getActive() {
return active.get();
}

// 省略其它代码

}

小结

ExecuteLimitFilterActiveLimitFilter 分别作为服务端和消费端的限流实现,之所以前者是针对服务端的后者是针对消费端的,因为过滤器设置的针对的对象不同而已。我们可以发现 Dubbo 2.7 中两者实现逻辑几乎一致,Dubbo 2.6 中服务端的限流实现借助了信号量,消费端限流实现同样是原子类。