Dubbo过滤器 - FutureFilter

概述

FutureFilter 是 Dubbo 在消费端的过滤器,用于处理消费端的事件通知。在调用之前、调用之后、出现异常时,会触发 oninvokeonreturnonthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法。

示例

服务接口

1
2
3
interface IDemoService {
String sayHello(String name);
}

服务提供端

服务实现

1
2
3
4
5
6
7
@Service
public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
return "nice to meet you - " + name;
}
}

服务配置

1
<dubbo:service interface="com.code.dubbo.DemoService" ref="demoServiceImpl"/>

服务消费端

Callback 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public interface Notify {

/**
* 前置方法,必须具有与原调用方法相同的入参列表
*
* @param requestParam 请求参数
*/
void oninvoke(String requestParam);

/**
* 正常回调方法,至少要有一个入参来接收返回结果
*
* @param result 用于接收原调用方法的结果
* @param requestParam 用于接收原调用方法的请求参数
*/
void onreturn(String result, String requestParam);

/**
* 异常回调方法,至少要有一个入参且第一个入参类型为Throwable或其子类接收返回结果
*
* @param ex 用于接收调用出现的异常
* @param requestPram 用于接收原调用方法的请求参数
*/
void onthrow(Throwable ex, String requestPram);
}

Callback 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Service
public class NotifyImpl implements Notify {
@Override
public void oninvoke(String requestParam) {
System.out.println("oninvoke is running !");
}

@Override
public void onreturn(String result, String requestParam) {
System.out.println("onreturn is running !");
}

@Override
public void onthrow(Throwable ex, String requestParam) {
System.out.println("onthrow is running !");
}
}

消费配置

1
2
3
4
5
6
7
8
9
10
11
<!-- 
1 oninvoke 在方法调用前触发(如果调用出现异常则会直接触发onthrow方法)
2 onreturn 在方法返回会触发(如果调用出现异常则会直接触发onthrow方法)
3 onthrow 调用出现异常时候触发
-->
<dubbo:reference id="demoService" check="false" interface="com.code.dubbo.DemoService">
<dubbo:method name="sayHello"
oninvoke="notifyImpl.oninvoke"
onreturn="notifyImpl.onreturn"
onthrow="notifyImpl.onthrow"/>
</dubbo:reference>

FutureFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {

protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
// 1 是否异步调用
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

// 2 触发前置方法,即 执行 Callback.oninvoke 方法
fireInvokeCallback(invoker, invocation);

// 3 调用服务提供者
Result result = invoker.invoke(invocation);

// 4 异步回调 onreturn/onthrow
if (isAsync) {
asyncCallback(invoker, invocation);
// 5 同步回调用 onreturn/onth
} else {
syncCallback(invoker, invocation, result);
}

// 返回结果,如果是异步调用或单向调用,结果是空的
return result;
}
}

FutureFilter 用于触发消费端的事件通知,执行逻辑如下:

  1. 在方法调用前触发 oninvoke 指定的方法。
  2. 服务调用。
  3. 根据同步调用还是异步调用,走不同的逻辑分支。同步和异步调用的主要处理区别:
    • 同步调用,事件触发是直接调用的,没有额外逻辑。
    • 异步调用,需要从上下文中先获取到调用产生的 Future 对象,给该对象设置回调对象,回调对象在合适的时机触发事件通知。

触发 oninvoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
+--- FutureFilter
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
// 获得前置方法和方法所在对象
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
// 没有设置直接返回
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}

// 设置访问权限
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}

// 获得原调用方法的入参
Object[] params = invocation.getArguments();
try {
// 反射调用前置方法,可以发现 oninvoke 的方法参数要与调用的方法参数一致
onInvokeMethod.invoke(onInvokeInst, params);
} catch (InvocationTargetException e) {
// 触发异常回调
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
// 触发异常回调
fireThrowCallback(invoker, invocation, e);
}
}

oninvoke 指定的方法就是前置方法,该方法会在调用服务之前执行,且该方法必须具有和原调用方法相同的入参列表。

同步回调

