Dubbo源码分析 - 服务引用

概述

一个 RPC 框架暴露给用户最基本的功能就是服务发布服务引用,上一篇文章中已经详细介绍了 Dubbo 服务暴露核心流程,本篇文章将对服务引用进行分析。Dubbo 支持两种方式引用服务:

  • 服务直连的方式,仅适合在调试服务的时候使用;
  • 基于注册中心引用服务,这是生产环境中使用的服务引用方式。

ReferenceConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class ReferenceConfig<T> extends AbstractReferenceConfig {

private static final long serialVersionUID = -5864351140409987595L;

/**
* 自适应 Protocol 拓展实现
*/
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

/**
* 自适应 Cluster 拓展实现
*/
private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();

/**
* 自适应 ProxyFactory 拓展实现
*/
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
/**
* 服务引用URL数组
*/
private final List<URL> urls = new ArrayList<URL>();
/**
* 服务接口名
* todo 用于组装 URL
*/
private String interfaceName;
/**
* 服务接口
* todo 用于创建代理对象
*/
private Class<?> interfaceClass;


// client type
private String client;

/**
* 直连服务提供者地址
* 1 可以是注册中心,也可以是服务提供者
* 2 可以配置多个,使用 ";" 分割
*/
private String url;

// method configs
private List<MethodConfig> methods;

// default config
private ConsumerConfig consumer;
/**
* 协议名
*/
private String protocol;

/**
* 基于服务接口的代理对象
*/
private transient volatile T ref;


private transient volatile Invoker<?> invoker;
private transient volatile boolean initialized;
private transient volatile boolean destroyed;
}

ReferenceConfig 作为服务引用的核心配置承载类,用于保存服务引用相关配置信息。

服务引用入口

1
2
3
4
5
6
7
8
9
10
11
12
13
+--- ReferenceConfig
public synchronized T get() {
// 已销毁,不可获得
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 若未初始化,调用init()方法进行初始化
if (ref == null) {
init();
}
// 返回引用服务
return ref;
}

在服务消费方中,接口并不包含具体的实现逻辑,具体实现都放在服务提供方,但是当我们在调用接口的时候,却能做到与调用本地方法没有区别,原因在于调用方提供了一个代理类,在运行时与该接口绑定,当接口中的方法被调用时,实际是作用于该代理类上,代理类封装了远程调用的逻辑,把请求参数发送给远程服务提供方,获取结果后再返回

服务引用的核心是创建服务接口的代理对象,创建代理对象之前会从配置中获取需要的参数,这个和服务暴露是一致的,下面我们对创建代理对象之前的准备工作进行简单说明。

服务引用准备

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
+--- ReferenceConfig
private void init() {
// 已经初始化过,直接返回
if (initialized) {
return;
}
initialized = true;
// 校验接口名非空
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:reference interface=\"\" /> interface not allow null!");
}
// 拼接属性配置(环境变量 + .properties 中的属性)到 ConsumerConfig对象
checkDefault();
// 拼接属性配置(环境变量 + .properties 中的属性)到ReferenceConfig(自己)
appendProperties(this);


/*----------- 1 泛化调用处理 --------------*/

// 若未设置 generic 属性,就使用ConsumerConfig的generic属性
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
}

// 是否是泛化引用,如果是就直接设置当前接口为 GenericService
if (ProtocolUtils.isGeneric(getGeneric())) {
interfaceClass = GenericService.class;

// 普通接口的实现
} else {
try {
// 根据接口名,获得对应的接口类
// todo 注意,interfaceClass 和 interfaceName 的关系
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}

// 校验接口和方法
checkInterfaceAndMethods(interfaceClass, methods);
}


/*----------- 2 服务直连处理 --------------*/

// todo 直连提供者,第一优先级,通过 -D 参数(系统变量)指定 ,例如 java -Dcom.alibaba.xxx.XxxService=dubbo://localhost:20890
// 根据服务名获取对应的 提供者地址 dubbo://localhost:20890
String resolve = System.getProperty(interfaceName);
String resolveFile = null;

