深入浅出netty - EventLoop, EventLoopGroup

近年来netty 十分流行,至于为何那么流行,因为它性能好,能更有效的利用系统资源。

但是, netty 很容易上手, 但是很多时候却无法理解设计的初衷, 比如为什么会有 EventLoop 这个东东, 本文从历史阐述原因。

同步网络模型

首先说说同步模型, 也就是常说的 bio, bio 是基于 stream 的。 网上找到一个图如下

从图可知,对于bio 来说,我的程序何时处理完需要靠 看 datasource 何时发完数据。

这是非常坑爹的, 举个例子, 如果我的服务是一个多线程的 bio 的服务(比如 10个线程),然后某个黑客建10个 telnet 连上你的server, holding住 不发数据,那么你的服务就down了。

那么如何解决这种问题? 如果 waiting client 发的数据不阻塞 worker 线程不就行了?

于是出现了 异步网络模型

异步网络模型

异步网络模型, 也就是非阻塞IO, 在java 中由于 jdk1.4 之后才被引入所以被称之为 Nio (New IO)

网上同样找到个图

由图可知, 实现异步的原理是 系统帮你给每一个连接(channel) 都维护了一个 buffer
将 client 发来的数据暂存到 buffer 中, 待 java 进程需要的时候一次性做一次内存拷贝,在 java 进程中使用。

当然, 写出也是同样的 写到 buffer 中,当 buffer 满 或者 手动调用 flush 写出去。

下面有个简单的 非阻塞 echo server demo, 当读到 q 时关闭连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

final List<SocketChannel> socketChannelList = Lists.newLinkedList();

new Thread(new Runnable() {
@Override
public void run() {
while (true) {
// 处理 每个连接是否可读, 这里的逻辑是 读4个字节后切断连接
for (SocketChannel socketChannel : Lists.newArrayList(socketChannelList)) {
try {
ByteBuffer buf = ByteBuffer.allocate(4);
int readed = socketChannel.read(buf);
System.out.println(readed);
System.out.println(Arrays.toString(buf.array()));
if (readed > 0 && buf.array()[0] == 'q') {
// close
socketChannel.close();
// remove from list
socketChannelList.remove(socketChannel);
}
} catch (Throwable e) {
e.printStackTrace();
}
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}, "server-handler-thread").start();

// 等待新连接连进来
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannelList.add(socketChannel);
}
Thread.sleep(1000);
}

简单分析下上面的代码,

  1. 由于 accept 是非阻塞的, 所以我们要一直轮询 判断是否有新连接进来。
  2. 连接进来后,由于我们不知道 client 何时能发完数据, 所以我们维护了一个列表 socketChannelList 定期去轮询列表,看是否有数据可读。

但是后来大家发现, 每当用 nonblocking socket 的时候都要做类似的操作, 于是想把它们提出来,提取成一个通用模块, 让代码实现更容易,

berkeley socket 中这个模块被称为 select, poll, epoll. (后面2个是加强版)

而在java中, java 在他们之上抽象成 Selector

Selector(select, poll, epoll)

既然 select 是一个使用 nonblocking io 的通用封装, 那么在 linux 中, 为什么要有 select, poll, epoll 这么多的选择呢?

我猜测,最早其实只有 select, 然后由于linux 是一个开源, 所以, 后来别人实现了性能更好的 poll, 再之后 epoll 也是这样。

但是, 由于它们都在 linux 2.6.12 之前就已经被实现了, 在github上 都看不到具体提交历史了。。

select.c,
poll.h,
eventpoll.h

我们再来看看 使用了 selector 后的非阻塞 IO 的编码, 同样实现一个 echo server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
Selector selector = Selector.open();
ServerSocketChannel servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
// 建立一个server socket,到本地端口9999, backlog 1024
servChannel.socket().setReuseAddress(true);
servChannel.socket().bind(new InetSocketAddress(9999), 1024);
// selector 关心 server 上的 ACCEPT 事件
servChannel.register(selector, SelectionKey.OP_ACCEPT);

while (start) {
try {
// 阻塞等待 直到有IO事件可读(系统IO事件队列不为空)
selector.select();
// 获取 事件 以及 事件所对应的 channel (client server 的连接)
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
if (key.isValid()) {
// OP_ACCEPT 事件 表示有个新client 完成了三次握手。连接上了本服务器
if (key.isAcceptable()) {
// Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 将该连接的可读事件 注册到 selector, 到时候他发起请求的时候,我会收到新事件
sc.register(selector, SelectionKey.OP_READ);
}
// OP_READ 事件 说明 client 发的数据已经发到了系统缓冲区,server 可以去读了。
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
// 分配用户台空间, 将数据从内核态 拷贝到 用户态
ByteBuffer readBuffer = ByteBuffer.allocate(4);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
// 切换读写模式 详见下面的图, 表示自己目前可以读 [position, limit]
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
// 将buffer 数据拷贝到 bytes 数组
// 如果这里只收到一半的数据怎么办?
String body = new String(bytes, "UTF-8");
System.out.println(body);
// 将 read的数据 写回去
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
;
}
}
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
throw e;
}
}
1
2
3
4
5
~ ᐅ nc localhost 9999
1
1
2
2

但是, 很奇怪的是,代码更长了。。。

因为其实真正的 selector 将所有的socket 的时间都封装进去了。

带来的结果是, java 层的代码 只要一个线程就能处理所有了,(我之前的实现需要2个线程)。

当然还有个问题是, 及时用了 selector, 我们依然认为代码可读性非常差, 代码容易写错。

于是, netty 登场了, 他在 selector 之上, 又封装了一层, 让我们先看下 同样实现 netty 的代码。