1
2
3
4
5
6
7
8
9
10
11
12
+--- FutureFilter
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
// 有异常,触发异常回调
if (result.hasException()) {
// 注意:如果是consumer自己throw的异常,不会走到这里,而是直接执行 onthrow 方法
fireThrowCallback(invoker, invocation, result.getException());
// 正常,触发正常回调
} else {
// 执行 onreturn 方法
fireReturnCallback(invoker, invocation, result.getValue());
}
}

同步调用,事件触发是直接调用的。调用结果有异常则触发 onthrow 方法,没有异常则触发 onreturn 方法。

异步回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
+--- FutureFilter
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {

// 获得 Future 对象 。由于异步不知道服务提供方什么时候会执行完毕,所以要添加回调等待服务提供者返回结果。
Future<?> f = RpcContext.getContext().getFuture();

// 设置回调 ResponseCallback 到 DefaultFuture中
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();

// 当provider返回响应时,执行 DefaultFuture.doReceived 方法,该方法会调用ResponseCallback对象的done或者caught方法
future.setCallback(new ResponseCallback() {
/**
* 正常回调
* @param rpcResult
*/
@Override
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
///must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}

// 根据调用结果,调用 ResponseCallback 对象的 done 或者 caught 方法
Result result = (Result) rpcResult;
if (result.hasException()) {
fireThrowCallback(invoker, invocation, result.getException());
} else {
fireReturnCallback(invoker, invocation, result.getValue());
}
}

/**
* 触发异常回调方法
* @param exception
*/
@Override
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}

异步事件通知逻辑需要从上下文中先获取到调用产生的 Future 对象,然后给该对象设置回调对象,回调对象在合适的时机触发事件通知。信息交换层 中的结果通知和回调紧密关联,关于两者的交互可以参考 信息交换层

触发 onreturn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
+--- FutureFilter
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {

// 获得 onreturn 方法和对象
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

//not set onreturn callback
if (onReturnMethod == null && onReturnInst == null) {
return;
}

if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}

// 原调用方法的入参
Object[] args = invocation.getArguments();

Object[] params;

// onreturn 方法的参数列表
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();

// onreturn 方法的参数多于1个
if (rParaTypes.length > 1) {

// onreturn(xx, Object[]) 两个参数:第一个参数与真实方法返回结果类型相同【用来接收返回结果】,第二个接收所有的真实请求参数
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = result;
params[1] = args;

// onreturn(xx, Object... args) 多个参数:第一个参数与真实方法的返回结果类型相同,后边几个接收所有的真实请求参数
} else {
params = new Object[args.length + 1];
params[0] = result;
System.arraycopy(args, 0, params, 1, args.length);
}

// onreturn(xx) 只有一个参数:接收返回执行结果
} else {
params = new Object[]{result};
}

// 调用方法
try {
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}

onreturn 指定的方法是返回正常结果通知方法,该方法会在调用结果没有异常的情况下执行,且该方法必须至少要有一个入参来接收返回结果。

触发 onthrow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
+--- FutureFilter
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {

// 获得 onthrow 方法和对象
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (!onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}

// 获取 onthrow 方法的参数列表
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();

// onthrow 方法的参数第一个值必须为异常类型,所以这里需要构造参数列表
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
try {
// 原调用方法的参数列表
Object[] args = invocation.getArguments();
Object[] params;

// onthrow 方法的参数个数 > 1
if (rParaTypes.length > 1) {
// 原调用方法只有一个参数,而且这个参数是数组类型
// onthrow(xx, Object[]) 两个参数:第一个参数接收 exception,第二个接收所有的真实请求参数
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;

// 原调用方法的参数多于一个
// onthrow(xx, Object... args) 多个参数:第一个参数接收exception,后边几个接收所有的真实请求参数
} else {
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}

// 原调用方法没有参数
// onthrow(xx) 只有一个参数:接收exception
} else {
params = new Object[]{exception};
}

// 调用方法
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
}

onthrow 指定的方法是返回异常结果通知方法,该方法会在调用结果有异常的情况下执行,且该方法必须至少有一个入参且第一个入参类型为Throwable或其子类接收返回的异常结果。

小结

FutureFilter 用于消费端事件通知,以过滤器的形式统一处理,支持在服务调用之前、调用后以及调用异常的逻辑处理,和切面带来的结果类似。