// todo 直连提供者,第二优先级,通过文件映射,例如 com.alibaba.xxx.XxxService=dubbo://localhost:20890
if (resolve == null || resolve.length() == 0) {
// 从系统属性中获取解析文件路径
resolveFile = System.getProperty("dubbo.resolve.file");
if (resolveFile == null || resolveFile.length() == 0) {
// 默认先加载 ${user.home}/dubbo-resolve.properties 文件,无需配置,自动加载
File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
if (userResolveFile.exists()) {
// 获取文件绝对路径
resolveFile = userResolveFile.getAbsolutePath();
}
}
// 存在resolveFile,则进行文件读取加载
if (resolveFile != null && resolveFile.length() > 0) {
Properties properties = new Properties();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(resolveFile));
// 从文件中加载配置
properties.load(fis);
} catch (IOException e) {
throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
} finally {
try {
if (null != fis) {
fis.close();
}
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}

// 根据服务名获取对应的直连 提供者地址 dubbo://localhost:20890
resolve = properties.getProperty(interfaceName);
}
}

// todo 直连提供者,第三优先级,通过配置,如 <dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService" url="dubbo://localhost:20880"/>
// 不通过系统属性指定,就使用配置的直连(在配置的前提下),如:<dubbo:reference id="xxxService" interface="com.alibaba.xxx.XxxService" url="dubbo://localhost:20890" />

// 设置直连提供者的 url
if (resolve != null && resolve.length() > 0) {
url = resolve;
if (logger.isWarnEnabled()) {
if (resolveFile != null) {
logger.warn("Using default dubbo resolve file " + resolveFile + " replace " + interfaceName + "" + resolve + " to p2p invoke remote service.");
} else {
logger.warn("Using -D" + interfaceName + "=" + resolve + " to p2p invoke remote service.");
}
}
}


/*----------- 3 引用配置信息收集 --------------*/
// 尝试从ConsumerConfig 对象中,读取 application,module,registries,monitor 配置对象
if (consumer != null) {
if (application == null) {
application = consumer.getApplication();
}
if (module == null) {
module = consumer.getModule();
}
if (registries == null) {
registries = consumer.getRegistries();
}
if (monitor == null) {
monitor = consumer.getMonitor();
}
}
// 从ModuleConfig 对象中,读取registries,monitor配置对象
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
// 从ApplicationConfig对象中,读取registries,monitor配置对象
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}

// 校验ApplicationConfig配置
checkApplication();
// 校验 Stub和 Mock 相关的配置
checkStubAndMock(interfaceClass);

// 创建参数集合map,用于下面创建Dubbo URL
Map<String, String> map = new HashMap<String, String>();
// 符合条件的方法对象的属性,主要用来Dubbo事件通知
Map<Object, Object> attributes = new HashMap<Object, Object>();

// 将 side,dubbo,timestamp,pid参数,添加到map集合中
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}

// todo 非泛化服务,设置revision,methods,interface加入到map集合中
if (!isGeneric()) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 获取接口方法列表,并添加到map中
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}

// 设置 interface 的值为 interfaceName
map.put(Constants.INTERFACE_KEY, interfaceName);

// 将各种配置对象中的属性,添加到 map 集合中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);

// 获得服务键,作为前缀 格式:group/interface:version
String prefix = StringUtils.getServiceKey(map);

// 将MethodConfig 对象数组中每个MethodConfig中的属性添加到map中
if (methods != null && !methods.isEmpty()) {
// 遍历 MethodConfig 列表
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
// 当配置了 MethodConfig.retry=false 时,强制禁用重试
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
// 添加重试次数配置 methodName.retries
map.put(method.getName() + ".retries", "0");
}
}
// 将带有@Parameter(attribute=true)配置对象的属性,添加到参数集合中
appendAttributes(attributes, method, prefix + "." + method.getName());
// 检查属性集合中的事件通知方法是否正确,若正确,进行转换
checkAndConvertImplicitConfig(method, map, attributes);
}
}

// 以系统环境变量(DUBBO_IP_TO_REGISTRY)的值作为服务消费者ip地址,没有设置再取主机地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry == null || hostToRegistry.length() == 0) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

// 把attributes集合添加到StaticContext进行缓存,为了以后的事件通知
StaticContext.getSystemContext().putAll(attributes);



/*----------- 4 根据收集的服务引用配置信息创建接口的代理对象 --------------*/
// 创建Service 代理对象
ref = createProxy(map);

// 根据服务名,ReferenceConfig,代理类构建ConsumerModel,并将ConsumerModel存入到ApplicationModel
ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

服务引用准备工作就一个工作,收集服务引用的配置,特别处理了泛化调用服务直连的情况。有了引用配置信息后,下面我们就看服务引用的逻辑。