netty echo server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class Netty4Demo {
public class EchoHandler extends SimpleChannelInboundHandler {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 为什么 这里可以强转String?
String in = (String) msg;
System.out.print(in);
// 将数据写回
ctx.writeAndFlush(in);
}
}

public void run() throws Exception {
EventLoopGroup acceptGroup = new NioEventLoopGroup(1); // 指定 Acceptor 线程池大小
EventLoopGroup ioGroup = new NioEventLoopGroup(1); // 指定 NIO线程池大小
try {
ServerBootstrap b = new ServerBootstrap(); // 创建 ServerBootstrap 对象,他是Netty 用于启动NIO 服务端的辅助启动类,目的是降低服务端的开发复杂度。
b.group(acceptGroup, ioGrou).channel(
NioServerSocketChannel.class) // 指定使用 java 的NioServerSocketChannel
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建 IOThread 的 pipeline
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new StringDecoder())
.addLast(new StringEncoder()); // 添加echo Handler
.addLast(new EchoHandler())
}
}).option(ChannelOption.SO_BACKLOG, 128) // server socket config backlog 设置为 128
.childOption(ChannelOption.SO_KEEPALIVE, true); // client socket config 设置 keepalive = true
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(9999).sync(); // 同步等待绑定本地端口

// 等待服务器 socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
// 释放两个线程池
acceptGroup.shutdownGracefully();
ioGrou.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
new Netty4Demo().run();
}
}
1
2
3
4
5
6
7
~ ᐅ nc localhost 9999
1
1
2
2
3
3

代码短了一些, 关键是少了很多 if 判断, 代码可读性更好了。

但是, 可读性提高(抽象)的同时, 又带出了了问题, EventLoop 是什么 , EventloopGroup 是什么? 为什么要有它们?

这里要解释以上的问题, 要先从 线程模型说起

线程模型

首先是 单线程模型, 如下图

上面直接用 selector 实现的 echo server 就是单线程模型,
还有类似存在 GIL 问题的语言比如 python 的 tornado 也是 单线程模型。

然后是多线程模型

多线程模型的区别就是 让接收连接 和 对连接读写的处理 分别用不同的线程处理。 比如上面的 echo server demo 就可以被称为一个 多线程模型。
只不过扩展性差了些。。。

netty 的 EventloopGroup 其实就是线程池, 通过它来配置 接收连接 和 处理连接读写 的线程池大小。

netty 之 EventloopGroup

上图就是 EventLoopGroup 的大致模块图。

Boss EventloopGroup 和 worker EventloopGroup 分别 处理 接收连接, 和读写。

而 Eventloop 里则封装了类似上面单线程的 echo server 的模块下面会详细说。

对于开发者来说,主要关心紫色圈出部分。 其余的都已经封装完毕

有几个关键点。

  1. Eventloopsize 建议设置为 2 的次方,dispatch 使用位移,更快。
  2. 侦听一个端口,只会绑定到 BossEventLoopGroup 中的一个 Eventloop,所以, BossEventLoopGroup 配置多个也无用。

EventLoop

如果只使用 tcp 和 异步阻塞的话主要关心以下2个 EventLoop (本文也只介绍这2个)

NioEventLoop - 基于java 原生nio

level-triggered (水平触发)

EpollEventLoop - native jni 直接调用 epoll, only work on linux

edge-triggered (边缘触发)更少的系统调用
C代码,更少GC,更少synchronized
暴露了更多的Socket配置参数

流程图

关键点

  1. 整个loop 干的事情就是 select -> processIO -> runAllTask
  2. 这是一个死循环
  3. 那么这个loop 如何自己优雅退出? noway,只能通过外部添加 CloseTask, 比如添加到 MpscQueue
  4. deadline 为 定时任务的触发时间,避免 select 阻塞, 让 定时任务不能及时执行。
  5. 在select 这一步 解决 epollbug

关于解决 epoll bug的原理是 应当 “阻塞”的 select 变得不再阻塞。
所以只需要统计下 select 次数就行了

部分关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for(;;){
int selectedKeys = selector.select(timeoutMillis); // select with timeout
selectCnt ++;
// 我由于 select 阻塞 而等待了 timeoutMillis 毫秒, 说明, 我阻塞了,说明没有bug
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 在小于 timeoutMillis 毫秒的时间内 select 的次数超过了 阀值(512) 次
rebuildSelector();
selector = this.selector;

selector.selectNow();// Select again
selectCnt = 1;
break;
}
}

other

Selector.wakeup()

java 的 Selector 在原生的 select api 之上 增加了个 Selector.wakeup()

她的目的是唤醒 阻塞在 select() 的线程, (通过写一个字节)

为什么要唤醒? 什么时候需要唤醒?

1
2
3
1. 注册了新的 channel 或者事件。
2. channel 关闭, 取消注册。
3. 优先级更高的事件触发(如定时器事件), 希望及时处理。

原理

Linux上利用pipe调用创建一个管道,Windows上则是一个loopback的tcp连接。这是因为win32的管道无法加入select的fd set,将管道或者TCP连接加入select fd set。

wakeup往管道或者连接写入一个字节,阻塞的select因为有I/O事件就绪,立即返回。可见,wakeup的调用开销不可忽视。

之前看到的 coolshell 也分析过 –> Java NIO类库Selector机制解析(

reference

http://www.infoq.com/cn/articles/netty-threading-model

http://calvin1978.blogcn.com/articles/netty-performance.html

http://calvin1978.blogcn.com/articles/netty-performance2.html

http://tech.meituan.com/nio.html

avatar

lelouchcr's blog