概述
FutureFilter
是 Dubbo 在消费端的过滤器,用于处理消费端的事件通知。在调用之前、调用之后、出现异常时,会触发 oninvoke
、onreturn
、onthrow
三个事件,可以配置当事件发生时,通知哪个类的哪个方法。
示例
服务接口
1 2 3
| interface IDemoService { String sayHello(String name); }
|
服务提供端
服务实现
1 2 3 4 5 6 7
| @Service public class DemoServiceImpl implements DemoService { @Override public String sayHello(String name) { return "nice to meet you - " + name; } }
|
服务配置
1
| <dubbo:service interface="com.code.dubbo.DemoService" ref="demoServiceImpl"/>
|
服务消费端
Callback 接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public interface Notify {
void oninvoke(String requestParam);
void onreturn(String result, String requestParam);
void onthrow(Throwable ex, String requestPram); }
|
Callback 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Service public class NotifyImpl implements Notify { @Override public void oninvoke(String requestParam) { System.out.println("oninvoke is running !"); }
@Override public void onreturn(String result, String requestParam) { System.out.println("onreturn is running !"); }
@Override public void onthrow(Throwable ex, String requestParam) { System.out.println("onthrow is running !"); } }
|
消费配置
1 2 3 4 5 6 7 8 9 10 11
|
<dubbo:reference id="demoService" check="false" interface="com.code.dubbo.DemoService"> <dubbo:method name="sayHello" oninvoke="notifyImpl.oninvoke" onreturn="notifyImpl.onreturn" onthrow="notifyImpl.onthrow"/> </dubbo:reference>
|
FutureFilter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Activate(group = Constants.CONSUMER) public class FutureFilter implements Filter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class); @Override public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
fireInvokeCallback(invoker, invocation);
Result result = invoker.invoke(invocation);
if (isAsync) { asyncCallback(invoker, invocation); } else { syncCallback(invoker, invocation, result); }
return result; } }
|
FutureFilter
用于触发消费端的事件通知,执行逻辑如下:
- 在方法调用前触发 oninvoke 指定的方法。
- 服务调用。
- 根据同步调用还是异步调用,走不同的逻辑分支。同步和异步调用的主要处理区别:
- 同步调用,事件触发是直接调用的,没有额外逻辑。
- 异步调用,需要从上下文中先获取到调用产生的 Future 对象,给该对象设置回调对象,回调对象在合适的时机触发事件通知。
触发 oninvoke
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| +--- FutureFilter private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) { final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); if (onInvokeMethod == null && onInvokeInst == null) { return; } if (onInvokeMethod == null || onInvokeInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); }
if (!onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); }
Object[] params = invocation.getArguments(); try { onInvokeMethod.invoke(onInvokeInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
|
oninvoke
指定的方法就是前置方法,该方法会在调用服务之前执行,且该方法必须具有和原调用方法相同的入参列表。
同步回调
1 2 3 4 5 6 7 8 9 10 11 12
| +--- FutureFilter private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) { if (result.hasException()) { fireThrowCallback(invoker, invocation, result.getException()); } else { fireReturnCallback(invoker, invocation, result.getValue()); } }
|
同步调用,事件触发是直接调用的。调用结果有异常则触发 onthrow
方法,没有异常则触发 onreturn
方法。
异步回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| +--- FutureFilter private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) { ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
@Override public void done(Object rpcResult) { if (rpcResult == null) { logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName())); return; } if (!(rpcResult instanceof Result)) { logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName())); return; }
Result result = (Result) rpcResult; if (result.hasException()) { fireThrowCallback(invoker, invocation, result.getException()); } else { fireReturnCallback(invoker, invocation, result.getValue()); } }
@Override public void caught(Throwable exception) { fireThrowCallback(invoker, invocation, exception); } }); } }
|
异步事件通知逻辑需要从上下文中先获取到调用产生的 Future
对象,然后给该对象设置回调对象,回调对象在合适的时机触发事件通知。信息交换层 中的结果通知和回调紧密关联,关于两者的交互可以参考 信息交换层
。
触发 onreturn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| +--- FutureFilter private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY)); final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
if (onReturnMethod == null && onReturnInst == null) { return; }
if (onReturnMethod == null || onReturnInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onReturnMethod.isAccessible()) { onReturnMethod.setAccessible(true); }
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = result; params[1] = args;
} else { params = new Object[args.length + 1]; params[0] = result; System.arraycopy(args, 0, params, 1, args.length); }
} else { params = new Object[]{result}; }
try { onReturnMethod.invoke(onReturnInst, params); } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); } }
|
onreturn
指定的方法是返回正常结果通知方法,该方法会在调用结果没有异常的情况下执行,且该方法必须至少要有一个入参来接收返回结果。
触发 onthrow
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| +--- FutureFilter private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY)); final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
if (onthrowMethod == null && onthrowInst == null) { return; } if (onthrowMethod == null || onthrowInst == null) { throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onthrowMethod.isAccessible()) { onthrowMethod.setAccessible(true); }
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) { try { Object[] args = invocation.getArguments(); Object[] params;
if (rParaTypes.length > 1) { if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) { params = new Object[2]; params[0] = exception; params[1] = args;
} else { params = new Object[args.length + 1]; params[0] = exception; System.arraycopy(args, 0, params, 1, args.length); }
} else { params = new Object[]{exception}; }
onthrowMethod.invoke(onthrowInst, params); } catch (Throwable e) { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e); } } else { logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception); } } }
|
onthrow
指定的方法是返回异常结果通知方法,该方法会在调用结果有异常的情况下执行,且该方法必须至少有一个入参且第一个入参类型为Throwable或其子类接收返回的异常结果。
小结
FutureFilter
用于消费端事件通知,以过滤器的形式统一处理,支持在服务调用之前、调用后以及调用异常的逻辑处理,和切面带来的结果类似。