This article focuses on the TNonblockingServer model, a non-blocking IO server model provided by the Thrift framework. It is currently the most capable of Thrift's server models, and the one we examine in detail.

Thrift is an open-source, cross-language RPC framework from Facebook. It provides three main server models: 1) TThreadPoolServer, a thread-pool model that uses standard blocking IO and pre-creates a group of threads to handle requests; 2) TSimpleServer, a simple single-threaded model generally used for testing; 3) TNonblockingServer, the non-blocking IO model this article covers.

The TNonblockingServer as a whole breaks down into three parts: IO threads, state transitions, and worker threads. The following sections introduce each in turn.

1 IO threads

For a better narrative, a quick word on sockets first. A socket is the software abstraction layer through which applications talk to the TCP/IP protocol stack; it is a set of interfaces. In design-pattern terms, a socket is a facade: it hides the complexity of the TCP/IP stack behind the socket interface, so that for the user a handful of simple calls is all there is, while the socket layer arranges the data to conform to the protocol. IO threads in Thrift come in two kinds: those that only perform reads and writes, and one that performs reads and writes and also listens on the server socket. All IO threads live in the ioThreads_ vector; ioThreads_[0] can be regarded as the server's main thread, and the whole server starts from ioThreads_[0].

void TNonblockingServer::serve() {

  if (ioThreads_.empty())
    registerEvents(NULL);

  // Run the primary (listener) IO thread loop in our main thread; this will
  // only return when the server is shutting down.
  ioThreads_[0]->run();

  // Ensure all threads are finished before exiting serve()
  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
    ioThreads_[i]->join();
    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
  }
}

ioThreads_[0] is the thread described above that both does read/write work and listens on the socket; every other thread in ioThreads_ only does reads and writes.

// Register the server event
event_set(&serverEvent_,
          listenSocket_,
          EV_READ | EV_PERSIST,
          TNonblockingIOThread::listenHandler,
          server_);
event_base_set(eventBase_, &serverEvent_);

2 State transitions

When ioThreads_[0] sees an accept event on the listening socket, it creates a TConnection for the new socket and assigns it to one of the IO threads, and TConnection::eventHandler and TNonblockingIOThread::notifyHandler are registered on that IO thread's event_base to drive the state transitions.
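The hand-off itself goes through a per-thread notification socketpair: the listener writes the raw TConnection pointer into the target IO thread's notify fd, and notifyHandler, woken by libevent, reads the pointer back out and drives the connection on that thread. The sketch below illustrates the pattern; it is a simplified illustration written for this article, not the Thrift source, and notify_io_thread / on_notify are made-up names.

#include <sys/socket.h>
#include <sys/types.h>

struct Connection;  // stands in for TNonblockingServer::TConnection

// Listener side: write the raw pointer value of the new connection
// into the target IO thread's notification socketpair.
static bool notify_io_thread(int notify_wr_fd, Connection* conn) {
  return send(notify_wr_fd, &conn, sizeof(conn), 0) == (ssize_t)sizeof(conn);
}

// IO-thread side: libevent fires a persistent read event on the other
// end of the socketpair; the handler reads the pointer back out and
// drives the connection's state machine (Thrift calls conn->transition()).
static void on_notify(int notify_rd_fd, short /*which*/, void* /*arg*/) {
  Connection* conn = NULL;
  while (recv(notify_rd_fd, &conn, sizeof(conn), 0) == (ssize_t)sizeof(conn)) {
    if (conn != NULL) {
      // handle the new connection on this IO thread
    }
  }
}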

TConnection drives the transitions with two main methods:

2.1 Socket state transitions

void TNonblockingServer::TConnection::workSocket()

The socket side has three states:

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
  1. SOCKET_RECV_FRAMING: receive the frame header. Every frame has two parts, a header and a body; the header states the body length in bytes. Because the whole process is non-blocking, a read may be partial: each pass pulls as many bytes from the socket as it will give. If fewer than 4 header bytes (thrift-0.10.0) have arrived, the read is not finished; the partial bytes are kept and reading resumes on the next callback, until the complete header is in hand and transition() is called. (No extra data is read here. If the peer has disconnected, the read returns 0 and the socket is closed. The server also caps the frame body at 256MB, already a generous range; if the header announces a length beyond the server's maximum, the frame is treated as malformed and the socket is likewise closed. See the sketch after this list.)
  2. SOCKET_RECV: receive the frame body. Still non-blocking and still tolerant of partial reads; once the complete body has arrived, transition() is called.
  3. SOCKET_SEND: send the frame header and body. Again non-blocking, so sending is best-effort and may take several passes.
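To make the best-effort, possibly-partial header read concrete, here is a minimal sketch of the SOCKET_RECV_FRAMING logic. It assumes a socket already set to O_NONBLOCK and a 4-byte big-endian frame header with a 256MB cap; it is an illustration written for this article, not the Thrift source, and readFrameHeader is a made-up name.

#include <arpa/inet.h>   // ntohl
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <sys/socket.h>

static const uint32_t kMaxFrameSize = 256 * 1024 * 1024;  // server-side cap

// Returns 1 when the header is complete (frameSize set), 0 when more
// bytes are needed later, -1 when the peer closed or the frame is bad
// (the caller should then close the socket).
int readFrameHeader(int fd, uint8_t header[4], uint32_t* have, uint32_t* frameSize) {
  while (*have < 4) {
    ssize_t n = recv(fd, header + *have, 4 - *have, 0);
    if (n > 0) {
      *have += (uint32_t)n;      // partial read: remember progress
    } else if (n == 0) {
      return -1;                 // peer disconnected
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
      return 0;                  // nothing more right now; retry on the next event
    } else {
      return -1;                 // real socket error
    }
  }
  uint32_t net;
  memcpy(&net, header, 4);
  *frameSize = ntohl(net);
  if (*frameSize > kMaxFrameSize)
    return -1;                   // illegal frame size, close the socket
  return 1;
}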

2.2 Server state transitions

void TNonblockingServer::TConnection::transition()

The server side has six states in total:

/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for task to complete
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};
  1. APP_INIT: the server's initial state. Basic setup happens here: the write-buffer variables are cleared, the socket is put into frame-header-receiving mode, and the read event is registered via setRead() (a note on setRead() follows the snippet).
// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
writeBufferSize_ = 0;

// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;

readBufferPos_ = 0;

// Register read event
setRead();
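A side note on the last line: in the 0.10.0 source, setRead() is a thin wrapper that calls setFlags(EV_READ | EV_PERSIST), (re)registering the connection's event on its IO thread's event_base.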
  2. APP_READ_FRAME_SIZE: the server has just read the frame length. Grow the read buffer, doubling it until it can hold the whole frame; if reallocation fails there is nothing else to do, so give up and throw (a worked example follows the snippet).
readWant_ += 4;

// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) {
  if (readBufferSize_ == 0) {
    readBufferSize_ = 1;
  }
  uint32_t newSize = readBufferSize_;
  while (readWant_ > newSize) {
    newSize *= 2;
  }

  uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
  if (newBuffer == NULL) {
    // nothing else to be done...
    throw std::bad_alloc();
  }
  readBuffer_ = newBuffer;
  readBufferSize_ = newSize;
}

readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4);

// Move into read request state
socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
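A quick worked example of the arithmetic above: if the header announces a 1000-byte frame, readWant_ becomes 1004; a 512-byte readBuffer_ doubles once to 1024, which is enough. readBufferPos_ is then set to 4 and the frame length (back in network byte order) is written into the first 4 bytes, so the buffer once again looks like a complete frame for the header-transport case.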
  3. APP_READ_REQUEST: the server now holds a complete frame. Package the read buffer into the inputTransport_ structure and reset the output buffer for future use. The request is now ready to process: if the server is not in thread-pool mode it executes immediately, otherwise it constructs a Task (see the sketch after the snippet below).
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
if (server_->getHeaderTransport()) {
  inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
  outputTransport_->resetBuffer();
} else {
  // We saved room for the framing size in case header transport needed it,
  // but just skip it for the non-header case
  inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
  outputTransport_->resetBuffer();

  // Prepend four bytes of blank space to the buffer so we can
  // write the frame size there later.
  outputTransport_->getWritePtr(4);
  outputTransport_->wroteBytes(4);
}

server_->incrementActiveProcessors();
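What follows this snippet in transition() is the dispatch decision described above. Roughly, in the shape of the 0.10.0 code (condensed here, with error handling and the server-event-handler hook omitted):

if (server_->isThreadPoolProcessing()) {
  // Thread-pool mode: wrap the call in a Task and queue it for a worker
  boost::shared_ptr<Runnable> task = boost::shared_ptr<Runnable>(
      new Task(processor_, inputProtocol_, outputProtocol_, this));
  appState_ = APP_WAIT_TASK;  // a worker will finish the call
  setIdle();                  // stop libevent events on this fd in the meantime
  server_->addTask(task);     // may throw if the manager is shutting down
  return;
} else {
  // No thread pool: invoke the processor inline on the IO thread, then
  // fall through to the APP_WAIT_TASK handling shown next
  processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
}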
  4. APP_WAIT_TASK: the server has finished processing the request and is ready to send the result back. It computes the reply frame size and writes it into the frame header (the 4 bytes reserved for it earlier), then calls setWrite(), i.e. setFlags(EV_WRITE | EV_PERSIST), to declare that there is data to write to the socket.
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread

server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);

// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {

  // Move into write state
  writeBufferPos_ = 0;
  socketState_ = SOCKET_SEND;

  // Put the frame size into the write buffer
  int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
  memcpy(writeBuffer_, &frameSize, 4);

  // Socket into write mode
  appState_ = APP_SEND_RESULT;
  setWrite();

  // Try to work the socket immediately
  // workSocket();

  return;
}
  5. APP_SEND_RESULT: the reply has been written out. If the server is configured to resize buffers every N calls, the oversized read/write buffers are trimmed back here (for services with only a few methods this matters little; the buffers only ever grew on demand, and this is where the memory is reclaimed). Execution then falls back into the APP_INIT logic.
// it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
  largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
    && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
  checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
                          server_->getIdleWriteBufferLimit());
  callsForResize_ = 0;
}
  6. APP_CLOSE_CONNECTION: close the connection and decrement the active-processor count; this state sits outside the basic state-machine loop.
server_->decrementActiveProcessors();
close();
return;

Putting it together, the six states roughly flow as APP_INIT → APP_READ_FRAME_SIZE → APP_READ_REQUEST → APP_WAIT_TASK → APP_SEND_RESULT → back to APP_INIT, with APP_CLOSE_CONNECTION reachable from any of them when something goes wrong.

3 Worker threads

When Thrift processes a task it goes through a ThreadManager, whose main jobs are thread assignment and state synchronization. The IO threads put pending work onto a shared task queue; the ThreadManager watches this queue and, whenever it is non-empty, pops a task and assigns a worker thread to run it.

// The manager counts as active while the worker count is within the
// configured maximum, or while it is JOINING and there are still queued
// tasks to drain.
bool isActive() const {
  return (manager_->workerCount_ <= manager_->workerMaxCount_)
       || (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
while (active) {
  // Refresh liveness: the manager stays active while there is room for
  // workers, or while it is draining the queue during JOINING
  active = isActive();

  while (active && manager_->tasks_.empty()) {
    // The queue is empty: count this worker as idle
    manager_->idleCount_++;
    // and block on the monitor until a task arrives
    manager_->monitor_.wait();
    // Re-check liveness after waking up
    active = isActive();
    // and leave the idle pool
    manager_->idleCount_--;
  }

  shared_ptr<ThreadManager::Task> task;

  if (active) {
    if (!manager_->tasks_.empty()) {
      // Pop a task off the queue
      task = manager_->tasks_.front();
      manager_->tasks_.pop_front();
      if (task->state_ == ThreadManager::Task::WAITING) {
        // If the state is changed to anything other than EXECUTING or TIMEDOUT here
        // then the execution loop needs to be changed below.
        // Honor the task's expiration time, if one was set
        task->state_ =
            (task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
                ThreadManager::Task::TIMEDOUT :
                ThreadManager::Task::EXECUTING;
      }
    }

    // If the queue just dropped below the pending-task cap, wake any
    // producer blocked waiting for room
    if (manager_->pendingTaskCountMax_ != 0
        && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
      manager_->maxMonitor_.notify();
    }
  }

  if (task) {
    if (task->state_ == ThreadManager::Task::EXECUTING) {

      // Release the lock: the manager's state does not need to be held
      // while the user's task runs
      manager_->mutex_.unlock();

      try {
        // Run the task (i.e. the user's handler logic)
        task->run();
      } catch (const std::exception& e) {
        GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
      } catch (...) {
        GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
      }

      // Re-acquire the lock before touching the manager's state again
      manager_->mutex_.lock();
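For completeness, here is roughly how user code wires these pieces together: a minimal sketch against the 0.10-era API (constructor overloads differ across Thrift versions, and MyServiceHandler / MyServiceProcessor stand in for your generated service code):

#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TNonblockingServer.h>

using namespace apache::thrift;
using namespace apache::thrift::concurrency;
using namespace apache::thrift::protocol;
using namespace apache::thrift::server;

int main() {
  // Placeholders for the code thrift generates from your IDL
  boost::shared_ptr<MyServiceHandler> handler(new MyServiceHandler());
  boost::shared_ptr<TProcessor> processor(new MyServiceProcessor(handler));
  boost::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

  // The worker pool from section 3
  boost::shared_ptr<ThreadManager> threadManager =
      ThreadManager::newSimpleThreadManager(8 /* worker threads */);
  threadManager->threadFactory(
      boost::shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
  threadManager->start();

  TNonblockingServer server(processor, protocolFactory, 9090, threadManager);
  server.setNumIOThreads(4);  // ioThreads_[0] listens; the rest only do IO
  server.serve();             // runs ioThreads_[0]'s loop, as shown in section 1
  return 0;
}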