在看此篇文章,建议先看我的往期文章:?
基于多反应堆的高并发服务器【C/C++/Reactor】(中)在EventLoop的任务队列中添加新任务-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135346492?spm=1001.2014.3001.5501一、了解socketpair函数(CodeGeex生成)
Linux 中的 socketpair 函数用于创建一个双向的 socket 连接,通常用于父子进程之间的通信。在上述代码中,socketpair 函数用于初始化一个事件循环(EventLoop)对象,并为其分配一个 socket pair,用于接收来自子进程的数据。
socketpair 函数的原型为:
#include <sys/socket.h>
int socketpair(int domain, int type, int protocol, int sv[2]);
其中,family 参数指定套接字家族(如 AF_UNIX 或 AF_INET),type 参数指定套接字类型(如 SOCK_STREAM 或 SOCK_DGRAM),protocol 参数指定套接字协议(如 0),sv[2]?参数指向一个整数数组,用于存储创建的文件描述符对。
如果 socketpair 函数返回 0,则表示创建成功,如果返回 -1,则表示创建失败。
二、socketpair 初始化
socketpair 是一个在Unix-like系统上创建套接字对的函数。它创建的两个套接字描述符(socketpair[0] 和 socketpair[1])可以用于进程间通信(IPC)。socketpair[1] 的作用如下:
>>主线程唤醒子线程?
// 定义函数指针
typedef int(*handleFunc)(void* arg);
struct Channel {
// 文件描述符
int fd;
// 事件
int events;
// 回调函数
handleFunc readCallback;// 读回调
handleFunc writeCallback;// 写回调
// 回调函数的参数
void* arg;
};
#include "Channel.h"
struct Channel* channelInit(int fd, int events, handleFunc readFunc, handleFunc writeFunc, void* arg) {
struct Channel* channel = (struct Channel*)malloc(sizeof(struct Channel));
channel->fd = fd;
channel->events = events;
channel->readFunc = readFunc;
channel->writeFunc = writeFunc;
channel->arg = arg;
return channel;
}
对evLoop->socketPair[1]作了封装,得到了一个channel:?
struct Channel* channel = channelInit(evLoop->socketPair[1],ReadEvent,
readLocalMessage,NULL,evLoop);
通过调用eventLoopAddTask(evLoop,channel,ADD);添加到了任务队列?
// channel 添加到任务队列
eventLoopAddTask(evLoop, channel,ADD);
第一步:添加任务队列?
第二步:遍历任务队列,从中取出每一个节点,根据节点里边的类型对这个节点做操作,也就是说这个channel最终会被添加到对应的检测集合里边,那么对于evLoop->socketpair[1]这个文件描述符什么时候就能被被激活呢?
通过一个写数据的函数,往socketPair[0]里边写数据的时候,通过socketPair[1]就能够接收到数据,此时这个socketPair[1]就被激活了,就去调用其读事件。
这么做的原因是:底层的dispatcher的poll、epoll_wait或select,它们有可能是阻塞的,比如说它们检测的集合里边有文件描述符没有处于激活状态,我们往这个检测集合里安插了一个内线,通过socketPair[0]去写数据,只要一写数据,对应的poll、epoll_wait或select能够检测到socketPair[1]的读事件被激活了。
如果检测到socketPair[1]的读事件被激活了,那么poll、epoll_wait或select阻塞函数就直接被解除阻塞了。它们解除阻塞了,子线程就能够正常工作了。子线程能够正常工作,子线程就能够处理任务队列里边的任务,通过主线程往evLoop->socketPair[0]发送数据(写数据),void taskWakeup(struct EventLoop* evLoop);
// 写数据
void taskWakeup(struct EventLoop* evLoop) {
const char* msg = "我是要成为海贼王的男人!";
write(evLoop->socketPair[0],msg,strlen(msg));
}
此时socketPair[1]被激活了,它所对应的读回调就会被调用。这样就能解除子线程的阻塞,让它去处理任务队列里边的任务
// 读数据 读事件回调函数在执行时需要指定参数
int readLocalMessage(void* arg) {
struct EventLoop* evLoop = (struct EventLoop*)arg;
char buffer[256];
int ret = read(evLoop->socketPair[1],buffer,sizeof(buffer));
if (ret > 0) {
printf("read msg: %s\n",buffer);
}
return 0;
}
故在下面的这篇文章的基础上,增加一些代码内容:
基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动_一个eventloop 可以有多少个连接-CSDN博客https://blog.csdn.net/weixin_41987016/article/details/135225734?csdn_share_tail=%7B%22type%22%3A%22blog%22%2C%22rType%22%3A%22article%22%2C%22rId%22%3A%22135225734%22%2C%22source%22%3A%22weixin_41987016%22%7D在EventLoop.h中添加 int sockPair[2];表示存储本地通信的fd,通过socketpair初始化
struct EventLoop{
bool isQuit;// 开关
struct Dispatcher* dispatcher;
void* dispatcherData;
// 任务队列
struct ChannelElement* head;
struct ChannelElement* tail;
// 用于存储channel的map
struct ChannelMap* channelMap;
// 线程ID,Name,mutex
pthread_t threadID;
char threadName[32];
pthread_mutex_t mutex;
int socketPair[2]; //存储本地通信的fd 通过socketpair初始化
};
在EventLoop.c中续写?
struct EventLoop* eventLoopInitEx(const char* threadName) {
struct EventLoop* evLoop = (struct EventLoop*)malloc(sizeof(struct EventLoop));
evLoop->isQuit = false; // 没有运行
evLoop->dispatcher = &EpollDispatcher;
evLoop->dispatcherData = evLoop->dispatcher->init();
// 任务队列(链表)
evLoop->head = evLoop->tail = NULL;
// 用于存储channel的map
evLoop->channelMap = channelMapInit(128);
evLoop->threadId = pthread_self(); // 当前线程ID
strcpy(evLoop->threadName,threadName == NULL ? "MainThread" : threadName); // 线程的名字
pthread_mutex_init(&evLoop->mutex, NULL);
// 已续写
int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, evLoop->socketPair);
if(ret == -1) {
perror("socketpair");
exit(0);
}
// 指定规则:evLoop->socketPair[0] 发送数据,evLoop->socketPair[1]接收数据
struct Channel* channel = channelInit(evLoop->socketPair[1],ReadEvent,
readLocalMessage,NULL,evLoop);
// channel 添加到任务队列
eventLoopAddTask(evLoop, channel,ADD);
return evLoop;
}