深入浅出netty - channelPipeline

这里将分析 netty 的 channelPipeline

从上一章可以发现, netty 的每个eventloop 都绑着一个 channelPipeline

而在 netty 中 channelPipeline 的 默认实现是 DefaultChannelPipeline, 内部维护了一个 AbstractChannelHandlerContext 链表。

当 channel 完成 register、active、read 等操作时,会触发 pipeline 的相应方法。

我们拿下面的代码举例子。

1
2
3
4
5
6
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new EchoHandler());
}

对应的内部结构图

从上面可以看出几个点, 结合源码分析下

channelPipeline 初始化 AbstractChannelHandlerContext 链的时候默认添加了 head, tail 2个头尾节点

HeadContext 继承了 ChannelOutboundHandlerChannelInboundHandler 2 个 handler
TailContext 仅继承了 ChannelInboundHandler 1 个 handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DefaultChannelPipeline implements ChannelPipeline {
//head和tail都是handler上下文
final DefaultChannelHandlerContext head;
final DefaultChannelHandlerContext tail;

final Channel channel; // pipeline所属的channel

// ...

public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;

tail = new TailContext(this);
head = new HeadContext(this);

head.next = tail;
tail.prev = head;
}

final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {

// ...
}

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
// ...
}
}

对于 pipeline.fireChannelRead 这种 fire 开头的API 的请求从 HeadContext 开始找链的next Handler 且继承了 ChannelInboundHandler 的 Handler

inboundhandler 里面传递事件的方式是不断的传递给链条的 next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// ========= in DefaultChannelPipeline =========

// 注意 下面的 head 都是 HeadContext
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this;
}

// ========= in AbstractChannelHandlerContext =========

// 这里的next 就是上面的 head
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
// 保证 在 eventloop 线程内
if (executor.inEventLoop()) {
// 调用下面的方法
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}

private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 这里调用了 head.channelRead() , 所以 所有 fire 开头的方法 都是从 head 进来的
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// 寻找下一个 继承了 inboundHandler 的 handler
invokeChannelRead(findContextInbound(), msg);
return this;
}

private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
// inbound 不断找链的 next
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}

相反对于 pipeline.write (还有 bind, connect, flush 这种 非fire 开头) 的请求相反从 TailContext 开始找

outboundhandler 里面传递事件的方式是不断的传递给链条的 next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}

// ========= in AbstractChannelHandlerContext =========
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
// outbound 不断找链的 prev
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}

在 handler 中直接 ctx.xxxctx.pipeline.xxx 更快

因为不用从 链头开始遍历, 路径更短,更快

avatar

lelouchcr's blog