This post analyzes Netty's `ChannelPipeline`.
As the previous chapter showed, every `Channel` in Netty (each of which is registered to an `EventLoop`) is bound to its own `ChannelPipeline`.
The default implementation of `ChannelPipeline` in Netty is `DefaultChannelPipeline`, which internally maintains a doubly linked list of `AbstractChannelHandlerContext` nodes.
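When a channel completes an operation such as register, active, or read, the corresponding method on the pipeline is triggered.
Let's use the following code as an example.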
```java
public void initChannel(SocketChannel ch) throws Exception {
    ch.pipeline()
      .addLast(new StringDecoder())
      .addLast(new StringEncoder())
      .addLast(new EchoHandler());
}
```
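The corresponding internal structure is a doubly linked chain: `head` <-> `StringDecoder` <-> `StringEncoder` <-> `EchoHandler` <-> `tail`.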
Several points follow from this structure. Let's go through them alongside the source code.
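When the `ChannelPipeline` initializes its `AbstractChannelHandlerContext` chain, it adds two sentinel nodes by default, `head` and `tail`.
`HeadContext` implements both `ChannelOutboundHandler` and `ChannelInboundHandler`, so it acts as two kinds of handler.
`TailContext` implements only `ChannelInboundHandler`, so it acts as one kind of handler.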
```java
// Abridged excerpt of DefaultChannelPipeline; unrelated members are omitted.
class DefaultChannelPipeline implements ChannelPipeline {

    // Sentinel nodes. HeadContext and TailContext both extend
    // AbstractChannelHandlerContext, so the fields use that type.
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    final Channel channel;

    public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;

        tail = new TailContext(this);
        head = new HeadContext(this);

        // The initial chain is just head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Handles both outbound and inbound events (method bodies omitted).
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {
    }

    // Handles inbound events only (method bodies omitted).
    final class TailContext extends AbstractChannelHandlerContext
            implements ChannelInboundHandler {
    }
}
```
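To make the sentinel layout concrete, here is a minimal sketch in plain Java with no Netty dependency. The class `SentinelListSketch`, its `Node` type, and the `names()` helper are hypothetical names invented for this illustration. Only the `head`/`tail` wiring mirrors the constructor above, and `addLast` is assumed to insert just before `tail`.

```java
import java.util.ArrayList;
import java.util.List;

public class SentinelListSketch {
    // Hypothetical stand-in for AbstractChannelHandlerContext: one list node.
    static final class Node {
        final String name;
        Node prev, next;
        Node(String name) { this.name = name; }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelListSketch() {
        // Same wiring as the constructor excerpt: head <-> tail.
        head.next = tail;
        tail.prev = head;
    }

    // Insert a new node just before tail, so tail always stays last.
    SentinelListSketch addLast(String name) {
        Node node = new Node(name);
        Node last = tail.prev;
        node.prev = last;
        node.next = tail;
        last.next = node;
        tail.prev = node;
        return this;
    }

    // Walk from head to tail and collect the node names in order.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        SentinelListSketch p = new SentinelListSketch()
                .addLast("StringDecoder")
                .addLast("StringEncoder")
                .addLast("EchoHandler");
        // prints [head, StringDecoder, StringEncoder, EchoHandler, tail]
        System.out.println(p.names());
    }
}
```

Because the two sentinels always exist, insertion never has to special-case an empty list.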
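For APIs whose names start with `fire`, such as `pipeline.fireChannelRead`, the request starts at `HeadContext` and searches forward along the chain for the next context whose handler implements `ChannelInboundHandler`.
Inside an inbound handler, the event is propagated by repeatedly passing it on to the `next` inbound context in the chain.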
```java
// DefaultChannelPipeline: every fireXxx() entry point starts at head.
@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

// AbstractChannelHandlerContext: run the handler on its own executor.
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        // Handler is not ready yet: skip it and keep propagating.
        fireChannelRead(msg);
    }
}

// Called by a handler (via ctx) to pass the event to the next inbound context.
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

// Walk forward (next) until an inbound context is found.
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
```
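The forward walk can be traced with a small stand-alone model. This is only a sketch: `InboundWalkSketch`, `Ctx`, and the `trace` list are hypothetical names, executor handling is left out, and every inbound handler is assumed to forward the event (a real handler may choose not to). `StringEncoder` is modelled as outbound-only and `EchoHandler` as inbound, which is an assumption since `EchoHandler` is not shown in the text.

```java
import java.util.ArrayList;
import java.util.List;

public class InboundWalkSketch {
    // Hypothetical, simplified context with only the fields the walk needs.
    static final class Ctx {
        final String name;
        final boolean inbound;
        final List<String> trace;
        Ctx prev, next;

        Ctx(String name, boolean inbound, List<String> trace) {
            this.name = name;
            this.inbound = inbound;
            this.trace = trace;
        }

        // Same loop shape as findContextInbound(): skip ahead to an inbound context.
        Ctx findContextInbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }

        // Stand-in for ctx.fireChannelRead(msg).
        void fireChannelRead(String msg) {
            findContextInbound().channelRead(msg);
        }

        // Stand-in for a handler's channelRead: record the visit, then forward.
        void channelRead(String msg) {
            trace.add(name);
            if (next != null) {          // tail has no next, so the walk ends there
                fireChannelRead(msg);
            }
        }
    }

    static void link(Ctx... ctxs) {
        for (int i = 0; i + 1 < ctxs.length; i++) {
            ctxs[i].next = ctxs[i + 1];
            ctxs[i + 1].prev = ctxs[i];
        }
    }

    // Build the example chain and fire one read from head.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        Ctx head = new Ctx("head", true, trace);
        Ctx decoder = new Ctx("StringDecoder", true, trace);
        Ctx encoder = new Ctx("StringEncoder", false, trace);
        Ctx echo = new Ctx("EchoHandler", true, trace);
        Ctx tail = new Ctx("tail", true, trace);
        link(head, decoder, encoder, echo, tail);
        head.channelRead("hello");       // pipeline.fireChannelRead starts at head
        return trace;
    }

    public static void main(String[] args) {
        // prints [head, StringDecoder, EchoHandler, tail]
        System.out.println(run());
    }
}
```

The outbound-only `StringEncoder` never sees the read, because the loop steps over any context whose `inbound` flag is false.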
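Conversely, requests such as `pipeline.write` (and `bind`, `connect`, `flush`, and other APIs whose names do not start with `fire`) start at `TailContext` and search for outbound handlers.
Inside an outbound handler, the event is propagated by repeatedly passing it on to the `prev` outbound context in the chain.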
```java
// DefaultChannelPipeline: outbound operations start at tail.
@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

// AbstractChannelHandlerContext: walk backward (prev) until an outbound context is found.
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
```
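The backward walk can be modelled the same way. The sketch below is plain Java with hypothetical names (`OutboundWalkSketch`, `Ctx`, `writePath`). It assumes every outbound handler forwards the write, and that `StringDecoder` and `EchoHandler` are inbound-only.

```java
import java.util.ArrayList;
import java.util.List;

public class OutboundWalkSketch {
    // Hypothetical, simplified context holding only what the walk needs.
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev, next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Same loop shape as findContextOutbound(): step back to an outbound context.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Contexts a write visits when it starts at 'start' and each handler forwards it.
    static List<String> writePath(Ctx start) {
        List<String> visited = new ArrayList<>();
        Ctx ctx = start;
        while (ctx.prev != null) {       // head has no prev, so the write ends there
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }

    // Build the example chain and issue one write from tail.
    static List<String> run() {
        Ctx[] chain = {
            new Ctx("head", true),
            new Ctx("StringDecoder", false),
            new Ctx("StringEncoder", true),
            new Ctx("EchoHandler", false),
            new Ctx("tail", false)
        };
        for (int i = 0; i + 1 < chain.length; i++) {
            chain[i].next = chain[i + 1];
            chain[i + 1].prev = chain[i];
        }
        return writePath(chain[chain.length - 1]);   // pipeline.write starts at tail
    }

    public static void main(String[] args) {
        // prints [StringEncoder, head]
        System.out.println(run());
    }
}
```

The write reaches `head` last, which fits the fact that `HeadContext` implements `ChannelOutboundHandler`.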
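Inside a handler, calling `ctx.xxx` directly is faster than calling `ctx.pipeline().xxx`: the event continues from the current context instead of restarting from the end of the chain (`tail` for outbound operations, `head` for `fire` events), so the path it travels is shorter.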
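The difference in path length can be counted with a toy model. This sketch flattens the chain into arrays instead of linked nodes. The names `CtxVsPipelineSketch`, `OutA`, `Caller`, and `OutB` are hypothetical, and all outbound handlers are assumed to forward the write. Note that the two calls are not interchangeable: a write issued through `ctx` never reaches outbound handlers that sit after the calling handler in the chain.

```java
import java.util.ArrayList;
import java.util.List;

public class CtxVsPipelineSketch {
    // Chain order from head to tail; "Caller" is the handler issuing the write.
    static final String[] NAMES = {"head", "OutA", "Caller", "OutB", "tail"};
    static final boolean[] OUTBOUND = {true, true, false, true, false};

    // Outbound contexts reached when a write starts at index 'from' and walks toward head.
    static List<String> writePathFrom(int from) {
        List<String> visited = new ArrayList<>();
        for (int i = from - 1; i >= 0; i--) {
            if (OUTBOUND[i]) {
                visited.add(NAMES[i]);
            }
        }
        return visited;
    }

    // Models ctx.write(msg) called inside Caller: start from Caller's own position.
    static List<String> viaContext() {
        return writePathFrom(2);
    }

    // Models ctx.pipeline().write(msg): start from tail.
    static List<String> viaPipeline() {
        return writePathFrom(NAMES.length - 1);
    }

    public static void main(String[] args) {
        System.out.println(viaContext());   // prints [OutA, head]
        System.out.println(viaPipeline());  // prints [OutB, OutA, head]
    }
}
```

Here the context call visits two outbound contexts and the pipeline call visits three, including `OutB`, which the context call skips entirely.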