贝壳找房中工作流程解析
本文主要介绍 TNonblockingServer 服务模型,这是thrift框架提供的一种非阻塞式IO服务模型,目前是thrift框架中最好的模型,这也是我们重点介绍的模型。
thrift是Facebook开源的一款开源跨语言的RPC通信框架,主要提供三种服务模型:1)TThreadPoolServer 服务模型,这是线程池服务模型,使用标准的阻塞式IO,预先创建一组线程处理请求;2)TSimpleServer 服务模型,这是简单的单线程服务模型,一般用于测试;3)TNonblockingServer 服务模型,这是thrift框架提供的一种非阻塞式IO服务模型,目前是thrift框架中最好的模型,这也是我们重点介绍的模型。
整个TNonblockingServer可以分为3部分,如下图所示,分别是IO线程,状态同步,以及工作线程,下面我们分别介绍这三部分。
1 IO线程
为了更好的叙述,我们简单介绍一下socket,socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议。IO线程在thrift框架可以简单分为两种,一种是只执行读写,另外一种是既执行读写任务,还要负责socket监听,thrift中所有IO线程都保存在IOthread这个数组中, IOthread[0] 可以看成服务的主线程,整个服务的启动也是从 IOthread[0] 开始。
TNonblockingServer::serve() {
if (ioThreads_.empty())
registerEvents(NULL);
// Run the primary (listener) IO thread loop in our main thread; this will
// only return when the server is shutting down.
ioThreads_[0]->run();
// Ensure all threads are finished before exiting serve()
for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
ioThreads_[i]->join();
GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
}
}
IOthread[0]就是我们上述所说的既执行读写任务,还要负责socket监听的线程,除此之外,数组IOthread中其他线程只负责读写。
// Register the server event
event_set(&serverEvent_,
listenSocket_,
EV_READ | EV_PERSIST,
TNonblockingIOThread::listenHandler,
server_);
event_base_set(eventBase_, &serverEvent_);
2 状态转移
当IOthread[0] 监听到一个Socket的accept事件时,会同时间建立一个TConnection 并为其分配一个线程进行处理,并将TConnection::eventHandler和TNonblockingIOThread::notifyHandler注册到iothread的event_base上,用于记录状态转移。
TConnection 中主要有两种方法:
2.1 socket状态转移
void TNonblockingServer::TConnection::workSocket()
socket端主要分为3种状态:
/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
- SOCKET_RECV_FRAMING:主要标记接收帧头部。每个帧分为两个部分:帧头部和帧数据。帧头部以字节为单位标明帧数据的长度。由于整个过程是非阻塞的,读取过程可以不连续,每次尽力而为的从socket读取字节,如果头部没达到4byte(thrift-0.10.0版本),表示没有读取结束,可以存起来,下次调用的时候继续读取,直到获取到完整的帧头部,然后调用transition。(这里不会多读数据。如果对方断开连接,读取的长度会是0,此时关闭socket。server规定帧数据部分的长度不得超过256MB,这已经是相当大的一个范围了。如果帧头部指明的长度超过了server预设的最大值,则认为是读出了非法的帧格式,也会关闭socket);
- SOCKET_RECV:接收帧数据。依然是非阻塞,也支持非连续读取,当获得了完整的帧数据,调用transition;
- SOCKET_SEND:发送帧头部和帧数据。因为是非阻塞,所以要尽力而为的发送。
2.2 server状态转移
void TNonblockingServer::TConnection::transition()
server端一共有6种状态:
/**
* Five states for the nonblocking server:
* 1) initialize
* 2) read 4 byte frame size
* 3) read frame of data
* 4) send back data (if any)
* 5) force immediate connection close
*/
enum TAppState {
APP_INIT,
APP_READ_FRAME_SIZE,
APP_READ_REQUEST,
APP_WAIT_TASK,
APP_SEND_RESULT,
APP_CLOSE_CONNECTION
};
- APP_INIT:server最开始的状态。设置读写缓冲区等等基本工作,需要设置等待读的标记位,并开始注册读时间。
// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
writeBufferSize_ = 0;
// Into read4 state we go
socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;
readBufferPos_ = 0;
// Register read event
setRead();
- APP_READ_FRAME_SIZE:server已经读到了帧长度。调整读缓冲区大小,以适应帧数据接收,如果出现残缺,直接放弃。
readWant_ += 4;
// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) {
if (readBufferSize_ == 0) {
readBufferSize_ = 1;
}
uint32_t newSize = readBufferSize_;
while (readWant_ > newSize) {
newSize *= 2;
}
uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
if (newBuffer == NULL) {
// nothing else to be done...
throw std::bad_alloc();
}
readBuffer_ = newBuffer;
readBufferSize_ = newSize;
}
readBufferPos_ = 4;
*((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
// Move into read request state
socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
- APP_READ_REQUEST:server已经获得了完整的帧。将输入缓冲区封装为inputTransport结构,并且重置输入缓冲以待将来使用。此时,满足处理请求的条件了。如果后台不是线程池模式,那么立即执行。否则,构造一个Task结构。
|
|
- APP_WAIT_TASK:server已经处理好这个请求,准备回送返回结果。一般的,计算回送帧大小填充到帧首部,然后调用setFlags(EV_WRITE | EV_PERSIST)声明有数据需要写入socket。
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
// the writeBuffer_ for actual writing by the libevent thread
server_->decrementActiveProcessors();
// Get the result of the operation
outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
// If the function call generated return data, then move into the send
// state and get going
// 4 bytes were reserved for frame size
if (writeBufferSize_ > 4) {
// Move into write state
writeBufferPos_ = 0;
socketState_ = SOCKET_SEND;
// Put the frame size into the write buffer
int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
memcpy(writeBuffer_, &frameSize, 4);
// Socket into write mode
appState_ = APP_SEND_RESULT;
setWrite();
// Try to work the socket immediately
// workSocket();
return;
}
- APP_SEND_RESULT:如果设置了每隔若干请求就重新调整输入输出缓冲区大小再执行。其实对接口比较少的应用来说意义并不大,只是之前一直是缓冲不够的时候变大,在这里把缓冲区收回来,然后就是执行APP_INIT逻辑了。
/ it's now safe to perform buffer size housekeeping.
if (writeBufferSize_ > largestWriteBufferSize_) {
largestWriteBufferSize_ = writeBufferSize_;
}
if (server_->getResizeBufferEveryN() > 0
&& ++callsForResize_ >= server_->getResizeBufferEveryN()) {
checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
server_->getIdleWriteBufferLimit());
callsForResize_ = 0;
}
- APP_CLOSE_CONNECTION:关闭连接,减少活跃Processor计数,不在基本状态机里面。
server_->decrementActiveProcessors();
close();
return;
这6种状态大致是这样的:
3. 工作线程
thrift 框架在处理task时,设置了一个ThreadManager ,这个ThreadManager的主要任务是线程分配和状态同步,IOthread 将待处理的任务统一放到任务队列,ThreadManager 会去监测这个任务队列是否为空,不为空时,会从队列pop任务,分配处理线程。
//标记manager状态(当前工作线程数达到小于最大数或者manager_工作中&&任务队列不为空)
bool isActive() const {
return (manager_->workerCount_ <= manager_->workerMaxCount_)
|| (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
while (active) {
//active=true 只有有空余线程时才会计入这个状态,这个时候框架运行是非阻塞模式
active = isActive();
while (active && manager_->tasks_.empty()) {
//可用线程数加1
manager_->idleCount_++;
//任务队列为空时进入等待
manager_->monitor_.wait();
//manager_离开等待时,必须有空余线程
active = isActive();
//一旦manager_离开等待状态,可用线程数减1
manager_->idleCount_--;
}
shared_ptr<ThreadManager::Task> task;
if (active) {
if (!manager_->tasks_.empty()) {
//从队列中拿取任务
task = manager_->tasks_.front();
manager_->tasks_.pop_front();
if (task->state_ == ThreadManager::Task::WAITING) {
// If the state is changed to anything other than EXECUTING or TIMEDOUT here
// then the execution loop needs to be changed below.
//任务执行时间超时设置
task->state_ =
(task->getExpireTime() && task->getExpireTime() < Util::currentTime()) ?
ThreadManager::Task::TIMEDOUT :
ThreadManager::Task::EXECUTING;
}
}
if (manager_->pendingTaskCountMax_ != 0
&& manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
manager_->maxMonitor_.notify();
}
}
if (task) {
if (task->state_ == ThreadManager::Task::EXECUTING) {
//释放锁,任务执行过程中,manager不需要同步信息了
manager_->mutex_.unlock();
try {
//开始执行任务(也就是用户的处理逻辑)
task->run();
} catch (const std::exception& e) {
GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
} catch (...) {
GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
}
// 重新加锁,下次调度线程进入这个过程需要也要同步信息
manager_->mu
- 原文作者:知识铺
- 原文链接:https://geek.zshipu.com/post/%E4%BA%92%E8%81%94%E7%BD%91/%E8%B4%9D%E5%A3%B3%E6%89%BE%E6%88%BF%E4%B8%AD%E5%B7%A5%E4%BD%9C%E6%B5%81%E7%A8%8B%E8%A7%A3%E6%9E%90/
- 版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可,非商业转载请注明出处(作者,原文链接),商业转载请联系作者获得授权。
- 免责声明:本页面内容均来源于站内编辑发布,部分信息来源互联网,并不意味着本站赞同其观点或者证实其内容的真实性,如涉及版权等问题,请立即联系客服进行更改或删除,保证您的合法权益。转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。也可以邮件至 sblig@126.com