作为一个宏大的、功能健全的muduo库,考虑的肯定是众多情况是否可以高效满足;而作为学习者,我们需要抽取其中的精华进行简要实现,这要求我们足够了解muduo库。
做项目 = 模仿 + 修改,不要担心自己学了也不会写怎么办,重要的是积累,学到了这些方法,如果下次在遇到通用需求的时候你能够回想起之前的解决方法就够了。送上一段话!
EventLoop这个类,是整个muduo的核心所在。分为mainLoop和subLoop。每一个Loop都是相互独立的,,有自己的事件循环,包括一个Poller监听者,Channel监听通道列表。mainLoop的逻辑除了对自己的事件循环做出相应的判断与行为之外,如果遇到新的连接建立,还会新申请一个subLoop。这个功能跟mainLoop中的acceptor有关,之后会讲到。整个Loop遵循一个重要原则叫做one loop one thread。意思是一个事件循环对应着一个线程。线程不能处理除自己对应的loop之外的其他loop。这种reactor优点是分开处理监听到的事件,能够将负载均衡,并且契合多处理器的特点,更大程度地接收并处理并发连接。相比于TinyWebserver那种reactor,少了很多线程安全处理上的花销,如请求队列的开锁解锁。那种reactor,所有的线程都从一个请求队列上获取连接请求,开锁与解锁等线程安全性操作花销太大。这种one loop one thread,将一个业务整理逻辑放在一个线程内,这样单个线程则不存在线程安全性花销,效率会更高。
EventLoop还用到了RAII机制,防止了指针的内存泄漏问题,将内存管理交给操作系统来完成。
EventLoop中有一个Poller监听者和Channel通道列表。这里对应的变量是std::unique_ptr<Poller> poller_;
与ChannelList activeChannels_;
设置了一个threadId_
对应one loop one thread原则。对于RAII机制,使用了智能指针。unique_ptr是独享指针,该指针指向的变量意味着不能够被拷贝构造,只能够移动构造,支持右值引用,将指针的所有权转移出去。atomic_bool是c++11提供的新特性,让该变量具有原子性,多线程访问是安全的。mutex也是用于多线程安全性的。callingPendingFunctors_是一个状态位,表示此时EventLoop正在处理一些函数回调。pendingFunctors_是函数回调队列。除此以外,某个thread运行某个loop时,如果loop锁定的thread不是现在的thread,就不符合one loop one thread
。此时使用到wakeup唤醒线程的功能,单独用一个eventfd来监听线程唤醒事件,以及一个wakeupChannel通道。
这是一个线程局部变量,意思是在变量的生命周期只在该线程中,是用来记录该线程是否已经有绑定的EventLoop了。前面一个__thread前缀来表示线程局部变量。
为唤醒线程创建一个套接字,eventfd支持NONBLOCK与CLOEXEC等宏定义。
创建一个EventLoop,需要判断这个thread是否已经有了对应的EventLoop,没有才会继续创建。还需要给wakeupChannel设定感兴趣的事件,并设定对应的回调函数。
loop循环中,包含poll监听,以及对监听到的channel通道列表中的channel进行handleEvent。对于其他的函数回调,还需要doPendingFunctors来处理对应的函数回调。
如果想要结束事件循环,也需要在自己的线程中退出事件循环。
对于处理某个事件,如果是在对应线程中,直接调用该事件回调就行了;如果是在其他线程中,就调用queueInLoop,将该事件回调放在回调队列中,等到一个事件循环最后执行doPendingFunctors来处理。
对于不在自己线程的函数回调,会将函数回调放在函数回调处理队列中。对于该loop不属于该thread以及在loop进行函数回调的过程中又来了新的回调,都会进行wakeup操作,通知对应thread进行处理。
对于EventLoop中的updateChannel和removeChannel方法,其实也是调用了其中poller的update与remove的方法。
对于wakeupFd,有读写功能,这样可以触发唤醒功能,能够让thread及时处理对应loop。
这段代码中执行了事件循环中的处理回调操作。用了一个局域unique_lock确保了functors的线程安全问题。这段代码之所以用一个vector functors来装pendingFunctors中的回调函数,而不是直接向下面这样:
void EventLoop::doPendingFunctors() {
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
for (const Functor &functor : pendingFunctors)
{
functor(); // 执行当前loop需要执行的回调操作
}
}
callingPendingFunctors_ = false;
}
有个好处就是一个线程对pendingFunctors的访问时间降低了很多,就一个swap的时间,等到有新的回调产生能够及时地正确地放入到pendingFunctors中。而向下面的代码,在执行回调的同时,占用着pendingFunctors的访问权限,这样的效率会远低于上面的代码。这也是muduo的精妙之处!
//EventLoop.h
#pragma once
#include <functional>
#include <atomic>
#include <memory>
#include <semaphore.h>
#include "Timestamp.h"
#include "noncopyable.h"
#include "Thread.h"
class Poller;
class Channel;
class EventLoop : noncopyable {
public:
using Functor = std::function<void()>;
EventLoop();
~EventLoop();
void loop();
void quit();
void runInLoop(Functor cb);
void queueInLoop(Functor cb);
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
Timestamp getPollReturnTime() const { return pollReturnTime_; }
void wakeup();
bool hasChannel(Channel* channel);
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
private:
void handleRead();
void doPendingFunctors();
using ChannelList = std::vector<Channel*>;
const pid_t threadId_;
std::unique_ptr<Poller> poller_;
Timestamp pollReturnTime_;
std::atomic_bool looping_;
std::atomic_bool quit_;
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
std::mutex mutex_;
std::atomic_bool callingPendingFunctors_;
std::vector<Functor> pendingFunctors_;
};
//EventLoop.cc
#include <sys/eventfd.h>
#include <mutex>
#include "EventLoop.h"
#include "Poller.h"
#include "Log.h"
#include "Channel.h"
__thread EventLoop* t_loopInThisThread = nullptr; //线程中对应的EventLoop
const int kPollTimeMs = 10000; //poller的等待时间
int createEventFd() {
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0) {
LOG_FATAL("%s--%s--%d--%d : eventfd error\n", __FILE__, __FUNCTION__, __LINE__, errno);
}
return evtfd;
}
EventLoop::EventLoop() : threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), looping_(false), quit_(false), callingPendingFunctors_(false), wakeupFd_(createEventFd()), wakeupChannel_(new Channel(this, wakeupFd_)) {
LOG_INFO("%s--%s--%d : EventLoop created %p in thread %d\n", __FILE__, __FUNCTION__, __LINE__, this, threadId_);
if (t_loopInThisThread) {
LOG_FATAL("%s--%s--%d--%d : Another EventLoop %p exists in this thread %d\n", __FILE__, __FUNCTION__, __LINE__, errno, this, threadId_);
}
else t_loopInThisThread = this;
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop() {
wakeupChannel_->disableAll();
wakeupChannel_->remove();
::close(wakeupFd_);
t_loopInThisThread = nullptr;
}
void EventLoop::loop() {
looping_ = true;
quit_ = false;
LOG_INFO("%s--%s--%d : EventLoop %p start looping\n", __FILE__, __FUNCTION__, __LINE__, this);
while (!quit_) {
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (Channel* channel : activeChannels_) {
channel->handleEvent(pollReturnTime_);
}
doPendingFunctors();
}
LOG_INFO("%s--%s--%d : EventLoop %p stop looping\n", __FILE__, __FUNCTION__, __LINE__, this);
looping_ = false;
}
void EventLoop::quit() {
quit_ = true;
if (!isInLoopThread()) {
wakeup();
}
}
void EventLoop::runInLoop(Functor cb) {
if (isInLoopThread()) {
cb();
}
else {
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(Functor cb) {
{
std::unique_lock<std::mutex> lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_) {
wakeup();
}
}
void EventLoop::updateChannel(Channel* channel) {
poller_->updateChannel(channel);
}
void EventLoop::removeChannel(Channel* channel) {
poller_->removeChannel(channel);
}
void EventLoop::wakeup() {
uint64_t one = 1;
ssize_t n = ::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one) {
LOG_ERROR("%s--%s--%d--%d : write error\n", __FILE__, __FUNCTION__, __LINE__, errno);
}
}
bool EventLoop::hasChannel(Channel* channel) {
return poller_->hasChannel(channel);
}
void EventLoop::handleRead() {
uint64_t one = 1;
ssize_t n = ::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one) {
LOG_ERROR("%s--%s--%d--%d : read error\n", __FILE__, __FUNCTION__, __LINE__, errno);
}
}
void EventLoop::doPendingFunctors() {
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
std::unique_lock<std::mutex> lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor &functor : functors)
{
functor(); // 执行当前loop需要执行的回调操作
}
callingPendingFunctors_ = false;
}
以上就是事件循环EventLoop类的相关介绍,以及我在进行项目重写的时候遇到的一些问题,和我自己的一些心得体会。发现写博客真的会记录好多你的成长,而且对于一个好的项目,写博客也是证明你确实有过深度思考,并且在之后面试或者工作时遇到同样的问题能够进行复盘的一种有效的手段。所以,希望uu们也可以像我一样,养成写博客的习惯,逐渐脱离菜鸡队列,向大佬前进!!!加油!!!
也希望我能够完成muduo网络库项目的深度学习与重写,并在功能上能够拓展。也希望在完成这个博客系列之后,能够引导想要学习muduo网络库源码的人,更好地探索这篇美丽繁华的土壤。致敬chenshuo大神!!!
鉴于博主只是一名平平无奇的大三学生,没什么项目经验,所以可能很多东西有所疏漏,如果有大神发现了,还劳烦您在评论区留言,我会努力尝试解决问题!