Dubbo源码分析 - 泛化调用

概述

Dubbo 中的泛化调用主要用于客户端没有 API接口及模型类元 的情况。即客户端只需根据服务端提供的API文档,无需引入相关接口依赖,直接通过 GenericSerive 接口来发起服务调用,参数及返回值中的所有 POJO 均使用 Map 表示。泛化调用对于服务提供端是无需关注的,按正常服务暴露即可。

应用场景

服务测试框架

服务测试框架是作为各个 RPC 服务的调用端,一般情况下,调用端是需要依赖服务提供方接口的。而作为一个通用的测试框架不应该依赖所有服务提供方的接口,不能因为每发布一个新的服务就升级这个测试框架。这时就需要让调用方在没有服务提供方接口的情况下,仍然可以正常地发起 RPC 调用。

服务网关

各个业务方可以使用 HTTP 的方式,通过服务网关调用其它服务。这种场景和通用的服务测试框架有同样的问题,不应该也不能依赖所有服务提供方的接口,也需要调用方在没有服务提供方接口的情况下,仍然可以正常地发起 RPC 调用。

代理必要性

在 Dubbo 调用的过程中,调用端是通过动态代理向服务端发起远程调用,一般调用端是通过服务端接口自动生成代理对象的。但 RPC 调用的本质是调用端向服务端发送一条请求消息,服务端接收并处理,然后向调用端发送一条响应消息,调用端收到并处理响应消息,一次 RPC 调用就完成了。不难看出,是否使用代理,是否使用服务接口代理不是最重要的,重要的是调用端在没有服务接口的情况下仍然能够向服务端发送正确的请求消息。对于调用端来说,服务接口的作用仅用于创建代理对象,基于面向接口开发

关键点在于,只要调用端将调用环境信息,如接口名,方法名,参数信息等发送给服务端,服务端就能解析并处理这条请求消息。但 Dubbo 的调用端向服务端发送消息是通过代理对象来完成的,基于这种情况,Dubbo 定义了一个统一的接口 GenericSerive ,调用端在使用该接口的代理时指定相关调用信息即可。GenericSerive 接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public interface GenericService {

/**
* 泛化调用
*
* @param method 方法名
* @param parameterTypes 参数类型数组
* @param args 参数数组
* @return invocation 调用结果
* @throws GenericException potential exception thrown from the invocation
*/
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;

/**
* 泛化调用异步支持 (2.7 新增)
*
* @param method
* @param parameterTypes
* @param args
* @return
* @throws GenericException
*/
default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
Object object = $invoke(method, parameterTypes, args);
if (object instanceof CompletableFuture) {
return (CompletableFuture<Object>) object;
}
return CompletableFuture.completedFuture(object);
}
}

使用示例

服务端

服务端无需关注泛化调用,不做任何处理,正常服务暴露即可。

  • 接口定义
1
2
3
public interface NotGenericService {
String getRemark(GenericRequest request);
}
  • 服务暴露
    1
    <dubbo:service interface="com.code.dubbo.NotGenericService" ref="notGenericServiceImpl"/>

消费端

  • 服务引入
    1
    <dubbo:reference id="notGenericService" interface="com.code.dubbo.NotGenericService" generic="true"/>

说明:

  1. 消费端无需引入 interface 配置项的依赖,因为设置了 generic 属性,此时 Dubbo 只会将该配置项值看作是一个字符串,不会检查该接口是否存在。
  2. interface 配置项的值是要引用的服务,通过该配置,可以从注册中心获取到所有该服务的提供方的地址。
  3. generic 配置项支持设置 true、nativejava 以及 bean,在 Dubbo 2.7 中还支持了 protobuf-json 和 raw.return ,用于对参数进行序列化和反序列化。可以看出,该配置项不仅用于标识是否泛化,还表示泛化情况下的序列化方式
  • 服务调用
    1
    2
    3
    4
    5
    6
    7
    GenericService genericService = (GenericService) applicationContext.getBean("notGenericService");
    // 通过 GenericSerive 发起服务调用,参数及返回值中的所有 `POJO` 均使用 `Map` 表示
    Map<String, Object> genericParamMap = new HashMap<>(2);
    genericParamMap.put("class","com.code.dubbo.GenericRequest");
    genericParamMap.put("remark", "this is remark!");
    // 泛化值类型 Map
    Object getRemark = genericService.$invoke("getRemark", new String[]{"com.code.dubbo.GenericRequest"}, new Object[]{genericParamMap});

泛化接口类型

调用方在进行泛化调用的时候之所以可以直接使用 GenericService 转换,是在服务引用的时候会判断是否是泛化引用,如果是泛化引用则将接口类型设置为 GenericService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+--- ReferenceConfig#init
// 是否是泛化引用,就直接设置当前接口为 GenericService
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;

// 普通接口的实现
} else {
try {
// 根据接口名,获得对应的接口类
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}

// 校验接口和方法
checkInterfaceAndMethods(interfaceClass, methods);
}

说明: 泛化引用虽然会将服务接口类型设置为 GenericService ,但是并不影响服务发现。

GenericImplFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {

private static final Logger logger = LoggerFactory.getLogger(GenericImplFilter.class);

/**
* 泛化参数类型
*/
private static final Class<?>[] GENERIC_PARAMETER_TYPES = new Class<?>[]{String.class, String[].class, Object[].class};

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
// 1. 获得 generic 配置项
String generic = invoker.getUrl().getParameter(Constants.GENERIC_KEY);

// 泛化实现的调用 - 客户端调用的服务是 GenericService
if (ProtocolUtils.isGeneric(generic) // 判断是否开启了泛化引用
&& !Constants.$INVOKE.equals(invocation.getMethodName()) // 方法名非 $invoke
// 调用信息是 RpcInvocation 类型
&& invocation instanceof RpcInvocation) {

// 省略客户端调用泛化实现的逻辑
}

// 泛化引用的调用 - GenericService 调用服务接口
if (invocation.getMethodName().equals(Constants.$INVOKE) // 调用方法是 $invoke
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3 // 方法参数是 3 个
// 判断是否开启了泛化引用
&& ProtocolUtils.isGeneric(generic)) {

// 2 方法参数列表
Object[] args = (Object[]) invocation.getArguments()[2];

// 3. 根据 generic 的值校验参数值
// 3.1 genecric = nativejava的情况,校验方法参数是否都为 byte[]
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}

// 3.2 generic = bean 的情况,校验方法参数 为 JavaBeanDescriptor
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}

// 4 通过隐式参数,传递 generic 配置项
((RpcInvocation) invocation).setAttachment(Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}

// 5 普通调用
return invoker.invoke(invocation);
}
}

GenericImplFilter 是服务消费端的过滤器,主要是对 泛化引用泛化实现 在调用端的处理。

泛化调用

调用端进行泛化调用的时候,会对参数进行校验,防止服务端接收到参数时反序列化失败;还会将 generic 配置项通过隐式参数的形式传递到服务端。

泛化实现

调用端对泛化实现的服务进行调用时,会对参数进行序列化,并重新设置 RpcInvocation 的方法名、参数类型以及参数值。

GenericFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 1 是否是泛化调用: 方法名:$invoke & 参数个数:3 & 调用接口非 GenericService
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null && inv.getArguments().length == 3
&& !invoker.getInterface().equals(GenericService.class)) {
// 调用的服务方法名
String name = ((String) inv.getArguments()[0]).trim();
// 调用的服务方法参数类型
String[] types = (String[]) inv.getArguments()[1];
// 嗲用的服务方法参数列表
Object[] args = (Object[]) inv.getArguments()[2];

try {
// 2. 反射获得提供方的方法对象,注意这里的 invoker 是服务端的,因此 interface 是服务接口,而非GenericService
Method method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types);
// 2.1 获取服务提供方的目标方法的参数类型
Class<?>[] params = method.getParameterTypes();
if (args == null) {
args = new Object[params.length];
}

// 3 获得 generic 配置项
String generic = inv.getAttachment(Constants.GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(Constants.GENERIC_KEY);
}

// 4 根据 generic 的配置项反序列化参数值
// 4.1 如果没有设置 generic 或者 generic = true,反序列化参数,Map->Pojo (在 java 中,pojo通常用map来表示)
if (StringUtils.isEmpty(generic) || ProtocolUtils.isDefaultGenericSerialization(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());

// 4.2 generic = nativejava, 反序列化参数, byte[]-> Pojo
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try {
UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i]);
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e);
}
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_NATIVE_JAVA +
"] only support message type " +
byte[].class +
" and your message type is " +
args[i].getClass());
}
}

// 4.3 generic = bean ,反序列化参数,JavaBeanDescriptor -> Pojo
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw new RpcException(
"Generic serialization [" +
Constants.GENERIC_SERIALIZATION_BEAN +
"] only support message type " +
JavaBeanDescriptor.class.getName() +
" and your message type is " +
args[i].getClass().getName());
}
}
}

// 5 方法参数转换完毕,进行方法调用。
// 注意此时创建了一个新的 RpcInvocation 对象。$invoke 泛化调用被转为具体的普通调用
Result result = invoker.invoke(new RpcInvocation(method, args, inv.getAttachments()));

// 如果调用结果有异常,并且非GenericException异常,则使用 GenericException 包装
if (result.hasException() && !(result.getException() instanceof GenericException)) {
return new RpcResult(new GenericException(result.getException()));
}

// generic=nativejava的情况下,序列化结果, 结果 -> btyp[]
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
try {
UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(512);
ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(Constants.GENERIC_SERIALIZATION_NATIVE_JAVA)
.serialize(null, os).writeObject(result.getValue());
return new RpcResult(os.toByteArray());
} catch (IOException e) {
throw new RpcException("Serialize result failed.", e);
}

// generic=bean 的情况下,序列化结果, 结果 -> JavaBeanDescriptor
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
return new RpcResult(JavaBeanSerializeUtil.serialize(result.getValue(), JavaBeanAccessor.METHOD));

// generic=true 的情况下,序列化结果,Pojo -> Map
} else {
return new RpcResult(PojoUtils.generalize(result.getValue()));
}
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw new RpcException(e.getMessage(), e);
}
}

// 普通调用(包括调用泛化实现)
return invoker.invoke(inv);
}
}

GenericFilter 是服务端的过滤器,用于处理泛化调用逻辑,不会处理对泛化实现的调用,会将其当作普通调用处理。该过滤主要逻辑如下:

  1. 从调用信息 Invocation 中获取服务方法名、方法参数类型、方法参数列表。
  2. 反射获取提供方的方法对象,注意这里的 invoker 是服务端的,因此 interface 是服务接口,而非GenericService 。
  3. 从隐式参数中获取 generic 配置项,该配置项决定了参数反序列化的方式。
  4. 构建新的 RpcInvocation 对象进行方法调用,本质上是将 $invoke 泛化调用被转为具体的普通调用
  5. 对调用结果进行处理,其中会对正常结果进行序列化处理,然后响应给调用端。若是异常结果,且非 GenericException 异常,则使用 GenericException 包装后返回,防止异常在服务消费端不存在导致反序列化失败。

泛化调用流程图

小结

Dubbo 泛化调用实际是在 Filter 过滤链上执行的序列化和反序列化操作,服务端通过调用端传递的调用信息反射获取对应的服务方法,进而进行服务调用。