服务引用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
+--- ReferenceConfig
private T createProxy(Map<String, String> map) {

/*---------------- 1. 是否本地引用判断 ------------------*/

// 创建URL对象,该对象仅用来 判断是否本地引用。
// protocol = temp的原因是,已经使用InjvmProtocol#getInjvmProtocol方法获取到了具体协议为InjvmProtocol,不需要再通过protocol属性获取具体协议
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 是否本地引用
final boolean isJvmRefer;

// isInjvm()方法返回非空,说明配置了 injvm 配置项,那么就使用本地引用。
if (isInjvm() == null) {
// todo 如果配置了url配置项【直连服务提供者地址】,说明使用直连服务提供者的功能,而不使用本地引用
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;

// 调用InjvmProtocol#isInjvmRefer(url)方法, 通过 tempUrl 判断,是否需要本地引用。 即根据url的协议,scope以及injvm等参数检测是否需要本地引用
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
isJvmRefer = true;

// 默认不是
} else {
isJvmRefer = false;
}
// 通过injvm属性值判断
} else {
isJvmRefer = isInjvm().booleanValue();
}


/*---------------- 2. 执行本地引用。注意,本地引用不支持直连 ------------------*/

// 本地引用
// todo 注意:本地引用服务时,不是使用服务提供者的URL,而是服务消费者的URL
if (isJvmRefer) {

// 创建服务引用 URL 对象,协议为 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);

// 使用 InjvmProtocol 引用服务,返回的是 InjvmInvoker 对象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}

/*---------------- 3. 执行远程引用 ------------------*/
// 远程引用
} else {

/*---------------- 3.1 处理直连的方式 ------------------*/
// url不为空,表示使用直连方式,可以是服务提供者的地址,也可以是注册中心的地址
if (url != null && url.length() > 0) {
// 拆分地址成数组,使用 ";" 分割
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);

// 可能是多个直连地址
if (us != null && us.length > 0) {
for (String u : us) {
// 创建URL对象
URL url = URL.valueOf(u);
// 路径属性 url.path未设置时,就设置默认路径,缺省使用接口全名 interfaceName
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}

// 如果url.protocol = registry时,即是注册中心的地址,在参数url.parameters.refer上带上服务引用的配置参数map集合
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));

// 服务提供者的地址
} else {
// 合并url
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}


/*---------------- 3.2 处理非直连的方式 ------------------*/
// 没有定义直连,就从注册中心中获取服务提供者
} else {

// todo 加载注册中心 URL 数组,支持多注册中心。注意,注册中心的 URL 的协议已经被替换成了 Registry ,而不是真正的协议,如 zookeeper
List<URL> us = loadRegistries(false);

// 循环注册中心数组,添加到 urls 中
if (us != null && !us.isEmpty()) {
for (URL u : us) {

// 加载监控中心 URL
URL monitorUrl = loadMonitor(u);

// 服务引用配置对象 map,带上监控中心的 URL
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}

// 注册中心的地址,带上服务引用的配置参数集合map,作为 refer 参数添加到注册中心的URL中,并且需要编码。通过这样的方式,注册中心的URL中,包含了服务引用的配置。
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}

// 既不是服务直连,也没有配置注册中心,抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}

/*---------------- 3.3 确定服务引用地址后,执行服务引用------------------*/


/*---------------- 3.3.1 单个服务引用地址,直接引用服务即可------------------*/
if (urls.size() == 1) {
// 如果是直连 Provider 的场景,则 URL 可能是如 dubbo 协议;如果依赖注册中心,则使用 RegistryProtocol
invoker = refprotocol.refer(interfaceClass, urls.get(0));

/*---------------- 3.3.2 多个服务引用地址(可能是多个服务提供者地址、可能是多个注册中心,也可能是混合的情况),需要将引入的服务通过 Cluster 进行包装------------------*/
} else {

// 循环 urls,引用服务,返回Invoker 对象。此时会有多个Invoker对象,需要进行合并。
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;

// 上面的逻辑已经处理了,可以区分是注册中心还是服务提供者,urls列表中的URL如果是注册中心就会使用refer参数拼接消费者URL信息
for (URL url : urls) {

// 引用服务,返回 Invoker 对象并把服务加入到Invoker集合中
invokers.add(refprotocol.refer(interfaceClass, url));

// 确定是 注册中心,还是直连Provider
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}

// 有注册中心就是多注册中心的情况,则将引用的 Invoker 合并
if (registryURL != null) {

// todo 对多注册中心的只用 AvailableCluster进行Invoker的合并,这里是AvailableCluster,即服务调用时仅调用第一个可用的Invoker 【todo 集群容错】
// todo 因为基于每个注册中心的服务已经封装成了一个 ClusterInvoker
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 创建StaticDirectory实例,并由Cluster对多个Invoker进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));


// 无注册中心,也就是直连多个服务,可以是不同协议的服务。使用 cluster 合并即可
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}


