以下netty 版本为 4.1.x
最近项目的收官阶段,开始压测,然后发现效果没有想象的好,再用 gcutil
看内存,发现竟然有严重的内存泄露!
仔细研究了一下,发现是因为我的使用方式不对。
最初开发的时候,没有研究过netty的底层实现,但是看过 twitter发的对netty4 的 ByteBufPool 的测评 , 于是就在写代码的时候就想着尽量池化。
错误的方式 差不多是下面这样的代码.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Override protected void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int available = in.readableBytes(); if (available > 4 ) { in.markReaderIndex(); final byte [] i32buf = new byte [4 ]; in.readBytes(i32buf); int totalSize = decodeFrameSize(i32buf); if (available < totalSize + 4 ) { in.resetReaderIndex(); } else { Constants.MessageType messageType = isXXProtocol(in); Message message = null ; switch (messageType) { case XX1: message = new Message(); message.setBody(ink.readRetainedSlice(totalSize)); out.add(message); break ; case xx2: break ; } } } } @Override protected void channelRead0 (final ChannelHandlerContext ctx, final Message request) throws Exception { threadPoolExecutor.execute(new Runnable() { try { Message response = messageHandler.handle(request); if (ctx.channel().isActive()) { ctx.writeAndFlush(response, ctx.voidPromise()); } } catch (Exception e) { } finally { assert request.getBody().refCnt() == 1 ; ReferenceCountUtil.release(request.getBody()); } } }
看起来代码没有问题,但是事实上由于netty4 的cache特性,造成了内存泄露。
netty4 有个 Thread-local object pool
netty 在很多地方都用它来做cache
用来做 cache。做什么cache呢, 用来做ByteBufPool 的cache,也就是说每当有请求server 的时候,先从Threadlocal 找之前cache 过的Bytebuf,
当使用 worker 的optional 是 PooledByteBufAllocator 时 netty 处理请求时 的内存分配流程是这样的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 +-----------------+ +-----------------+ |IO thread for write |IO thread for read 1. Inbound 字节流存入ByteBuf中。 3. 将encode好的Bytebuf | | | | Bytebuf 优先从Threadlocal获取, 写出到socket,回收Bytebuf| ^ | | + | ThreadLocal没有,则再从pool中获取。 到 ThreadLocal | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | + | | v | | | | | | +----------------------+ | +-------------------+ | | | | | | | heap or | +---------> | thread local cache | | | direct | | | | | | | | | +-------+---------+--+---+-----+-------+ 2.一般来说在business handler 中将解码的数据 | ByteBuf pool | | | | 放到自己的业务线程池,防止由于阻塞降低IO thread | | <----------------------------+ | 的吞吐 | | | +------------------+ | +------------------------+ +-------------------+ | | | | | business thread | | |decoder | | | +--------------------+ | | | | | | | | | | | | | | +--------------------+ | | |business handler+------------> | | | | | | | +--------------------+ | | | | | | | | | | |encoder | | | +--------------------+ | | | | | | | | | | | | ....... | | +------------------+ | | | | pipeline is also in IO thread | | | +--------------------------------------+ +------------------------+
所以,如果我将 1 中 获得的Bytebuf 在2 中的业务线程释放,于是 ByteBuf.release
的流程将会把Bytebuf 存入到当前的Threadlocal。(这个Threadlocal 无法释放)
于是内存泄漏了。
那么问题来了,既然netty 已经支持了 BytebufPool 为何还要用 Threadlocal 做Cache呢?
这里有个官方的 benchmark 解释了主要是因为 directByteBuf 的获取比较慢。。也就是说,这个优化主要是针对 DirectBytebufPool的
总结来说,如果要使用 PooledByteBuf,一定要注意 allocate 和 release 是同一个线程
但是这里要注意,我们的IOThread 可以配置 >1
的, 因为有时我们希望我们的 encoder 和 decoder 能过利用多个核, 这里要注意,在你使用多个线程的时候,你的 ThreadLocal 也变成了多个, 这些Threadlocal 都会进入老年代,并且永远不会被释放,需要压测看看,配置合理的内存大小,避免由于内存过小,造成频繁full gc
对了,这个cache的特性还能够关闭 具体代码在 io.netty.util.Recycler
static 代码块, 可以通过 System.setProperty("io.netty.recycler.maxCapacity", "0");
或以 java param 的方式 -Dio.netty.recycler.maxCapacity=0
关闭
还有PooledByteBuf 的cahce是这样子的cache,他cache的是自己的是自己的对象,所以,这个cache 也继承了整个netty 的类似jmelloc 的内存分页方式, 并且他的大小是和Pooled Bytebuf 大小是一样的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override protected PooledByteBuf<ByteBuffer> newByteBuf (int maxCapacity) { if (HAS_UNSAFE) { return PooledUnsafeDirectByteBuf.newInstance(maxCapacity); } else { return PooledDirectByteBuf.newInstance(maxCapacity); } } static PooledUnsafeDirectByteBuf newInstance (int maxCapacity) { PooledUnsafeDirectByteBuf buf = RECYCLER.get(); buf.setRefCnt(1 ); buf.maxCapacity(maxCapacity); return buf; } static PooledHeapByteBuf newInstance (int maxCapacity) { PooledHeapByteBuf buf = RECYCLER.get(); buf.setRefCnt(1 ); buf.maxCapacity(maxCapacity); return buf; }
所以如果你用的是 PooledHeapByteBuf 如果有 2个 IOThread 那么 cache 的总数是3。
所以,如果关心内存的使用情况, 了解netty内存池的分配非常重要,关于这个发现这篇文章写的很好
然后根据自己的业务情况,以及可能的线程数,预估 最大的内存耗费. 或者直接压测, 23333
还有,如果 业务线程也使用了PooledBytebuf 的话, 同理.
于是,知道了这个原因后, 为了验证下,于是在 decoder 里面用 UnpooledHeapBytebuf 做一次内存拷贝,然后试试看是否能修复。
代码如下
1 2 3 4 5 6 7 8 9 switch (messageType) { case xx1: message = new Message(); message.setBody(Unpooled.copiedBuffer(in)); out.add(message); break ; case xx2: break ; }
压测后发现,老年代依然会逐步增加,最终导致fullgc
objectDump 后发现大量的Recycler 和 WeakHashMap 对象。明明Unpooled 申请Bytebuf 的时候并没有和ThreadLocal相关,为何还会有Recycle呢?
后来发现是在 business Thread writeAndFlush 的时候, 会区分线程, 如果是在 IO 线程
writeAndFlush 则会直接调用,不然则是提交一个runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void write (Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
提交任务后, 依旧是从 ThreadLocal 中获取 WriteAndFlushTask
,这里的 WriteAndFlushTask
被重用了。
这里可以肯定,这里的 WriteAndFlushTask
一定是会在IO线程释放, 如果和上面的 PooledByteBuf 比较,岂不是内存泄漏了?
我们继续看下去,看释放的时候做了些什么
1 2 3 4 5 6 7 8 9 10 11 12 13 private static void safeExecute (EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null ) { ReferenceCountUtil.release(msg); } } } }
这个 executor 其实就是 当初 调用 businessThread 的 ChannelHandlerContext
这里插播一句, ChannelHandlerContext
和 eventLoop
是一对多的关系, 前者为一,后者为多
也就是说,这个runnable被 提交到 eventLoop 去了,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public void execute (Runnable task) { if (task == null ) { throw new NullPointerException("task" ); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
ps,这里的逻辑和我以前看tornado 的eventLoop 的 大逻辑差不多,但是 netty 多了不少细节(细节以后有时间再展开,这里就说说大体流程)
通过上面 addTask 将 上面的 WriteAndFlushTask
提交到一个taskQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 protected void addTask (Runnable task) { if (task == null ) { throw new NullPointerException("task" ); } if (!offerTask(task)) { reject(task); } } final boolean offerTask (Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
然后就是loop 环节,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 protected void run () { for (;;) { try { processSelectedKeys(); runAllTasks(); } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop." , t); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } } } }
再loop里面 执行 WriteAndFlushTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public final void run () { try { ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer(); if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null ) { buffer.decrementPendingOutboundBytes(size); } write(ctx, msg, promise); } finally { ctx = null ; msg = null ; promise = null ; handle.recycle(this ); } }
不过看到这里,我知道了大体流程是怎样的了,但是具体的跨线程是如何做到cache的还是没看到(有点跑偏),所以下面着重看Recycler 的get 和 recycle 的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue () { return new Stack<T>(Recycler.this , Thread.currentThread(), maxCapacity, maxSharedCapacityFactor); } }; public final T get () { if (maxCapacity == 0 ) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null ) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; } private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() { @Override protected Map<Stack<?>, WeakOrderQueue> initialValue() { return new WeakHashMap<Stack<?>, WeakOrderQueue>(); } }; public void recycle (Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle" ); } Thread thread = Thread.currentThread(); if (thread == stack.thread) { stack.push(this ); return ; } Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(stack); if (queue == null ) { queue = WeakOrderQueue.allocate(stack, thread); if (queue == null ) { return ; } delayedRecycled.put(stack, queue); } queue.add(this ); }
注意,这里的 queue.add
和 stack.pop
明明是2个对象,怎么就串联起来了呢
代码比较长,我直接画图表示下
也就是说 一个Thread 最多cache 256
个 WriteAndFlushTask
对象, 总数又 IO Thread
数量决定, 默认最多应该是 IOThreadCount * 256 个对象
超过这个数字的话都直接 new 一次性对象了。
所以这个是不会造成内存泄漏的,所以回到之前的问题,为什么用Unpool还会造成 老年代持续上升。
后来我将 selector 线程 和 IO 线程都改成 1个 然后,然后 print 出每次请求的id(每次请求id 自增),发现 id 并不同步,降低频率后可以同步,也就是说,
server 没有将buf 写出去,由于是在 businessThread 里写 response 是提交到 IOLoop 里面 的queue 的,然后 IOLoop 里面 是 给 读的IO 事件和别的 所有异步事件 各 50% 的时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 for (;;){ final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break ; } } }
这个Task 不仅仅是 WriteAndFlushTask 还有 CloseTask, registerTask, 等等返回xxxFuture 的基本都是靠他实现的。
虽然在我们的压测情况下,除了 WriteAndFlushTask, 别的可以忽略不计,还有就是 如果写的太快,造成task 积压过多。
所以,在一个异步的系统里面 设置 waterMark 非常重要
事后,在看netty issue的时候,也有人遇到了同样的问题 https://github.com/netty/netty/issues/5563
reference http://blog.csdn.net/pentiumchen/article/details/45372625
http://www.cnblogs.com/rainy-shurun/p/5213086.html
http://redis.io/commands/eval#available-libraries
http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html