前言

周末向往常一样睡了一上午,惆怅了一个中午,下午学了会习,梳理了下Netty的线程模型是如何体现Reactor模式的。继上一篇对一些通信底层IO的C函数学习,这一篇主要是总结Java里对底层IO不同层次的抽象,每一层都为了解决什么问题?为什么Reator模型使得现在Netty处理网络IO时如此高效?带着问题,我们一起来学习。

BIO

让我们先回忆一下传统的服务器端同步阻塞I/O处理(也就是BIO,Blocking I/O)的经典编程模型:

class Server {
    public static void main() {
        ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(8088);
        while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
            Socket socket = serverSocket.accept();
            executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程
        }
    }
    static class ConnectIOnHandler implements Runnable {
        private Socket socket;
        public ConnectIOnHandler(Socket socket){
           this.socket = socket;
        }
        public void run(){
          while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){
              String someThing = socket.read();//读取数据
              if(someThing!=null){
                 ......//处理数据
                 socket.write()....//写数据
              }
          }
        }
    }
}

基本上所有的网络处理程序都有以下基本的处理过程:

  1. Read request

  2. Decode request

  3. Process service

  4. Encode reply

  5. Send reply

    如果采用一个handler去处理上面所有过程,那么上面代码的IO模型可以抽象为下图:

其实比我们读大学的时候学的高级多了,起码用到juc的线程池了,简化了使用上多线程的维护成本,降低了线程频繁创建和回收的开销(1.线程池创建线程时有个很大的全局锁。2. 线程堆栈内存分配代价),还有另外一个原因,不过不是多线程,对于socket的accept,read,write都是阻塞操作,单线程处理大量的连接必然,很容易导致此线程挂死在非CPU的IO上,进而CPU资源空闲,这也是合理的使用线程池的意义,通过多线程充分利用多核资源。之所以从上图对代码逻辑的抽象,除了编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求,菜逼容易上手外,很容易总结出BIO模型的不足之处:

  • 一个连接对应一个线程,严重依赖“高昂”的线程资源
  • 线程本身占用较大内存
  • 线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
  • 容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。

NIO AIO

所有的系统I/O都分为两个阶段:等待就绪和操作。举例来说,读函数,分为等待系统可读和真正的读;同理,写函数分为等待网卡可以写和真正的写。

需要说明的是等待就绪的阻塞是不使用CPU的,是在“空等”;而真正的读写操作的阻塞是使用CPU的,真正在”干活”,而且这个过程非常快,属于memory copy,带宽通常在1GB/s级别以上,可以理解为基本不耗时。

以socket.read()为例子:

  1. 传统的BIO里面socket.read(),如果TCP RecvBuffer里没有数据,函数会一直阻塞,直到收到数据,返回读到的数据。
  2. 对于NIO,如果TCP RecvBuffer有数据,就把数据从网卡读到内存,并且返回给用户;反之则直接返回0,永远不会阻塞。NIO底层在Linux 2.6之前是select、poll,2.6之后是epoll。
  3. 最新的AIO(Async I/O)里面会更进一步:不但等待就绪是非阻塞的,就连数据从网卡到内存的过程也是异步的。
  4. 换句话说,BIO里用户最关心“我要读”,NIO里用户最关心”我可以读了”,在AIO模型里用户更需要关注的是“读完了”。
  5. NIO一个重要的特点是:socket主要的读、写、注册和接收函数,在等待就绪阶段都是非阻塞的,真正的I/O操作是同步阻塞的(消耗CPU但性能非常高)。

Reactor模式

核心思想

分而治之

一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。

事件驱动

由于底层socket进行等待就绪的时候是阻塞的,不知道什么时候函数能返回,就只能“另起炉灶”再去创建线程处理新的连接,我们上一篇介绍过select/poll函数,使得读写操作可以立刻返回,这时候我们可以利用一个线程来轮询注册在seletor上的channel事件,我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。一次select调用会返回所有channel的就绪事件,我们的程序就可以根据不同的事件调用不同的处理器。

在NIO的支撑下基于事件驱动的Reactor应运而生,包括三种模型:

Basic Reactor Design

  • Reactor:负责响应事件,将事件分发给绑定了该事件的Handler处理;

  • Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理;

  • Acceptor:Handler的一种,绑定了connect事件。当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理。

dog lea在ppt中展示的代码:

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false); //非阻塞
        SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //分步处理,第一步,接收accept事件
        sk.attach(new Acceptor()); //attach callback object, Acceptor
    }
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext())
                    dispatch((SelectionKey)(it.next()); //Reactor负责dispatch收到的事件
                selected.clear();
            }
        } catch (IOException ex) { /* ... */ }
    }
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment()); //调用之前注册的callback对象
        if (r != null)
            r.run();
    }
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                new Handler(selector, c);
            }
            catch(IOException ex) { /* ... */ }
        }
    }
}
final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c; c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);
        sk.attach(this); //将Handler作为callback对象
        sk.interestOps(SelectionKey.OP_READ); //第二步,接收Read事件
        sel.wakeup();
    }
    boolean inputIsComplete() { /* ... */ }
    boolean outputIsComplete() { /* ... */ }
    void process() { /* ... */ }
    public void run() {
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }
    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE); //第三步,接收write事件
        }
    }
    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); //write完就结束了, 关闭select key
    }
}
  1. 一个线程需要执行处理所有的accept、read、decode、process、encode、send事件,处理成百上千的链路时性能上无法支撑;

  2. 一旦reactor线程意外跑飞或者进入死循环,会导致整个系统通信模块不可用。

  3. 当 NIO 线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了 NIO 线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

Worker Thread Pools

  1. 有专门一个reactor线程用于监听服务端ServerSocketChannel连接事件,接收客户端的TCP连接请求;
  2. 网络IO的读/写事件等由一个worker reactor线程池负责,由线程池中的NIO线程负责监听SocketChannel事件,进行消息的读取、解码、编码和发送。

代码示例:

Selector[] selectors; // 一个selector对应一个线程
int next = 0;
class Acceptor {
    public synchronized void run() { ...
        Socket connection = serverSocket.accept();
        if (connection != null)
            new Handler(selectors[next], connection);
        if (++next == selectors.length) next = 0;
    }
}

在极个别特殊场景中,一个 NIO 线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个 Acceptor 线程可能会存在性能不足问题,为了解决性能问题,产生了第三种 Reactor 线程模型 - 主从 Reactor 多线程模型。

Using Multiple Reactors

  1. 服务端用于接收客户端连接的不再是个1个单独的reactor线程,而是一个boss reactor线程池;
  2. 服务端启用多个ServerSocketChannel监听不同端口时,每个ServerSocketChannel的监听工作可以由线程池中的一个NIO线程完成。
  3. Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线