? ? ? ?我们的服务器缺少了一个内容:超时。每个网络应用程序都需要处理超时,因为网络的另一边可能会消失。不要只进行持续的IO操作,如读/写需要超时,但启动空闲的TCP连接也是一个好主意。要实现超时,必须修改事件循环,因为轮询是唯一被阻塞的东西。
我们的代码如下:
int rv=poll(poll_args.data(),(nfds_t)poll_args.size(),1000);
? ? ? ? ?poll系统调用接受一个timeout参数,该参数规定了用于poll系统调用的时间上限。超时值目前是1000毫秒的任意值。如果我们根据计时器设置超时值,poll应该在过期时唤醒,或在此之前唤醒;然后我们就有机会在适当的时候启动计时器。
? ? ? ? 问题是我们可能有多个计时器,poll的超时值应该是最近的计时器的超时值。需要一些数据结构来查找最近的计时器。堆数据结构是查找最小/最大值的常用选择,通常用于此目的。此外,还可以使用任何用于排序的数据结构。例如,我们可以使用AVL树来排序计时器,并可能扩展树来跟踪最小值。
? ? ? ? 让我们从添加计时器来踢出空闲的TCP连接开始。对于每个连接都有一个计时器,设置为一个固定的超时,每次在连接上有IO活动时,计时器都会更新为一个固定的超时。注意,当我们更新计时器时,它变成了最遥远的一个;因此,我们可以利用这一事实来简化数据结构;一个简单的链表足以保持计时器的顺序:新的或更新的计时器只是到列表的末尾,列表保持有序的顺序。同样,链表上的操作是O(1),哪个比排序数据结构更好
定义链表是一个微不足道的任务:
struct DList {
DList *prev = NULL;
DList *next = NULL;
};
inline void dlist_init(DList *node) {
node->prev = node->next = node;
}
inline bool dlist_empty(DList *node) {
return node->next == node;
}
inline void dlist_detach(DList *node) {
DList *prev = node->prev;
DList *next = node->next;
prev->next = next;
next->prev = prev;
}
inline void dlist_insert_before(DList *target, DList *rookie) {
DList *prev = target->prev;
prev->next = rookie;
rookie->prev = prev;
rookie->next = target;
target->prev = rookie;
}
get_monotonic_usec是获取时间的函数。注意时间戳必须是单调的。时间戳向后跳转会给计算机系统带来各种各样的麻烦。
static uint64_t get_monotonic_usec() {
timespec tv = {0, 0};
clock_gettime(CLOCK_MONOTONIC, &tv);
return uint64_t(tv.tv_sec) * 1000000 + tv.tv_nsec / 1000;
}
下一步是将列表添加到服务器和连接结构中。
static struct {
HMap db;
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
// timers for idle connections
DList idle_list;
} g_data;
struct Conn {
int fd = -1;
uint32_t state = 0; // either STATE_REQ or STATE_RES
// buffer for reading
size_t rbuf_size = 0;
uint8_t rbuf[4 + k_max_msg];
// buffer for writing
size_t wbuf_size = 0;
size_t wbuf_sent = 0;
uint8_t wbuf[4 + k_max_msg];
uint64_t idle_start = 0;
// timer
DList idle_list;
};
修改后的事件循环概述:
int main() {
// some initializations
dlist_init(&g_data.idle_list);
int fd = socket(AF_INET, SOCK_STREAM, 0);
// bind, listen & other miscs
// code omitted...
// the event loop
std::vector<struct pollfd> poll_args;
while (true) {
// prepare the arguments of the poll()
// code omitted...
// poll for active fds
int timeout_ms = (int)next_timer_ms();
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), timeout_ms);
if (rv < 0) {
die("poll");
}
// process active connections
for (size_t i = 1; i < poll_args.size(); ++i) {
if (poll_args[i].revents) {
Conn *conn = g_data.fd2conn[poll_args[i].fd];
connection_io(conn);
if (conn->state == STATE_END) {
// client closed normally, or something bad happened.
// destroy this connection
conn_done(conn);
}
}
}
// handle timers
process_timers();
// try to accept a new connection if the listening fd is active
if (poll_args[0].revents) {
(void)accept_new_conn(fd);
}
}
return 0;
}
修改了几件事:
1. poll的超时参数由next_timer_ms函数计算。
2. 销毁连接的代码被移到了conn_done函数中。
3. 添加了用于触发计时器的process_timers函数。
4. 计时器在connection_io中更新,在accept_new_conn中初始化。
?