且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Nio2Endpoint组件:Tomcat如何实现异步I/O?(下)

更新时间:2022-08-20 23:38:43

Nio2Endpoint组件:Tomcat如何实现异步I/O?(下)



Nio2Acceptor接收新的连接后,得到一个AsynchronousSocketChannel,Nio2Acceptor把AsynchronousSocketChannel封装成一个Nio2SocketWrapper,并创建一个SocketProcessor任务类交给线程池处理,并且SocketProcessor持有Nio2SocketWrapper对象。


Executor在执行SocketProcessor时,SocketProcessor的run方法会调用Http11Processor来处理请求,Http11Processor会通过Nio2SocketWrapper读取和解析请求数据,请求经过容器处理后,再把响应通过Nio2SocketWrapper写出。


需要你注意Nio2Endpoint跟NioEndpoint的一个明显不同点是,Nio2Endpoint中没有Poller组件,也就是没有Selector。这是为什么呢?因为在异步I/O模式下,Selector的工作交给内核来做了。



Nio2Endpoint各组件设计

Nio2Acceptor


和NioEndpint一样,Nio2Endpoint用LimitLatch控制连接数,但Nio2Acceptor监听连接的过程不是在一个死循环里不断地调accept,而是回调方法。

连接监听方法:

serverSock.accept(null, this);


第二个参数this,表明Nio2Acceptor自己就是处理连接的回调类,因此Nio2Acceptor实现了CompletionHandler接口。

@Override
public void completed(AsynchronousSocketChannel socket,
        Void attachment) {
        
    if (isRunning() && !isPaused()) {
        if (getMaxConnections() == -1) {
            // 若无连接限制,则继续接收新连接
            serverSock.accept(null, this);
        } else {
            // 若有连接限制,就在线程池里执行run,run会检查连接数
            getExecutor().execute(this);
        }
        // 处理请求
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } 

为什么要执行run方法?


因为在run方法里会检查连接数,当连接达到最大数时,线程可能会被LimitLatch阻塞。


为什么要放在线程池里跑?



若放在当前线程里执行,completed方法可能被阻塞,导致该回调方法一直无法返回。

接着completed方法会调用setSocketOptions方法,在这个方法里,会创建Nio2SocketWrapper和SocketProcessor,并交给线程池处理。


Nio2SocketWrapper

封装Channel,并提供接口给Http11Processor读写数据。


Http11Processor无法阻塞等待数据的,按异步I/O模式,Http11Processor在调用Nio2SocketWrapper#read时需注册回调类,调用read后会立即返回。


可若立即返回后Http11Processor还没有读到数据,怎么办?该请求的处理不就失败了?

为解决这个问题,Http11Processor通过2次read调用完成数据读取操作:


第一次read调用

连接刚刚建立好后,Acceptor创建SocketProcessor任务类交给线程池去处理,Http11Processor在处理请求的过程中,会调用Nio2SocketWrapper#read发出第一次读请求,同时注册回调类readCompletionHandler,因为数据没读到,Http11Processor把当前的Nio2SocketWrapper标记为数据不完整。


Nio2Endpoint组件:Tomcat如何实现异步I/O?(下)


接着SocketProcessor线程被回收,Http11Processor并未阻塞等待数据。

Http11Processor维护了一个Nio2SocketWrapper列表,也就是维护了连接的状态。



第二次read调用

当数据到达后,内核已经把数据拷贝到Http11Processor指定的Buffer里,同时回调类readCompletionHandler被调用,在这个回调处理方法里会重新创建一个新的SocketProcessor任务来继续处理这个连接,而这个新的SocketProcessor任务类持有原来那个Nio2SocketWrapper,这一次Http11Processor可以通过Nio2SocketWrapper读取数据了,因为数据已经到了应用层的Buffer。


Nio2SocketWrapper#read会被调用两次,但不是串行调两次,而是Poller会先后创建两个SocketProcessor任务类,在两个线程中执行,执行过程中每次Http11Processor都会调Nio2SocketWrapper#read。

public int read(boolean block, ByteBuffer to){

//第二次调用时直接通过这个方法取数据
int nRead = populateReadBuffer(to);

...

//第一次时数据没取到,会调用下面这个方法去真正执行I/O操作并注册回调函数:
nRead = fillReadBuffer(block);

...
}

两次read可以简单理解为,连接被保留着,数据没就绪处理的线程资源先释放了。收到异步数据就绪通知后,根据原有连接重建处理线程,继续处理。阻塞期间线程可复用。


回调类readCompletionHandler

Nio2SocketWrapper是作为附件类传递的,这样在回调函数里能拿到所有上下文。

this.readCompletionHandler = new CompletionHandler<Integer, SocketWrapperBase<Nio2Channel>>() {
    public void completed(Integer nBytes, SocketWrapperBase<Nio2Channel> attachment) {
        ...
        // 通过附件类SocketWrapper拿到所有的上下文
        Nio2SocketWrapper.this.getEndpoint().processSocket(attachment, SocketEvent.OPEN_READ, false);
    }

    public void failed(Throwable exc, SocketWrapperBase<Nio2Channel> attachment) {
        ...
    }
}

总结

在异步I/O模型里,内核做了很多事情,它把数据准备好,并拷贝到用户空间,再通知应用程序去处理,也就是调用应用程序注册的回调函数。Java在操作系统 异步IO API的基础上进行了封装,提供了Java NIO.2 API,而Tomcat的异步I/O模型就是基于Java NIO.2 实现的。


由于NIO和NIO.2的API接口和使用方法完全不同,可以想象一个系统中如果已经支持同步I/O,要再支持异步I/O,改动是比较大的,很有可能不得不重新设计组件之间的接口。但是Tomcat通过充分的抽象,比如SocketWrapper对Channel的封装,再加上Http11Processor的两次read调用,巧妙地解决了这个问题,使得协议处理器Http11Processor和I/O通信处理器Endpoint之间的接口保持不变。



FAQ


  • Tomcat里NIO为什么不参考netty,通过使用堆外内存来避免零拷贝问题?
    主要还是堆外内存管理起来没有JVM堆那么方便,为了稳定性的考虑吧,另外APR就是堆外内存的方案,也就是已经提供了这个选项。


tomcat 在哪里配置 使用nioendpoint 还是nio2endpoint呢?

server.xml中:


<Connector port="8443" protocol="org.apache.coyote.http11.Http11NioProtocol"
maxThreads="150" SSLEnabled="true">

</Connector>


参考