前言 在 远程通信模块总览 中对 Remoting 层进行了总体说明,下面我们开始详细介绍 Remoting 层的 Transport 网络传输层。本文会从 Transporter 层的 Server、Client、Channel、ChannelHandler、Dispatcher 以及 Codec2 等核心接口出发,分别介绍这些核心接口的实现。
概述 有很多网络库可以实现网络传输的功能,如 Netty、Mina、Grizzly等。但这些 NIO 库对外接口和使用方式不一样,如果使用方直接使用 Netty 或其它通信组件,那么就依赖了具体的NIO库实现,而不是依赖一个有传输能力的抽象,后续要切换其它NIO库实现的话就需要修改依赖和接入的相关代码,这既容易出错也不符合设计模式中的开放-封闭原则。因此,Dubbo Transporter 层就被抽象出来了,它屏蔽了不同的通信框架的异同,封装了统一的对外接口。有了 Transporter 层之后,我们可以通过 Dubbo SPI 动态切换具体的 Transporter 扩展实现,从而切换到不同的 Client 和 Server 实现,达到底层 NIO 库切换的目的。需要注意的是,Dubbo Transporter 层不等于 Transport 扩展接口及其实现,它是对网络传输层的抽象即在NIO库之上的抽象,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec, ChannelHandler, Dispatcher 等。
Transport 抽象层代码结构如下:
注意, Dubbo 接入具体 NIO 库的代码散落在 dubbo-remoting-* 实现模块中,会在后面的文章中介绍。
Transporter 扩展接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @SPI ("netty" )public interface Transporter { @Adaptive ({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind (URL url, ChannelHandler handler) throws RemotingException ; @Adaptive ({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect (URL url, ChannelHandler handler) throws RemotingException ; }
Transporter 是在 Client 和 Server 之上封装的统一的对外接口,针对每个支持的NIO库,都有一个 Transporter 接口实现,它们是 Dubbo 接入具体NIO库的实现入口,在各个 dubbo-remoting-* 实现模块中。如,Dubbo 接入 Mina 网络通信库,就会有对应的 dubbo-remoting-mina 模块对抽象api模块的实现,该模块提供了 Transporter 、Server 、Client 、Channel 、ChannelHandler 等核心接口的实现。
这些 Transporter 接口实现返回的 Client 和 Server 具体实现如下图所示,它们是Dubbo 接入的NIO库对应的 Server和Client实现。
具体NIO库Server的实现
具体NIO库Client的实现
在 远程通信模块总览 中已经介绍过 Transporter 接口以及该接口的门面类 Transporters ,这里不再重复介绍。关于通信具体实现模块会在后面的文章中介绍,它们也是 Transporter 层的一部分,本篇文章着重分析 Transport 层公用组件及抽象概念。
AbstractPeer 抽象类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public abstract class AbstractPeer implements Endpoint , ChannelHandler { private final ChannelHandler handler; private volatile URL url; private volatile boolean closing; private volatile boolean closed; public AbstractPeer (URL url, ChannelHandler handler) { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } this .url = url; this .handler = handler; } }
AbstractPeer 这个抽象类,它同时实现了 Endpoint 接口和 ChannelHandler 接口,AbstractPeer 对 ChannelHandler 接口的所有实现都是委托给维护的 ChannelHandler 属性来处理。对 Endpoint 接口的实现,包括和Channel有关的,如关闭Channel、开始关闭Channel(做标记关闭)、检查Channel是否关闭,这些都是对其维护的 closing 和 closed 属性进行操作;发送消息 send 方法的实现交给其子类去完成;获取端点自身的 URL;获取 ChannelHandler。需要特别说明的是,上层的 ChannelHandler 在链路的最底层保存的位置就是在 AbstractPeer 这个抽象类中 。
AbstractPeer 也是 AbstractChannel、AbstractEndpoint 抽象类的父类,继承关系如下图:
红框中的实现类是 Dubbo 接入的具体NIO库实现相关的 Server、Client 和 Channel 实现类,通过继承关系以及前面的描述,我们可以知道 AbstractChannel、AbstractServer、AbstractClient 都会关联一个 ChannelHandler 对象 ,这个对象很重要,后面会慢慢揭开它的面纱。
AbstractEndpoint 抽象类 上文也提到了,AbstractEndpoint 继承了 AbstractPeer 这个抽象类,因为继承关系因此也会关联一个 ChannelHandler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public abstract class AbstractEndpoint extends AbstractPeer implements Resetable { private static final Logger logger = LoggerFactory.getLogger(AbstractEndpoint.class ) ; private Codec2 codec; private int timeout; private int connectTimeout; public AbstractEndpoint (URL url, ChannelHandler handler) { super (url, handler); this .codec = getChannelCodec(url); this .timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); this .connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); } protected static Codec2 getChannelCodec (URL url) { String codecName = url.getParameter(Constants.CODEC_KEY, "telnet" ); if (ExtensionLoader.getExtensionLoader(Codec2.class ).hasExtension (codecName )) { return ExtensionLoader.getExtensionLoader(Codec2.class ).getExtension (codecName ) ; } else { return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class ).getExtension (codecName )) ; } } }
通过上面的代码可以看到,AbstractEndpoint 中维护了一个编解码对象 Codec2 ,该对象是在 AbstractEndpoint 构造方法中根据传入的URL完成初始化 ,这个非常重要。除了维护 Codec2 编解码对象外,还维护了超时时间(timeout)和连接超时时间(connectTimeout),它们也是在构造方法中根据传入的URL进行初始化的。
此外,AbstractEndpoint 还实现了 Resetable 接口用来支持重置 AbstractEndpoint 中维护的三个属性,代码实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 --- AbstractEndpoint @Override public void reset (URL url) { if (isClosed()) { throw new IllegalStateException("Failed to reset parameters " + url + ", cause: Channel closed. channel: " + getLocalAddress()); } try { if (url.hasParameter(Constants.TIMEOUT_KEY)) { int t = url.getParameter(Constants.TIMEOUT_KEY, 0 ); if (t > 0 ) { this .timeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.CONNECT_TIMEOUT_KEY)) { int t = url.getParameter(Constants.CONNECT_TIMEOUT_KEY, 0 ); if (t > 0 ) { this .connectTimeout = t; } } } catch (Throwable t) { logger.error(t.getMessage(), t); } try { if (url.hasParameter(Constants.CODEC_KEY)) { this .codec = getChannelCodec(url); } } catch (Throwable t) { logger.error(t.getMessage(), t); } }
抽象的服务端和客户端 由上面的继承关系图可知,AbstractServer 和 AbstractClient 都继承自 AbstractEndpoint 抽象类,下面我们先来分析 AbstractServer 这个抽象服务的实现。
AbstractServer 属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public abstract class AbstractServer extends AbstractEndpoint implements Server { protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler" ; private static final Logger logger = LoggerFactory.getLogger(AbstractServer.class ) ; ExecutorService executor; private InetSocketAddress localAddress; private InetSocketAddress bindAddress; private int accepts; private int idleTimeout = 600 ; }
AbstractServer 在继承 AbstractEndpoint 的同时,还实现了 Server 接口,是服务抽象类,重点实现了服务的公用逻辑 ,Server 接口在 在 远程通信模块总览 中已经介绍,其中的属性已经在代码中详细标注。下面我们接着看它的构造方法,上述的属性字段都是在构造方法中进行初始化的。
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 --- AbstractServer public AbstractServer (URL url, ChannelHandler handler) throws RemotingException { super (url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false ) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); this .accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this .idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null , "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class ).getDefaultExtension () ; executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
构造方法会根据传入的URL初始化 AbstractServer 中的属性,这也是为了其子类的初始化做准备,其中在构造方法中调用了一个模版方法 doOpen,这个方法就是初始化其子类的关键入口,即启动具体的NIO服务 ,下篇文章分析具体NIO库是如何接入的就会清晰了。当前Server关联的线程池 executor 是从 DataStore 中取的,下文会对 DataStore 进行介绍并说明线程池的来源。
模版方法 用于子类实现,完成服务的开启和关闭工作。
1 2 3 protected abstract void doOpen () throws Throwable ;protected abstract void doClose () throws Throwable ;
发送消息 发送消息方法是对 Endpoint 接口的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 --- AbstractServer @Override public void send (Object message, boolean sent) throws RemotingException { Collection<Channel> channels = getChannels(); for (Channel channel : channels) { if (channel.isConnected()) { channel.send(message, sent); } } }
客户端请求连接 用于客户端连接当前服务,是对父类 AbstractPeer 方法的重写,对 ChannelHandler 的实现,AbstractPeer 中的实现很简单,只是判断服务是否关闭,关闭就不会处理客户端连接请求,没有关闭则会把连接请求交给维护的 ChannelHandler 处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 --- AbstractServer @Override public void connected (Channel ch) throws RemotingException { if (this .isClosing() || this .isClosed()) { logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process." ); ch.close(); return ; } Collection<Channel> channels = getChannels(); if (accepts > 0 && channels.size() > accepts) { logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts); ch.close(); return ; } super .connected(ch); }
客户端断开连接 用于客户端断开连接当前服务,是对父类 AbstractPeer 方法的重写,对 ChannelHandler 的实现,AbstractPeer 中的实现很简单,直接把断开连接请求交给装饰的 ChannelHandler 处理。
1 2 3 4 5 6 7 8 9 10 --- AbstractServer @Override public void disconnected (Channel ch) throws RemotingException { Collection<Channel> channels = getChannels(); if (channels.isEmpty()) { logger.warn("All clients has discontected from " + ch.getLocalAddress() + ". You can graceful shutdown now." ); } super .disconnected(ch); }
服务关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public void close () { if (logger.isInfoEnabled()) { logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } ExecutorUtil.shutdownNow(executor, 100 ); try { super .close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { doClose(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
还有一些不是很重要的其它方法就不分析了,下面继续分析抽象客户端实现。
AbstractClient AbstractClient 同样继承了 AbstractEndpoint 抽象类,并且实现了 Client 接口,是客户端的抽象类,实现了公用的逻辑。Client 接口在 在 远程通信模块总览 中已经介绍过,就不再重复说明。
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public abstract class AbstractClient extends AbstractEndpoint implements Client { private static final Logger logger = LoggerFactory.getLogger(AbstractClient.class ) ; protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler" ; private static final AtomicInteger CLIENT_THREAD_POOL_ID = new AtomicInteger(); private static final ScheduledThreadPoolExecutor reconnectExecutorService = new ScheduledThreadPoolExecutor(2 , new NamedThreadFactory("DubboClientReconnectTimer" , true )); private final Lock connectLock = new ReentrantLock(); private final boolean send_reconnect; private final AtomicInteger reconnect_count = new AtomicInteger(0 ); private final AtomicBoolean reconnect_error_log_flag = new AtomicBoolean(false ); private final int reconnect_warning_period; private final long shutdown_timeout; protected volatile ExecutorService executor; private volatile ScheduledFuture<?> reconnectExecutorFuture = null ; private long lastConnectedTime = System.currentTimeMillis(); }
AbstractClient 中的相关属性已经详细标注,因为是客户端,会涉及到重连服务的情况,属性相对比服务端要多些,但是这些属性都是很有用的 。
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 --- AbstractClient public AbstractClient (URL url, ChannelHandler handler) throws RemotingException { super (url, handler); send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false ); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); reconnect_warning_period = url.getParameter("reconnect.waring.period" , 1800 ); try { doOpen(); } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null , "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } try { connect(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress()); } } catch (RemotingException t) { if (url.getParameter(Constants.CHECK_KEY, true )) { close(); throw t; } else { logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t); } } catch (Throwable t) { close(); throw new RemotingException(url.toInetSocketAddress(), null , "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t); } executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class ).getDefaultExtension ().get (Constants .CONSUMER_SIDE , Integer .toString (url .getPort ())) ; ExtensionLoader.getExtensionLoader(DataStore.class ).getDefaultExtension ().remove (Constants .CONSUMER_SIDE , Integer .toString (url .getPort ())) ; }
构造方法中不仅初始化了属性,还调用了模版方法,用于完成子类的初始化工作,即完成客户端的初始化并连接上服务。具体的客户端实现同样在后面的文章中说明。
模版方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 --- AbstractClient protected abstract void doOpen () throws Throwable ; protected abstract void doClose () throws Throwable ; protected abstract void doConnect () throws Throwable ; protected abstract void doDisConnect () throws Throwable ; protected abstract Channel getChannel () ;
与 AbstractServer 类似,AbstractClient 定义了 doOpen()、doClose()、doConnect()、 doDisConnect() 和 getChannel() 抽象方法给子类实现以完成特定的功能。其中 doClose() 方法在 Netty 实现中是个空方法。
连接服务的通用逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 --- AbstractClient protected void connect () throws RemotingException { connectLock.lock(); try { if (isConnected()) { return ; } initConnectStatusCheckCommand(); doConnect(); if (!isConnected()) { throw new RemotingException(this , "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms." ); } else { if (logger.isInfoEnabled()) { logger.info("Successed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", channel is " + this .getChannel()); } } reconnect_count.set(0 ); reconnect_error_log_flag.set(false ); } catch (RemotingException e) { throw e; } catch (Throwable e) { throw new RemotingException(this , "Failed connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: " + e.getMessage(), e); } finally { connectLock.unlock(); } }
连接服务通用逻辑主要做了以下工作:
获得锁,用于实现在连接和断开连接同时操作时,通过加锁以防止并发问题。
判断是否连接,如果连接了就无需再连接,是否连接逻辑是对Channel接口方法的实现。1 2 3 4 5 6 7 8 9 10 11 12 13 14 --- AbstractClient @Override public boolean isConnected () { Channel channel = getChannel(); if (channel == null ) { return false ; } return channel.isConnected(); }
开启断线重连机制,即初始化重连线程,定时检查连接状态。
调用具体客户端实现的连接服务的方法去连接对应的服务。
连接失败抛出异常,连接成功则打印日志并归零重连次数。
断线重连机制 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 --- AbstractClient private synchronized void initConnectStatusCheckCommand () { int reconnect = getReconnectParam(getUrl()); if (reconnect > 0 && (reconnectExecutorFuture == null || reconnectExecutorFuture.isCancelled())) { Runnable connectStatusCheckCommand = new Runnable() { @Override public void run () { try { if (!isConnected()) { connect(); } else { lastConnectedTime = System.currentTimeMillis(); } } catch (Throwable t) { String errorMsg = "client reconnect to " + getUrl().getAddress() + " find error . url: " + getUrl(); if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout) { if (!reconnect_error_log_flag.get()) { reconnect_error_log_flag.set(true ); logger.error(errorMsg, t); return ; } } if (reconnect_count.getAndIncrement() % reconnect_warning_period == 0 ) { logger.warn(errorMsg, t); } } } }; reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS); } }
断线重连机制就是在客户端连接服务端时,会创建后台任务,定时检查连接,若断开会进行重连。
发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 --- AbstractClient @Override public void send (Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } Channel channel = getChannel(); if (channel == null || !channel.isConnected()) { throw new RemotingException(this , "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); }
客户端连接服务时只会有对应的一个 Channel 通道,客户端发送消息时使用的是 Dubbo 接入具体NIO库的 Channel 实例,如 NettyChannel 实例,它内部封装的 Channel 实例是 Netty 的通道实例 NioClientSocketChannel 。这个在后面的文章中详细说明。
断开连接 该方法目前用在 reconnect() 重连方法和 close() 关闭方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void disconnect () { connectLock.lock(); try { destroyConnectStatusCheckCommand(); try { Channel channel = getChannel(); if (channel != null ) { channel.close(); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { doDisConnect(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } } finally { connectLock.unlock(); } }
重连 先断开连接,在进行连接。
1 2 3 4 5 6 7 @Override public void reconnect () throws RemotingException { disconnect(); connect(); }
关闭 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 @Override public void close () { try { if (executor != null ) { ExecutorUtil.shutdownNow(executor, 100 ); } } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { super .close(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { disconnect(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } try { doClose(); } catch (Throwable e) { logger.warn(e.getMessage(), e); } }
超时关闭 对线程池超时关闭
1 2 3 4 5 @Override public void close (int timeout) { ExecutorUtil.gracefulShutdown(executor, timeout); close(); }
抽象通道 AbstractChannel AbstractChannel 同样继承了 AbstractPeer 这个抽象类,同时还实现了 Channel 接口。AbstractChannel 实现非常简单,只是在 send() 方法中检测了底层连接的状态,没有实现具体的发送消息的逻辑。注意,一般情况下 Dubbo 层面的 Channel 和 具体NIO库的通道是一对一的关系,前者会对后者进行装饰,前者的功能本质上是后者的职能 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public abstract class AbstractChannel extends AbstractPeer implements Channel { public AbstractChannel (URL url, ChannelHandler handler) { super (url, handler); } @Override public void send (Object message, boolean sent) throws RemotingException { if (isClosed()) { throw new RemotingException(this , "Failed to send message " + (message == null ? "" : message.getClass().getName()) + ":" + message + ", cause: Channel closed. channel: " + getLocalAddress() + " -> " + getRemoteAddress()); } } }
继承关系图如下:
各子类实现会对 send 方法进行重写。
ChannelHandler 前文介绍的 AbstractEndpoint、AbstractChannel 都是通过对 AbstractPeer 继承间接实现了 ChannelHandler 接口并关联了 ChannelHandler 对象,仅仅是对 ChannelHandler 的装饰,方法都是委托给底层关联的这个 ChannelHandler 对象。下面我们对 Transporter 层相关的 ChannelHandler 进行详细分析。继承关系如下图所示:
ChannelHandlerAdapter ChannelHandlerAdapter 是 ChannelHandler 的一个空实现,TelnetHandlerAdapter 继承了它并实现了 TelnetHandler 接口,用于支持 Dubbo 命令行的服务治理。关于 Telnet 的实现,会在后面单独进行介绍,这里就不展开说明了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ChannelHandlerAdapter implements ChannelHandler { @Override public void connected (Channel channel) throws RemotingException { } @Override public void disconnected (Channel channel) throws RemotingException { } @Override public void sent (Channel channel, Object message) throws RemotingException { } @Override public void received (Channel channel, Object message) throws RemotingException { } @Override public void caught (Channel channel, Throwable exception) throws RemotingException { } }
ChannelHandlerDispatcher 在前面的文章中有提到过 ChannelHandlerDispatcher,它维护了一个 CopyOnWriteArraySet 集合,负责将多个 ChannelHandler 对象聚合成一个 ChannelHandler 对象。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ChannelHandlerDispatcher implements ChannelHandler { private static final Logger logger = LoggerFactory.getLogger(ChannelHandlerDispatcher.class ) ; private final Collection<ChannelHandler> channelHandlers = new CopyOnWriteArraySet<ChannelHandler>(); public ChannelHandlerDispatcher () { } public ChannelHandlerDispatcher (ChannelHandler... handlers) { this (handlers == null ? null : Arrays.asList(handlers)); } public ChannelHandlerDispatcher (Collection<ChannelHandler> handlers) { if (handlers != null && !handlers.isEmpty()) { this .channelHandlers.addAll(handlers); } } }
ChannelHandlerDispatcher 实现了 ChannelHandler 接口中的所有方法,每个方法都是循环通道处理器集合调用相应的方法。
ChannelHandlerDelegate 实现 ChannelHandler 接口,通道处理器装饰者接口,即是对其它 ChannelHandler 进行装饰的接口,这个接口非常重要 。
1 2 3 4 5 6 7 8 public interface ChannelHandlerDelegate extends ChannelHandler { ChannelHandler getHandler () ; }
ChannelHandlerDelegate 有三个直接的实现类,分别是 AbstractChannelHandlerDelegate 、WrappedChannelHandler 和 HeaderExchangeHandler ,它们就是对其它 ChannelHandler 的装饰。其中 HeaderExchangeHandler 是 Exchange 层涉及的对象,我们先不讨论。我们先来分析 AbstractChannelHandlerDelegate 继承体系。
AbstractChannelHandlerDelegate 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate { protected ChannelHandler handler; protected AbstractChannelHandlerDelegate (ChannelHandler handler) { Assert.notNull(handler, "handler == null" ); this .handler = handler; } @Override public ChannelHandler getHandler () { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } return handler; } @Override public void connected (Channel channel) throws RemotingException { handler.connected(channel); } @Override public void disconnected (Channel channel) throws RemotingException { handler.disconnected(channel); } @Override public void sent (Channel channel, Object message) throws RemotingException { handler.sent(channel, message); } @Override public void received (Channel channel, Object message) throws RemotingException { handler.received(channel, message); } @Override public void caught (Channel channel, Throwable exception) throws RemotingException { handler.caught(channel, exception); } }
实现 ChannelHandlerDelegate 接口,在每个实现的方法里都是直接调用被装饰的 ChannelHandler 对象对应的方法,没有其它逻辑。它的三个子类都是在被装饰的 ChannelHandler 的基础上添加了一些增强的功能,使用的是装饰者模式。因为 HeartbeatHandler 属于 Exchange 层的 ChannelHandler ,在分析 Exchange 层时再进行分析,这里不再展开说明。
DecodeHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class DecodeHandler extends AbstractChannelHandlerDelegate { private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class ) ; public DecodeHandler (ChannelHandler handler) { super (handler); } @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); } private void decode (Object message) { if (message != null && message instanceof Decodeable) { try { ((Decodeable) message).decode(); if (log.isDebugEnabled()) { log.debug("Decode decodeable message " + message.getClass().getName()); } } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } } } }
DecodeHandler 是一个解码处理器,专门用于处理 Decodeable 类型消息的 ChannelHandler实现类 ,因此该实现类只重写了 received() 接收消息的方法,它的作用和含义如下:
请求解码可在IO线程上执行,也可在线程池中执行,取决于配置。DecodeHandler 存在的意义就是保证请求体或响应体可在线程池中被解码。
在Codec2解码器实现中,如果请求体和响应结果需要在线程池中进行解码,那么就不进行直接解码,而是把解码任务最终交给线程池来处理,最后由 DecodeHandler来处理,因为 DecodeHandler 也参与了对上层 ChannelHandler 的包装。
实现了 Decodeable 接口的类都会提供了一个 decode() 方法实现对自身的解码,DecodeHandler.received() 方法就是通过该方法得到解码后的消息,然后传递给底层的 ChannelHandler 对象继续处理。
MultiMessageHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class MultiMessageHandler extends AbstractChannelHandlerDelegate { public MultiMessageHandler (ChannelHandler handler) { super (handler); } @SuppressWarnings ("unchecked" ) @Override public void received (Channel channel, Object message) throws RemotingException { if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; for (Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } } }
MultiMessageHandler 是专门处理 MultiMessage 类型消息 的 ChannelHandler 实现类。MultiMessage 是 Exchange 层的一种消息类型,它其中封装了多个消息。在 MultiMessageHandler 收到 MultiMessage 消息的时候,received() 方法会遍历其中的所有消息,并交给底层的 ChannelHandler 对象进行处理。
至此,Transport 层的 AbstractChannelHandlerDelegate 继承体系分析完毕。下面我们继续看 ChannelHandlerDelegate 的另一条继承体系分支。
WrappedChannelHandler WrappedChannelHandler 也实现了 ChannelHandlerDelegate 接口,也是对其它 ChannelHandler 装饰的类。WrappedChannelHandler 在 ChannelHandler 接口方法实现上和 AbstractChannelHandlerDelegate 基本一致,那为什么又要搞一个新的继承体系而不是直接继承 AbstractChannelHandlerDelegate 呢?因为 WrappedChannelHandler 继承体系不仅是对其它 ChannelHandler 的装饰而且还决定了 Dubbo 的线程模型 ,有关 Dubbo 中的线程池会单独分析,这里先不展开说明。WrappedChannelHandler 关联体系如下图所示:
从上图可知,每个 WrappedChannelHandler 的子类都有一个对应的 Dispatcher 实现类,这些实现类就是用来创建 WrappedChannelHandler 的子类们。 Dispatcher 接口已经在 远程通信模块总览 中已经介绍过,它主要支持了 Dubbo 的线程模型,通过它的实现类可以创建不同的 ChannelHandler 来决定消息是交给线程池处理还是IO线程处理。
WrappedChannelHandler 实现了 ChannelHandlerDelegate 接口,其子类实现了消息派发功能,即决定了 Dubbo 以哪种线程模型处理收到的事件和消息。每个子类都由对应的Dispatcher 实现类创建。
属性
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class WrappedChannelHandler implements ChannelHandlerDelegate { protected static final Logger logger = LoggerFactory.getLogger(WrappedChannelHandler.class ) ; protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler" , true )); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url; }
WrappedChannelHandler 中有四个核心的属性,因为是对 ChannelHandler 的装饰,因此 ChannelHandler 是必须的。需要说明的是共享线程池和当前端点关联的线程池,共享线程池对每个子类公用,当前端点关联的线程池属于每个子类对象独有,它是在构造方法中初始化的。
构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 --- WrappedChannelHandler public WrappedChannelHandler (ChannelHandler handler, URL url) { this .handler = handler; this .url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class ).getAdaptiveExtension ().getExecutor (url ) ; String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class ).getDefaultExtension () ; dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }
通过 WrappedChannelHandler 的构造方法可知,每个子类对象都会创建一个线程池并添加到 DataStore 缓存起来,我们上面介绍的 AbstractClient 和 AbstractServer 是从 DataStore 获得线程池的,而数据来源正是这里。关于线程池的介绍,会在后面的文章中详细分析,这里先不展开说明。
DataSource 核心就是一个 Map 结构缓存,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class SimpleDataStore implements DataStore { private ConcurrentMap<String, ConcurrentMap<String, Object>> data = new ConcurrentHashMap<String, ConcurrentMap<String, Object>>(); @Override public Map<String, Object> get (String componentName) { ConcurrentMap<String, Object> value = data.get(componentName); if (value == null ) return new HashMap<String, Object>(); return new HashMap<String, Object>(value); } @Override public Object get (String componentName, String key) { if (!data.containsKey(componentName)) { return null ; } return data.get(componentName).get(key); } @Override public void put (String componentName, String key, Object value) { Map<String, Object> componentData = data.get(componentName); if (null == componentData) { data.putIfAbsent(componentName, new ConcurrentHashMap<String, Object>()); componentData = data.get(componentName); } componentData.put(key, value); } @Override public void remove (String componentName, String key) { if (!data.containsKey(componentName)) { return ; } data.get(componentName).remove(key); } }
获取线程池
获取线程池,供子类使用调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 --- WrappedChannelHandler public ExecutorService getExecutorService () { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } return cexecutor; }
WrappedChannelHandler 实现 ChannelHandler 接口的方法都是直接调用装饰的 ChannelHandler 对应的方法,就不再进行分析。
线程模型 如果事件处理的逻辑能迅速完成,并且不会发起新的 IO 请求,比如只是在内存中记个标识,则直接在 IO 线程上处理更快,因为减少了线程池调度。但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,比如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。因此,需要通过不同的派发策略和不同的线程池配置的组合来应对不同的场景:
1 <dubbo:protocol name ="dubbo" dispatcher ="all" threadpool ="fixed" threads ="100" />
Dubbo 的线程模型需要具有线程派发能力的 ChannelHandler 和 定制化的线程池来支撑。Dispatcher 的职责就是用来创建具有线程派发能力的 ChannelHandler,其本身并不具备线程派发能力。关于 Dispatcher 在 远程通信模块总览 中已经介绍,这里不再重复说明。
Dispatcher 派发策略:
all: 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等。
direct: 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
message: 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
execution: 只有请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
connection: 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其实是交给专门处理连接事件的线程池处理的。其它消息派发到线程池。
关于线程池部分在后面的文章中详细说明,先不在这里展开介绍。
Dispatcher 实现类用来创建 WrappedChannelHandler 的子类对象,每个子类对象代表不同的派发策略,同时子类对象在创建的时候会初始化一个线程池。下面我们来分析 Dispatcher 扩展实现和对应的 WrappedChannelHandler 的子类。
AllDispatcher & AllChannelHandler AllDispatcher 用来创建 AllChannelHandler 对象,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class AllDispatcher implements Dispatcher { public static final String NAME = "all" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
AllChannelHandler 实现 WrappedChannelHandler 抽象类,所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,心跳等,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler (ChannelHandler handler, URL url) { super (handler, url); } @Override public void connected (Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event" , channel, getClass() + " error when process connected event ." , t); } } @Override public void disconnected (Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event" , channel, getClass() + " error when process disconnected event ." , t); } } @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return ; } } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } } @Override public void caught (Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event" , channel, getClass() + " error when process caught event ." , t); } } }
AllChannelHandler 重写了 WrappedChannelHandler 中除了发送消息的 sent() 方法之外的其它方法,执行底层的 ChannelHandler 的逻辑都交给线程池处理,请求执行完毕后发送消息 AllChannelHandler 会直接在 IO 线程中进行处理。
ExecutionDispatcher & ExecutionChannelHandler ExecutionDispatcher 用来创建 ExecutionChannelHandler 对象,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return new ExecutionChannelHandler(handler, url); } }
ExecutionChannelHandler 实现 WrappedChannelHandler 抽象类,只会将请求消息派发到线程池进行处理。对于响应消息以及其他网络事件(例如,连接建立事件、连接断开事件、心跳消息等),ExecutionChannelHandler 会直接在 IO 线程中进行处理,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler (ChannelHandler handler, URL url) { super (handler, url); } @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); if (message instanceof Request) { try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return ; } } throw new ExecutionException(message, channel, getClass() + " error when process received event." , t); } } else { handler.received(channel, message); } } }
由上面代码可知,ExecutionChannelHandler 只重写了 received() 方法并且只处理请求消息,其它方法的调用直接调用父类的,是直接在 IO 线程中进行处理。
DirectDispatcher & DirectChannelHandler direct 类型,所有消息都不派发到线程池,全部在 IO 线程上直接执行,相关代码如下:
1 2 3 4 5 6 7 8 public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return handler; } }
MessageOnlyDispatcher & MessageOnlyChannelHandler MessageOnlyDispatcher 用来创建 MessageOnlyChannelHandler 对象,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class MessageOnlyDispatcher implements Dispatcher { public static final String NAME = "message" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return new MessageOnlyChannelHandler(handler, url); } }
MessageOnlyChannelHandler 实现 WrappedChannelHandler 抽象类,会将所有收到的消息(请求/响应)提交到线程池处理,其他网络事件(连接断开事件,心跳等消息)则是由 IO 线程直接处理,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler (ChannelHandler handler, URL url) { super (handler, url); } @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } } }
由上面代码可知,ExecutionChannelHandler 只重写了 received() 方法,其它方法的调用是直接调用父类的方法,直接在 IO 线程中进行处理。
ConnectionOrderedDispatcher & ConnectionOrderedChannelHandler ConnectionOrderedDispatcher 用来创建 ConnectionOrderedChannelHandler 对象,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection" ; @Override public ChannelHandler dispatch (ChannelHandler handler, URL url) { return new ConnectionOrderedChannelHandler(handler, url); } }
ConnectionOrderedChannelHandler 实现 WrappedChannelHandler 抽象类,会将收到的消息交给线程池进行处理,对于连接建立以及断开事件是通过 IO 线程将连接、断开事件交给 connectionExecutor 线程池排队处理的,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler (ChannelHandler handler, URL url) { super (handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true ), new AbortPolicyWithReport(threadName, url) ); queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } @Override public void connected (Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event" , channel, getClass() + " error when process connected event ." , t); } } @Override public void disconnected (Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event" , channel, getClass() + " error when process disconnected event ." , t); } } @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return ; } } throw new ExecutionException(message, channel, getClass() + " error when process received event ." , t); } } @Override public void caught (Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event" , channel, getClass() + " error when process caught event ." , t); } } private void checkQueueLength () { if (connectionExecutor.getQueue().size() > queuewarninglimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit)); } } }
和 AllChannelHandler 一样,发送消息由 ConnectionOrderedChannelHandler 直接在 IO 线程中进行处理,区别在于后者的连接建立、断开事件不是通过父类中创建的线程池处理,而是创建了一个排队线程池。之所以叫它排队线程池,是该线程池只有一个线程,并且使用的阻塞队列是有序的。
ChannelEventRunnable 线程派发任务体 实现Runnable接口,该任务体被不同的线程派发机制使用。
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ChannelEventRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ChannelEventRunnable.class ) ; private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Object message; private final Throwable exception; public ChannelEventRunnable (Channel channel, ChannelHandler handler, ChannelState state) { this (channel, handler, state, null ); } public ChannelEventRunnable (Channel channel, ChannelHandler handler, ChannelState state, Object message) { this (channel, handler, state, message, null ); } public ChannelEventRunnable (Channel channel, ChannelHandler handler, ChannelState state, Throwable t) { this (channel, handler, state, null , t); } public ChannelEventRunnable (Channel channel, ChannelHandler handler, ChannelState state, Object message, Throwable exception) { this .channel = channel; this .handler = handler; this .state = state; this .message = message; this .exception = exception; } }
ChannelEventRunnable 中的属性都是由线程派发相关的 ChannelHandler 传入的,不同的派发策略传入的属性不同,通过不同的构造方法也可以看出。
任务体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 public class ChannelEventRunnable implements Runnable { @Override public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break ; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break ; default : logger.warn("unknown state: " + state + ", message is " + message); } } } public enum ChannelState { CONNECTED, DISCONNECTED, SENT, RECEIVED, CAUGHT } }
该任务体功能和作用如下:
1 请求和响应消息出现频率比其他类型消息高,因此这里对消息类型进行了针对性判断,便于提前处理。 2 ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,只是判断对应的通道状态,然后将参数传给装饰的 ChannelHandler 对象进行针对性处理。
至此,ChannelHandlerDelegate 的另一条继承体系分析完毕,Transport 层的主要 ChannelHandler 分析到此结束。
ChannelHandlers ChannelHandler 的工具类,主要是对传入的 ChannelHandler 进行层层包装,具体怎么包装的我们看下面的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class ChannelHandlers { private static ChannelHandlers INSTANCE = new ChannelHandlers(); protected ChannelHandlers () { } public static ChannelHandler wrap (ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected static ChannelHandlers getInstance () { return INSTANCE; } protected ChannelHandler wrapInternal (ChannelHandler handler, URL url) { return new MultiMessageHandler( new HeartbeatHandler( ExtensionLoader.getExtensionLoader(Dispatcher.class ) .getAdaptiveExtension () .dispatch (handler , url ) // 返回的是一个 ChannelHandlerDelegate 类型的对象,默认是 AllChannelHandler ,确定了具体的线程模型 ) ) ; } }
很容易发现,包装器其实就是前文介绍的 ChannelHandlerDelegate 类型的 ChannelHandler。该包装逻辑无论在 Client 端还是 Server 端都会使用 ,也就意味着上层传入的 ChannelHandler 会增加很多的逻辑,即支持多消息处理、心跳处理以及支持 Dubbo 线程模型机制。我们在下一篇文章中还会再次介绍,这里先以 netty4 实现的网络通信简单说明。
NettyServer1 2 3 4 5 6 public class NettyServer extends AbstractServer implements Server { public NettyServer (URL url, ChannelHandler handler) throws RemotingException { super (url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } }
NettyClient1 2 3 4 5 6 public class NettyClient extends AbstractClient { public NettyClient (final URL url, final ChannelHandler handler) throws RemotingException { super (url, wrapChannelHandler(url, handler)); } }
编解码 关于 Codec2 扩展接口已经在 远程通信模块总览 中进行了介绍,下面介绍在 Transport 层相关的实现和扩展。
编解码工具类 CodecSupport 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 public class CodecSupport { private static final Logger logger = LoggerFactory.getLogger(CodecSupport.class ) ; private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>(); private static Map<Byte, String> ID_SERIALIZATIONNAME_MAP = new HashMap<Byte, String>(); static { Set<String> supportedExtensions = ExtensionLoader.getExtensionLoader(Serialization.class ).getSupportedExtensions () ; for (String name : supportedExtensions) { Serialization serialization = ExtensionLoader.getExtensionLoader(Serialization.class ).getExtension (name ) ; byte idByte = serialization.getContentTypeId(); if (ID_SERIALIZATION_MAP.containsKey(idByte)) { logger.error("Serialization extension " + serialization.getClass().getName() + " has duplicate id to Serialization extension " + ID_SERIALIZATION_MAP.get(idByte).getClass().getName() + ", ignore this Serialization extension" ); continue ; } ID_SERIALIZATION_MAP.put(idByte, serialization); ID_SERIALIZATIONNAME_MAP.put(idByte, name); } } private CodecSupport () { } public static Serialization getSerializationById (Byte id) { return ID_SERIALIZATION_MAP.get(id); } public static Serialization getSerialization (URL url) { return ExtensionLoader.getExtensionLoader(Serialization.class ).getExtension (url .getParameter (Constants .SERIALIZATION_KEY , Constants .DEFAULT_REMOTING_SERIALIZATION )) ; } public static Serialization getSerialization (URL url, Byte id) throws IOException { Serialization serialization = getSerializationById(id); String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); if (serialization == null || ((id == 3 || id == 7 || id == 4 ) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) { throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id." ); } return serialization; } public static ObjectInput deserialize (URL url, InputStream is, byte proto) throws IOException { Serialization s = getSerialization(url, proto); return s.deserialize(url, is); } }
上面代码已经详细注释,整个逻辑分为两点,Dubbo 应用启动时缓存序列化并提供获取序列化的方法。 关于序列化在之前的文章中已经详细介绍过,这里就不再重复说明。
编解码适配器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class CodecAdapter implements Codec2 { private Codec codec; public CodecAdapter (Codec codec) { Assert.notNull(codec, "codec == null" ); this .codec = codec; } @Override public void encode (Channel channel, ChannelBuffer buffer, Object message) throws IOException { UnsafeByteArrayOutputStream os = new UnsafeByteArrayOutputStream(1024 ); codec.encode(channel, os, message); buffer.writeBytes(os.toByteArray()); } @Override public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { byte [] bytes = new byte [buffer.readableBytes()]; int savedReaderIndex = buffer.readerIndex(); buffer.readBytes(bytes); UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream(bytes); Object result = codec.decode(channel, is); buffer.readerIndex(savedReaderIndex + is.position()); return result == Codec.NEED_MORE_INPUT ? DecodeResult.NEED_MORE_INPUT : result; } public Codec getCodec () { return codec; } }
CodecAdapter 使用对象适配模式 完成对 Codec 类型的适配工作,即将 Codec 适配成 Codec2 。关于适配器模式可以参考 适配器模式 。
编解码继承关系 编解码 Codec2 的继承关系如下图所示:
继承关系中包含了各层的编解码实现,本篇文章只介绍 Transport 层相关的实现,其它层相关的实现会在对应的层进行介绍。需要注意的是,Exchange 层的编解码实现依赖了 Transport 层的编解码实现,Protocol 层又依赖了 Exchange 层的编解码实现。可以发现,编解码器的实现通过继承的方式以获得更多的功能,每个编码器实现类编解码消息的逻辑都不一样。
AbstractCodec 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public abstract class AbstractCodec implements Codec2 { private static final Logger logger = LoggerFactory.getLogger(AbstractCodec.class ) ; protected static void checkPayload (Channel channel, long size) throws IOException { int payload = Constants.DEFAULT_PAYLOAD; if (channel != null && channel.getUrl() != null ) { payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD); } if (payload > 0 && size > payload) { ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel); logger.error(e); throw e; } } protected Serialization getSerialization (Channel channel) { return CodecSupport.getSerialization(channel.getUrl()); } protected boolean isClientSide (Channel channel) { String side = (String) channel.getAttribute(Constants.SIDE_KEY); if ("client" .equals(side)) { return true ; } else if ("server" .equals(side)) { return false ; } else { InetSocketAddress address = channel.getRemoteAddress(); URL url = channel.getUrl(); boolean client = url.getPort() == address.getPort() && NetUtils.filterLocalHost(url.getIp()).equals( NetUtils.filterLocalHost(address.getAddress() .getHostAddress())); channel.setAttribute(Constants.SIDE_KEY, client ? "client" : "server" ); return client; } } protected boolean isServerSide (Channel channel) { return !isClientSide(channel); } }
是 Codec2 的抽象实现,提供了公用的一些方法,如校验消息长度是否超过阈值,根据URL获取 Serialization 扩展实现,判断当前通道属于客户端侧还是服务端侧。
TransportCodec TransportCodec 的逻辑简单、粗暴,使用 Serialize 对所有消息直接序列化或者反序列化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class TransportCodec extends AbstractCodec { @Override public void encode (Channel channel, ChannelBuffer buffer, Object message) throws IOException { OutputStream output = new ChannelBufferOutputStream(buffer); ObjectOutput objectOutput = getSerialization(channel).serialize(channel.getUrl(), output); encodeData(channel, objectOutput, message); objectOutput.flushBuffer(); if (objectOutput instanceof Cleanable) { ((Cleanable) objectOutput).cleanup(); } } @Override public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { InputStream input = new ChannelBufferInputStream(buffer); ObjectInput objectInput = getSerialization(channel).deserialize(channel.getUrl(), input); Object object = decodeData(channel, objectInput); if (objectInput instanceof Cleanable) { ((Cleanable) objectInput).cleanup(); } return object; } protected void encodeData (Channel channel, ObjectOutput output, Object message) throws IOException { encodeData(output, message); } protected Object decodeData (Channel channel, ObjectInput input) throws IOException { return decodeData(input); } protected void encodeData (ObjectOutput output, Object message) throws IOException { output.writeObject(message); } protected Object decodeData (ObjectInput input) throws IOException { try { return input.readObject(); } catch (ClassNotFoundException e) { throw new IOException("ClassNotFoundException: " + StringUtils.toString(e)); } } }
小结 本篇文章简单介绍了 Transport 层及其必要性,然后从端点抽象类 AbstractPeer、AbstractEndpoint,语义端点抽象类 AbstractServer、AbstractClient,抽象通道 AbstractChannel 以及 通道关联的 ChannelHandler 多方面介绍了 Transport 层的实现,最后介绍了编解码的继承体系。不难发现,作为底层的 Transport,支持了消息/事件发送、处理、响应以及编解码,涉及的接口和类在功能层面上已经是一个闭环了。后面两篇文章会对本篇文章的抽象进行具体化。