// 是否启动时检查
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
// 若配置check = true 配置项时,调用Invker#isAvailable()方法,启动时检查,即就是判断当前创建的invoker是否有对应的Exporter
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}

/*---------------- 4. 根据服务引用的 Invoker 创建代理对象 ------------------*/
/**
* 1 创建Service 代理对象 【该代理对象的内部,会调用 Invoker#invoke(Invocation) 方法,进行 Dubbo 服务的调用】
* 2 getProxy内部最终得到是一个被StuProxyFactoryWrapper包装后的JavassistProxyFactory
* 3 把invoker转换成接口代理,代理都会创建InvokerInvocationHandler,InvokerInvocationHandler实现了JDK的InvocationHandler接口,所以服务暴露的Dubbo接口都会委托给这个代理去发起远程调用(injvm协议除外)
*
*/
return (T) proxyFactory.getProxy(invoker);
}

服务引用主要分为两大逻辑,一个是执行本地引用,另一个是执行远程引用。注意,本地引用是不支持直连的方式,执行远程引用还要特别处理多个引用地址的情况,可能是多个服务地址,也可能是多个注册中心集群地址,这种情况下要合并服务。下面我们对以上的服务引用逻辑进行详细说明。

本地引用

1
2
3
4
5
6
7
8
9
10
11
12
+--- ReferenceConfig
if (isJvmRefer) {

// 创建服务引用 URL 对象,协议为 injvm
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);

// 使用 InjvmProtocol 引用服务,返回的是 InjvmInvoker 对象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}

在服务引用时会先判断是否本地引用,如果本地引用,则会通过 InjvmProtocol 执行本地引用。

InjvmProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 引用本地服务,Invoker执行的时候会从父类中的 {@link #exporterMap} 属性中拿,根据key,即 url.getServiceKey
*
* @param serviceType
* @param url URL address for the remote service
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 创建 InjvmInvoker 对象,注意:
// 1 传入的exporterMap参数,包含所有的 InjvmExporter 对象,它是父类 AbstractProtocol 中的属性
// 2 缓存的服务键格式: group/interface:version,和远程服务的服务键不同,没有 port
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}

InjvmInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class InjvmInvoker<T> extends AbstractInvoker<T> {

/**
* 服务键
*/
private final String key;
/**
* Exporter集合,在InjvmInvoker#invoke(invocation)方法中,可以通过该Invoker的key属性,获得对应的Exporter对象
* key: 服务键
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}。
*/
private final Map<String, Exporter<?>> exporterMap;

InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
}


/**
* 是否可用。开启启动时检查时,调用该方法,判断该Invoker对象是否有对应的Exporter,若不存在,说明以来服务不存在,检查不通过
*
* @return
*/
@Override
public boolean isAvailable() {
// 判断是否有Exporter对象
InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
if (exporter == null) {
return false;
} else {
return super.isAvailable();
}
}

/**
* 调用,本质上就是利用公用的 Map
*
* @param invocation
* @return
* @throws Throwable
*/
@Override
public Result doInvoke(Invocation invocation) throws Throwable {
/**
* 1 根据服务键从 {@link InjvmInvoker#exporterMap}缓存中 获取 Exporter
*/
Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}

// 2 设置服务提供者地址为本地
RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0);


// 3 从Exporter中拿到Invoker,然后调用invoke方法,
return exporter.getInvoker().invoke(invocation);
}
}

远程引用

远程引用有两种情况,一种是直连服务提供者,另一种是依赖注册中心。这两种方式的处理截然不同,下面我们分别分析。

直连服务提供者

针对直连提供者的情况,由于不会使用到注册中心,因此在服务引用的时候使用的 Protocol 的实现非 RegistryProtcol ,而是服务具体的协议。

依赖注册中心

依赖注册中心的情况,服务引用的时候使用的 Protocol 的实现是 RegistryProtocol ,它是连接服务提供者、服务消费者以及注册中心的强梁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* 多个Invoker也会被封装成一个
*
* @param type Service class
* @param url URL address for the remote service
* @param <T>
* @return
* @throws RpcException
*/
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {

// 1 获得真实的注册中心的URL
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);

