且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

从时延毛刺问题定位到 Netty 的性能统计设计(下)

更新时间:2022-08-17 13:52:40

  • 正确的消息发送速率性能统计策略:

 

正确的消息发送速率性能统计方法如下:

(1)调用writeAndFlush方法之后获取ChannelFuture

(2)新增消息发送ChannelFutureListener并注册到ChannelFuture中,监听消息发送结果,如果消息写入SocketChannel成功,则Netty会回调ChannelFutureListeneroperationComplete方法。

(3)在消息发送ChannelFutureListeneroperationComplete方法中进行性能统计。


正确的性能统计代码示例如下:

public voidchannelRead(ChannelHandlerContext ctx, Object msg) {

        int sendBytes =((ByteBuf)msg).readableBytes();

        ChannelFuture writeFuture =ctx.write(msg);

        writeFuture.addListener((f) ->

        {

           totalSendBytes.getAndAdd(sendBytes);

        });

        ctx.flush();

}


Netty消息发送相关源码进行分析,当发送的字节数大于0时,进行ByteBuf的清理工作,代码如下:

protected voiddoWrite(ChannelOutboundBuffer in) throws Exception {

    //代码省略...

     if (localWrittenBytes <= 0) {

                        incompleteWrite(true);

                        return;

                    }

                   adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes,maxBytesPerGatheringWrite);

                    in.removeBytes(localWrittenBytes);

                    --writeSpinCount;

                    break;

//代码省略...

}


接着分析ChannelOutboundBufferremoveBytes(long writtenBytes)方法,将发送的字节数与当前ByteBuf可读的字节数进行对比,判断当前的ByteBuf是否完成发送,如果完成则调用remove()清理它,否则只更新下发送进度,相关代码如下:

protected voiddoWrite(ChannelOutboundBuffer in) throws Exception {

    //代码省略...

      if (readableBytes <=writtenBytes) {

                if (writtenBytes != 0) {

                    progress(readableBytes);

                    writtenBytes -=readableBytes;

                }

                remove();

            } else {

                if (writtenBytes != 0) {

                    buf.readerIndex(readerIndex+ (int) writtenBytes);

                    progress(writtenBytes);

                }

                break;

            }

//代码省略...

}


当调用remove()方法时,最终会调用消息发送ChannelPromisetrySuccess方法,通知监听Listener消息已经完成发送,相关代码如下所示:

public booleantrySuccess(V result) {

//代码省略...

        if (setSuccess0(result)) {

            notifyListeners();

            return true;

        }

        return false;

    }

//代码省略...

}


经过以上分析可以看出,调用write/writeAndFlush方法本身并不代表消息已经发送完成,只有监听write/writeAndFlush的操作结果,在异步回调监听中计数,结果才更精确。


需要注意的是,异步回调通知由NettyNioEventLoop线程执行,即便异步回调代码写在业务线程中,也是由NettyI/O线程来执行累加计数的,因此这块儿需要考虑多线程并发安全问题,调用堆栈示例如下:


从时延毛刺问题定位到 Netty 的性能统计设计(下)


图4 消息发送结果异步回调通知执行线程


如果消息报文比较大,或者一次批量发送的消息比较多,可能会出现“写半包”问题,即一个消息无法在一次write操作中全部完成发送,可能只发送了一半,针对此类场景,可以创建GenericProgressiveFutureListener用于实时监听消息发送进度,做更精准的统计,相关代码如下所示:

privatestatic void notifyProgressiveListeners0(

            ProgressiveFuture<?> future,GenericProgressiveFutureListener<?>[] listeners, long progress,long total) {

        for(GenericProgressiveFutureListener<?> l: listeners) {

            if (l == null) {

                break;

            }

            notifyProgressiveListener0(future,l, progress, total);

        }

}


问题定位出来之后,按照正确的做法对Netty性能统计代码进行了修正,上线之后,结合调用链日志,很快定位出了业务高峰期偶现的部分服务时延毛刺较大问题,优化业务线程池参数配置之后问题得到解决。


  • 常见的消息发送性能统计误区:

在实际业务中比较常见的性能统计误区如下:

(1)调用write/ writeAndFlush方法之后就开始统计发送速率。

(2)消息编码时进行性能统计:编码之后,获取out可读的字节数,然后做累加。编码完成并不代表消息被写入到SocketChannel中,因此性能统计也不准确。

 

  • Netty关键性能指标采集:

