Dubbo过滤器 - ContextFilter & ConsumerContextFilter

概述

ContextFilterConsumerContextFilter 分别用来初始化服务提供端和消费端的上下文 RpcContext 。无论是消费端发起的调用,还是服务端收到的调用,从节点的角度看都是一次调用,都可能产生很多中间临时信息,我们不可能要求在每个方法的参数位置都加一个上下文参数,然后一路往下传。通常做法都是放在 ThreadLocal 中,作为一个全局参数,当前线程中的任何一个地方都可以直接操作上下文信息。

RPC 上下文

RpcContext 是 Dubbo 的上下文信息,其定义如下:

1 上下文中存放的是当前调用过程中所需的环境信息,如 Invoker信息,Invocation信息、地址信息等。
2 RpcContext 是一个 ThreadLocal 的临时状态记录器,该对象维护两个 InternalThreadLocal,分别记录 local 和 server 的上下文。每次收到或发起 RPC 调用的时候,上下文信息都会发生改变。比如:A 调用 B,B 再调用 C,则 B 机器上:在 B 调 C 之前,RpcContext 记录的是 A 调 B 的上下文信息,在 B 开始调 C 时 ,RpcContext 记录的是 B 调 C 的上下文信息。发起调用的时候上下文是由 ConsumerContextFilter 实现的,ContextFilter 保存的是收到的请求的上下文。

服务消费方

1
2
3
4
5
6
7
8
9
10
// 远程调用
xxxService.xxx();
// 本端是否为消费端,这里会返回true
boolean isConsumerSide = RpcContext.getContext().isConsumerSide();
// 获取最后一次调用的提供方IP地址
String serverIP = RpcContext.getContext().getRemoteHost();
// 获取当前服务配置信息,所有配置信息都将转换为URL的参数
String application = RpcContext.getContext().getUrl().getParameter("application");
// 注意:每发起RPC调用,上下文状态会变化
yyyService.yyy();

服务提供方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class XxxServiceImpl implements XxxService {

public void xxx() {
// 本端是否为提供端,这里会返回true
boolean isProviderSide = RpcContext.getContext().isProviderSide();
// 获取调用方IP地址
String clientIP = RpcContext.getContext().getRemoteHost();
// 获取当前服务配置信息,所有配置信息都将转换为URL的参数
String application = RpcContext.getContext().getUrl().getParameter("application");
// 注意:每发起RPC调用,上下文状态会变化
yyyService.yyy();
// 此时本端变成消费端,这里会返回false
boolean isProviderSide = RpcContext.getContext().isProviderSide();
}
}

ConsumerContextFilter

在服务消费者中使用,负责发起调用时初始化 RpcContext 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext()
// 记录Invoker
.setInvoker(invoker)
// 记录服务调用参数
.setInvocation(invocation)
// 本地地址
.setLocalAddress(NetUtils.getLocalHost(), 0)
// 远端地址
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

// 设置 RpcInvocation 对象的 invoker 属性
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}

try {
RpcResult result = (RpcResult) invoker.invoke(invocation);
RpcContext.getServerContext().setAttachments(result.getAttachments());
return result;
} finally {
/**
* 清理隐式参数集合
* 注意:每次服务调用完成,RpcContext设置的隐式参数都会被清理
*/
RpcContext.getContext().clearAttachments();
}
}
}

ConsumerContextFilter 通常会和 ContextFilter 配合使用,因为在微服务环境中有很多链式调用。收到请求时,当前节点可以被看作一个服务提供者,由 ContextFilter 设置上下文。当发起请求调用其他服务,当前服务变成一个消费者,由 ConsumerContextFilter 设置上下文。

ContextFilter

在服务提供者中使用,负责被调用时初始化 RpcContext 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Activate(group = Constants.PROVIDER, order = -10000)
public class ContextFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
/**
* 创建新的 attachments 集合,清理公用的隐式参数。公用的隐式参数,设置的地方如下:

* @see RpcInvocation#RpcInvocation(com.alibaba.dubbo.rpc.Invocation, com.alibaba.dubbo.rpc.Invoker)
*/
Map<String, String> attachments = invocation.getAttachments();
// path,group,version,dubbo,token,timeout,async 都是保留字段,不能作为隐式参数的 key
if (attachments != null) {
attachments = new HashMap<String, String>(attachments);
// 清理 path
attachments.remove(Constants.PATH_KEY);
// 清理 group
attachments.remove(Constants.GROUP_KEY);
// 清理 version
attachments.remove(Constants.VERSION_KEY);
// 清理 dubbo
attachments.remove(Constants.DUBBO_VERSION_KEY);
// 清理 token
attachments.remove(Constants.TOKEN_KEY);
// 清理 timeout
attachments.remove(Constants.TIMEOUT_KEY);
// 清除异步属性,防止异步属性传到过滤器下一个环节
attachments.remove(Constants.ASYNC_KEY);
}

// 这里和 ConsumerContextFilter 不同,没有设置 remoteAddress 的值,做为服务端的过滤器,在收到请求的时候就已经设置了 remoteAddress 的值
// 如 Dubbo 协议: @see com.alibaba.dubbo.remoting.exchange.support.ExchangeHandlerAdapter.reply

RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
// .setAttachments(attachments) // merged from dubbox
.setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());

// mreged from dubbox
// we may already added some attachments into RpcContext before this filter (e.g. in rest protocol)
if (attachments != null) {
if (RpcContext.getContext().getAttachments() != null) {
RpcContext.getContext().getAttachments().putAll(attachments);
} else {
RpcContext.getContext().setAttachments(attachments);
}
}

// 设置 RpcInvocation 对象的 'invoker' 属性
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}

try {
// 调用过滤器链的下一个节点
RpcResult result = (RpcResult) invoker.invoke(invocation);
// 将 SERVER_LOCAL 这个 RpcContext 中的附加信息添加到 RpcResult 的 attachments 字段中,返回给 Consumer。
result.addAttachments(RpcContext.getServerContext().getAttachments());
return result;
} finally {

// 清除上下文信息,当前线程处理下一个调用的时候,会创建新的 RpcContext
RpcContext.removeContext();
RpcContext.getServerContext().clearAttachments();
}
}
}

隐式参数

Dubbo 中的 Attachment 在服务消费方和提供方之间隐式传递参数,可以通过 RpcContext 上的 setAttachmentgetAttachment 在服务消费方和提供方之间进行参数的隐式传递,而传递的载体就是 Invocation 对象。Attachment 传递的过程如下图所示:

注意: path, group, version, dubbo, token, timeout,async 几个 key 是保留字段,在设置 Attachment 参数的 key 时需要使用其它名称。

消费端

在消费端使用 setAttachment 设置 KV 对,在完成一次远程调用会被清空,即多次远程调用要多次设置。

1
2
3
4
5
// 隐式传参,后面的远程调用都会隐式将这些参数发送到服务器端,类似cookie,用于框架集成,不建议常规业务使用。
RpcContext.getContext().setAttachment("index", "1");
// 远程调用
Invoker.invoke();
// ...

服务端

在服务提供端使用 getAttachment 获取隐式参数。

1
2
// 获取客户端隐式传入的参数,用于框架集成,不建议常规业务使用
String index = RpcContext.getContext().getAttachment("index");

小结

本篇文章对 RpcContext 分别在服务提供端和服务消费端的初始化进行了介绍,初始化的时机是每次发起调用和每次被调用。有了 RpcContext 就不需要将调用相关信息通过方法依次传递了。