// 2 根据注册中心地址获得注册中心
Registry registry = registryFactory.getRegistry(url);

// todo 这是干嘛的?为什么要给RegistryService 类型生成Invoker
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// 3 获得服务引用配置参数集合Map,这个是从 refer 参数获取的
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));

// 4 从消费配置参数中获取group属性
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {

// 如果多个分组 -> todo 分组聚合,将每个组的服务调用一次,然后聚合结果
// 如,分组聚合 group="a,b" or group="*"
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {

// 通过SPI加载 MergeableCluster实例,并调用 doRefer 继续执行引用服务逻辑。
// com.alibaba.dubbo.registry.integration.RegistryDirectory.toMergeMethodInvokerMap 该方法已经使用自适应 cluster 合并了分组 Invoker
return doRefer(getMergeableCluster(), registry, type, url);
}
}


/**
* 执行服务引用
* 服务分组的使用,如果消费方有指定 group 属性,那么该属性发挥的作用在服务订阅时,对 Provider 的过滤。注意和分组聚合的区别。
*
* @see com.alibaba.dubbo.common.utils.UrlUtils#isMatch(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.common.URL)
*/
return doRefer(cluster, registry, type, url);
}

上述方法做了两件事,还原注册中心真实的URL地址并根据该地址获取 Registry 对象;如果服务消费方使用了多分组,则自动触发分组合并功能,该功能由自动创建的 MergeableCluster 来完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
+--- RegistryProtocol
/**
* 执行服务引用,返回Invoker对象
*
* @param cluster Cluster 对象
* @param registry 注册中心对象
* @param type 服务接口类型
* @param url 注册中心URL
* @param <T> 泛型
* @return Invoker 对象
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
/**
* 1 创建RegistryDirectory对象【服务目录】,并设置注册中心到它的属性,该对象包含了注册中心的所有服务提供者 List<Invoker>
* 2 其中在其父类AbstractDirectory中会创建List<Router>routers
*/
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
// 设置注册中心和协议
directory.setRegistry(registry);
directory.setProtocol(protocol);


// 获得服务引用配置集合 parameters。注意:url传入RegistryDirectory后,经过处理并重新创建,所以 url != directory.url,
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());

// 生成消费者URL,协议为consumer,具体的参数是 RegistryURL 中 refer 参数指定的参数。在 RegistryDirectory 创建时进行初始化。
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);

// 向注册中心注册服务消费者,在consumers目录下
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 在 subscribeUrl中添加category=consumers和check=false参数
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}

/** 向注册中心订阅 服务提供者 + 路由规则 + 配置规则 节点下的数据,完成订阅后,RegistryDirectory 会收到这几个子节点信息
* 注意:
* 1 第一次发起订阅时会进行一次数据拉取,同时触发RegistryDirectory#notify方法,这里的通知数据是某一个类目的全量数据,如:providers,router,configurators 类目数据。
* 并且当通知providers数据时,在RegistryDirectory#toInvokers方法内完成Invoker转换
* 2 当注册中心宕机,订阅会失败进入catch 逻辑 ---> {@link FailbackRegistry#subscribe(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.registry.NotifyListener)}
*
*/
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));

/**
* 由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker,这一个Invoker代表了多个。
* Cluster默认为FailoverCluster实例,支持服务调用重试
*/
Invoker invoker = cluster.join(directory);

// 向本地注册表,注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}

上述方法首先会根据 URL 初始化 RegistryDirectory 实例,然后生成 subscribe URL 并进行注册,之后会通过 Registry 订阅服务,最后通过 Cluster 将多个 Invoker 合并成一个 Invoker 返回给上层。

可以看到,依赖注册中心进行服务引用时,核心是对服务目录 RegistryDirectory 生成,这个服务目录内部会使用服务URL对应的具体协议,如 DubboProtcol 引用服务并保存到服务目录中,不同服务可以使用不同的协议。Dubbo 使用 Cluster 将服务目录包装成一个 Cluster Invoker 返回给上层,对上层来说是透明的。关于服务目录可参考 Directory

小结

本篇文章对 Dubbo 服务引用进行了分析,服务引用可分为本地引用和远程引用。服务引用支持直连和依赖注册中心,这两种方式处理截然不同,根本区别在于使用的 Protocol 实现不同。