除了消息发送速率,还有其它一些重要的指标需要采集和监控,无论是在调用链详情中展示,还是统一由运维采集、汇总和展示,这些性能指标对于故障的定界和定位帮助都很大。


  • Netty I/O线程池性能指标:

Netty I/O线程池除了负责网络I/O消息的读写,还需要同时处理普通任务和定时任务,因此消息队列积压的任务个数是衡量Netty I/O线程池工作负载的重要指标。由于Netty NIO线程池采用的是一个线程池/组包含多个单线程线程池的机制,因此不需要像原生的JDK线程池那样统计工作线程数、最大线程数等。相关代码如下所示:

publicvoid channelActive(ChannelHandlerContext ctx) throws Exception {

kpiExecutorService.scheduleAtFixedRate(()->

        {

            Iterator<EventExecutor>executorGroups = ctx.executor().parent().iterator();

            while (executorGroups.hasNext())

            {

                SingleThreadEventExecutorexecutor = (SingleThreadEventExecutor)executorGroups.next();

                int size = executor.pendingTasks();

                if (executor == ctx.executor())

                   System.out.println(ctx.channel() + "--> " + executor +" pending size in queue is : --> " + size);

                else

                    System.out.println(executor+ " pending size in queue is : --> " + size);

            }

        },0,1000, TimeUnit.MILLISECONDS);

   }

}


运行结果如下所示:



从时延毛刺问题定位到 Netty 的性能统计设计(下)


图5 Netty I/O线程池性能统计KPI数据

 

  • Netty发送队列积压消息数:

Netty消息发送队列积压数可以反映网络速度、通信对端的读取速度、以及自身的发送速度等,因此对于服务调用时延的精细化分析对于问题定位非常有帮助,它的采集方式代码示例如下:

publicvoid channelActive(ChannelHandlerContext ctx) throws Exception {

writeQueKpiExecutorService.scheduleAtFixedRate(()->

        {

            long pendingSize =((NioSocketChannel)ctx.channel()).unsafe().outboundBuffer().totalPendingWriteBytes();

            System.out.println(ctx.channel() +"--> " + " ChannelOutboundBuffer's totalPendingWriteBytes is: "

                    + pendingSize + "bytes");

        },0,1000, TimeUnit.MILLISECONDS);

}


执行结果如下:


从时延毛刺问题定位到 Netty 的性能统计设计(下)


图6 Netty Channel对应的消息发送队列性能KPI数据

由于totalPendingSizevolatile的,因此统计线程即便不是NettyI/O线程,也能够正确的读取其最新值。


  • Netty消息读取速率性能统计:

针对某个Channel的消息读取速率性能统计,可以在解码ChannelHandler之前添加一个性能统计ChannelHandler,用来对读取速率进行计数,相关代码示例如下(ServiceTraceProfileServerHandler)

public voidchannelActive(ChannelHandlerContext ctx) throws Exception {

       kpiExecutorService.scheduleAtFixedRate(()->

        {

            int readRates =totalReadBytes.getAndSet(0);

            System.out.println(ctx.channel() +"--> read rates " + readRates);

        },0,1000, TimeUnit.MILLISECONDS);

        ctx.fireChannelActive();

    }

    public void channelRead(ChannelHandlerContextctx, Object msg) {

        int readableBytes =((ByteBuf)msg).readableBytes();

        totalReadBytes.getAndAdd(readableBytes);

        ctx.fireChannelRead(msg);

}


运行结果如下所示:


从时延毛刺问题定位到 Netty 的性能统计设计(下)


图7  NettyChannel 消息读取速率性能统计

 

三、总结:

 

本文选自《Netty进阶之路:跟着案例学Netty》一书,由电子工业出版社出版,李林锋著。


当我们需要对服务调用时延进行精细化分析时,需要把Netty通信框架底层的处理耗时数据也采集走并进行分析,由于NettyI/O操作都是异步的,因此不能像传统同步调用那样的思路去做性能数据统计,需要注册性能统计监听器,在异步回调中完成计数。另外,NettyI/O线程池、消息发送队列等实现比较特殊,与传统的Tomcat等框架实现策略不同,因此对于Netty的关键性能KPI数据采集不能照搬JDKTomcat的做法。