目录
场景一:父进程长时间不写入,此时管道为空,子进程尝试读取?编辑
场景二:子进程长时间不读取数据,父进程不断写入,直到管道写满
在以前我们使用多进程时,我们一般都是让父子进程分别执行各自的代码,进程之间几乎没有通信,但是实际过程中,进程间的通信是十分重要的的一件事情,一个复杂的程序一般都是有多个进程相互通信,协调进行来完成工作的。
进程间通信简称为?
IPC
(Interprocess communication),是两个不同进程间进行任务协同的必要基础。
为了满足各种需求,进程之间是需要通信的。进程间通信的主要目的包括如下几个方面:
我们知道:进程是具有独立性的,进程的数据是不能相互干扰的,如果我们要让他们进行通信无疑增加了通信的成本。
要让两个不同的进程,进行通信,前提条件肯定是:先让两个进程,看到同一份“资源”。并且这份资源由?OS
?直接或间接提供,不能属于任何一个进程,否则就会影响进程的独立性!当然如果多个进程无法看到同一份资源它们就无法进行进程间的通信。
无论后续的哪种进程间通信的解决方案,都要解决以下两个问题:
进程间通信的解决方案划分为以下几种:
管道:
- 匿名管道
- 命名管道
System V 标准:
- System V ?消息队列
- System V ?共享内存
- System V ?信号量
POSIX 标准:
- POSIX?消息队列
- POSIX?共享内存
- POSIX?信号量
- POSIX?互斥量(互斥锁)
- POSIX?条件变量
- POSIX?读写锁
?这三类方法,都是在解决第一个步骤,即让不同的进程看到同一份“资源”。
管道是Unix中最古老的进程间通信的形式。我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”。
例如我们通过 who | wc -l 命令可以看到who进程将数据传递给了?wc -l 进程,两个进程通过管道完成了简单的通信。
在命令行中输入?|
?即可使用管道;创建两个睡眠时间较长的?后台进程,注:&
?表示令当前进程变为后台进程:
可以看出,两个?sleep
?进程的?PPID
?一致,同时?PID
?连续。有一点需要注意的是我们使用管道时,管道两边的的进程都会运行起来,而不是先运行管道左边的进程然后运行管道右边的进程。而且在命令行中用管道链接的进程属于兄弟进程关系。
管道分为?匿名管道?和?命名管道,两者绝大部分原理、特点都一致,本文主要介绍?匿名管道,同时适用于?命名管道?的知识点统一称为?管道。
我们知道文件描述符表的 0、1、2 默认为标准输入、标准输出、标准错误。这些文件都有对应的缓冲区。
该进程通过 fork 创建一个子进程,子进程拷贝了父进程的PCB结构,包括 task_struct 与 struct files_struct 。因此子进程的文件描述符表中存储的指针也被拷贝下来了,指向同一批文件对象。这时,我们就做到了进程间通信的前提:让不同的进程看到同一份“资源”。
这个时候我们的父子进程就可以一个向管道文件写入数据一个从管道中读取数据,这样两个进程就可以完成通信了。
最后一个步骤就是关闭不需要的文件描述符了,如果父进程进行写入,就关闭父进程的读端,关闭子进程的写端。反之则同理。
站在?文件描述符?的角度理解进程创建管道的具体过程:
关闭不需要的文件描述符的原因:
管道这种进程间的通信方式只能进行单向通信。因为文件的缓冲区只有一个,一个缓冲区只有一个读和写位置,管道也是。例如父进程向管道中写入数据,然后子进程也写入数据,由于缓冲区的读写位置只有一个,那么我们在读取数据时,父进程与子进程写入管道的数据根本没有办法区分,就有可能造成通信错误。因此在进程通信的时候,需要确定数据的流向,分别关闭和保留父子进程文件描述符表中的读与写端。这是因为文件对象只有一个缓冲区,难以做到同时读写。
如果我们想要父子进程都能够进行读写,我们可以创建两个管道,这样它们的读写的数据就不会相互影响了。
注意:
这个系统调用可以帮我们打开一个匿名管道文件。
int
的空间)pipefd[0] 的位置放的是读端的文件描述符fd
,pipefd[1] 的位置放的是写端的文件描述符fd
。0
,代表打开管道文件成功,返回值为-1
,代表打开管道文件失败了。使用匿名管道步骤
fd
#include <iostream>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
int main()
{
// 1. 创建匿名管道
int pipefd[2] = {0};// 数组
int ret = pipe(pipefd);
if (ret < 0)
{
std::cout << "pipe error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
exit(-1);
}
std::cout << "pipefd[0]: " << pipefd[0] << std::endl; //读端
std::cout << "pipefd[1]: " << pipefd[1] << std::endl; //写端
// 2. 创建子进程
pid_t id = fork();
if (id < 0)
{
std::cout << "fork error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
exit(-1);
}
else if (id == 0)
{
//子进程
char buffer[1024] = {0};// 缓冲区
// 3. 关闭不需要的fd, 这里让父进程写,子进程读
close(pipefd[1]);
// 4. 开始通信
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);//注意预留一个位置存储 '\0'
buffer[n] = '\0';
std::cout << "子进程给父进程的信息是:" << buffer << std::endl;
// 5. 子进程关闭读端并且退出
close(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);// 关闭读端
int cnt = 1;// 计数器
char buffer[1024] = {0};
std::string str = "good morning, my child";
snprintf(buffer, sizeof(buffer), "%s : %d, PID: %d", str.c_str(), cnt++, getpid());
// 父进程开始通信
ssize_t n = write(pipefd[1], buffer, strlen(buffer));
if (n < 0)
{
std::cout << "write error" << errno << ", errmsg:" << strerror(errno) << std::endl;
}
// 父进程关闭写端并退出
close(pipefd[1]);
// 父进程等待子进程退出
int status = 0;
waitpid(id, &status, 0);
// 通过 status 判断子进程运行情况
if ((status & 0x7F))
{
printf("子进程异常退出,core dump: %d 退出信号:%d\n", (status >> 7) & 1, (status & 0x7F));
}
else
{
printf("子进程正常退出,退出码:%d\n", (status >> 8) & 0xFF);
}
return 0;
}
?可以看到通过管道我们能够让两个进程完成进程之间的通信。
站在?内核(管道本质)?的角度理解上述代码:
所以,看待?管道
?,就如同看待?文件
?一样!管道
?的使用和?文件
?一致,迎合?Linux一切皆文件思想。
管道是一种 半双工、单向流 的通信方式,因此在成功创建匿名管道后,需要两个待通信的进程都能获得同一个 pipefd 数组。
这就是匿名管道比较特殊的地方了:匿名管道只支持具有血缘关系的进程通信,如 父子进程、兄弟进程等,因为只有 继承 了,才能共享到 同一个 pipefd 数组。
当通信双方都获得 pipefd 数组后,需要根据情况关闭不需要的 fd,确保 单流向 的原则
注:命名管道可以支持不具有血缘关系进程间通信
关于匿名管道还有一个函数:pipe2 (了解),比 pipe?函数多一个参数2 flags,可以使匿名管道在发生特殊情况时,作出不同的动作,当 flags 为 0 时,pipe2 等价于 pipe
管道的读写规则:
PIPE_BUF 为管道大小,Linux 中为 4096 字节
原子性:不存在中间状态,确保数据的安全性。
- 管道只能进行单向通信,管道的一种特殊半双工通信(管道两侧只能一个进行写入,一个读取,一旦分工确定就不能够再进行更改了)。如果想要实现两个进程间相互进行通信,需要创建两条管道,如管道1:父进程写,子进程读;管道2:子进程写,父进程读。
- 管道的本质是文件,文件描述符fd的生命周期是随进程的,进程退出时,文件描述符fd也会消失,所以管道的生命周期是随进程的,当进程终止运行时,管道资源也会被?
OS
?回收。- 用匿名管道进行通信,这种方式通常只能够让有血缘关系的进程进行通信,因为没有血缘关系的进程并不知道应该打开哪一个管道文件,有血缘关系的进程可以通过继承来打开同一个管道文件。
循环写入循环读入,当父进程写的比较慢(每隔一秒写一次),子进程读的比较快时:
else if (id == 0)
{
//子进程
char buffer[1024] = {0};// 缓冲区
// 3. 关闭不需要的fd, 这里让父进程写,子进程读
close(pipefd[1]);
// 4. 开始通信
while(true)
{
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);//注意预留一个位置存储 '\0'
if (n < 0) std::cout << "read error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
buffer[n] = '\0';
std::cout << "子进程成功读取到父进程发送的信息:" << buffer << std::endl;
}
// 5. 子进程关闭读端并且退出
close(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);// 关闭读端
int cnt = 1;// 计数器
char buffer[1024] = {0};
std::string str = "good morning, my child";
while (true)
{
std::cout << "===========================================================" <<std::endl;
std::cout << "父进程发送给子进程的信息是:" << str << std::endl;
snprintf(buffer, sizeof(buffer), "%s : %d, PID: %d", str.c_str(), cnt++, getpid());
// 父进程开始通信
ssize_t n = write(pipefd[1], buffer, strlen(buffer));
if (n < 0) std::cout << "write error" << errno << ", errmsg:" << strerror(errno) << std::endl;
sleep(1);// 每一秒写入一次
}
// 父进程关闭写端并退出
close(pipefd[1]);
运行结果是正常的,说明这种通信是合理的。
循环写入循环读入,当父进程写的比较快,子进程读的比较慢时(每隔一秒读一次):
else if (id == 0)
{
//子进程
char buffer[1024] = {0};// 缓冲区
// 3. 关闭不需要的fd, 这里让父进程写,子进程读
close(pipefd[1]);
// 4. 开始通信
while(true)
{
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);//注意预留一个位置存储 '\0'
if (n < 0) std::cout << "read error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
buffer[n] = '\0';
std::cout << "子进程成功读取到父进程发送的信息:" << buffer << std::endl;
}
sleep(1);// 每一秒读入一次
// 5. 子进程关闭读端并且退出
close(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);// 关闭读端
int cnt = 1;// 计数器
char buffer[1024] = {0};
std::string str = "good morning, my child";
while (true)
{
std::cout << "===========================================================" <<std::endl;
std::cout << "父进程发送给子进程的信息是:" << str << std::endl;
snprintf(buffer, sizeof(buffer), "%s : %d, PID: %d", str.c_str(), cnt++, getpid());
// 父进程开始通信
ssize_t n = write(pipefd[1], buffer, strlen(buffer));
if (n < 0) std::cout << "write error" << errno << ", errmsg:" << strerror(errno) << std::endl;
//sleep(1);// 每一秒写入一次
}
// 父进程关闭写端并退出
close(pipefd[1]);
可以看出,父进程写了多次的内容,子进程一次就全部读取出来了,这就显现出了管道的第四个特点:4. 读和写的次数并没有强相关。?
在管道中,写入 与 读取 的次数并不是严格匹配的,此时读写次数没有强相关关系,管道是面向字节流读写的。
- 面向字节流读写又称为 流式服务:数据没有明确的分割,不分一定的报文段;与之相对应的是 数据报服务:数据有明确的分割,拿数据按报文段拿
- 不论写端写入了多少数据,只要写端停止写入,读端都可以将数据读取
管道还存在四种特殊场景:管道为空、管道为满、写端关闭、读端关闭,四种场景对应四种不同的特殊情况,都可以通过代码进行演示。
注意:?当前大部分场景中,子进程为读端,父进程为写端。
结果:如果读端读取完毕了所有的管道数据,此时管道为空,因此子进程无法读取,即 读端阻塞。只有当写端写入数据后,读端才能正常读取。
else if (id == 0)
{
//子进程
char buffer[1024] = {0};// 缓冲区
// 3. 关闭不需要的fd, 这里让父进程写,子进程读
close(pipefd[1]);
// 4. 开始通信
while(true)
{
sleep(10);// 长时间不读取数据
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);//注意预留一个位置存储 '\0'
if (n < 0) std::cout << "read error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
buffer[n] = '\0';
std::cout << "子进程成功读取到父进程发送的信息:" << buffer << std::endl;
}
// 5. 子进程关闭读端并且退出
close(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);// 关闭读端
int cnt = 1;// 计数器
char buffer[1024] = {0};
while (true)
{
char ch = 'a';
ssize_t n = write(pipefd[1], &ch, 1);
std::cout << "已写入 " << cnt++ << " 字节数据" << std::endl;
if (n < 0)
std::cout << "write error" << errno << ", errmsg:" << strerror(errno) << std::endl;
}
当你运行时,你可以以看到写入65535个后,不再进行写入,过几秒以后,可以看到,子进程读取了数据:
结果:在一段时间后,管道被写满,写端无法写入数据,进入阻塞状态,而且也说明了管道的大小是64KB。只有当读端尝试将管道中的数据读走一部分后,写端才能继续写入。
通过1、2这两种特殊情况,我们能够总结出管道的第五个特点:5.?管道有一定的协同能力,让读端和写端能够按照一定的步骤进行通信(自带同步机制)
- 当读端进行从管道中读取数据时,如果没有数据,则会阻塞,等待写端写入数据;如果读端正在读取,那么写端将会阻塞等待读端,因此?管道自带 同步与互斥 机制
形象化理解:
else if (id == 0)
{
//子进程
char buffer[1024] = {0};// 缓冲区
// 3. 关闭不需要的fd, 这里让父进程写,子进程读
close(pipefd[1]);
// 4. 开始通信
while(true)
{
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);
if (n > 0)
{
buffer[n] = '\0';
std::cout << "子进程成功读取到父进程发送的信息:" << buffer << std::endl;
}
else if (n == 0)
{
std::cout << "写端已关闭,读取数据量为:" << n << " 字节" << std::endl;
break;
}
else if (n < 0)
{
std::cout << "read error" << errno << " , errmsg:" << strerror(errno) << std::endl;
}
}
// 5. 子进程关闭读端并且退出
close(pipefd[0]);
exit(0);
}
// 父进程
close(pipefd[0]);// 关闭读端
int cnt = 0;// 计数器
char buffer[1024] = {0};
std::string str = "good morning, my child";
//父进程只写入一次数据,然后关闭写端
std::cout << "父进程发送给子进程的信息是:" << str << std::endl;
snprintf(buffer, sizeof(buffer), "%s : %d, PID: %d", str.c_str(), cnt++, getpid());
ssize_t n = write(pipefd[1], buffer, strlen(buffer));
// 父进程关闭写端并退出
close(pipefd[1]);
// 父进程等待子进程退出
int status = 0;
waitpid(id, &status, 0);
// 通过 status 判断子进程运行情况
if ((status & 0x7F))
{
printf("子进程异常退出,core dump: %d 退出信号:%d\n", (status >> 7) & 1, (status & 0x7F));
}
else
{
printf("子进程正常退出,退出码:%d\n", (status >> 8) & 0xFF);
}
return 0;
结果:关闭写端后,读端会将匿名管道中的数据读取完后,再读,会读到?0
,表示已读到文件末尾
如何理解?
0
?字节数据,表明此时已经读到了结尾,读端也可以结束读取了注:这里将角色变换一下,方便父进程捕捉到子进程的退出信号,切换:父进程 -> 读端,子进程 -> 写端
else if (id == 0)
{
//子进程
// 3. 关闭不需要的fd
close(pipefd[0]);
int cnt = 0;// 计数器
char buffer[1024] = {0};
std::string str = "good morning, my father";
// 4. 开始通信
while(true)
{
std::cout << "===========================================================" <<std::endl;
std::cout << "子进程发送给父进程的信息是:" << str << std::endl;
snprintf(buffer, sizeof(buffer), "%s : %d, PID: %d", str.c_str(), cnt++, getpid());
// 子进程开始通信
ssize_t n = write(pipefd[1], buffer, strlen(buffer));
if (n < 0) std::cout << "write error" << errno << ", errmsg:" << strerror(errno) << std::endl;
sleep(1);
}
// 5. 子进程关闭写端并且退出
close(pipefd[1]);
exit(0);
}
// 父进程
close(pipefd[1]);// 关闭写端
char buffer[1024] = {0};
int cnt = 3;
// 父进程开始通信
while (cnt--)
{
ssize_t n = read(pipefd[0], buffer, sizeof(buffer)-1);
if (n > 0)
{
buffer[n] = '\0';
std::cout << "父进程成功读取到父进程发送的信息:" << buffer << std::endl;
}
else if (n == 0)
{
std::cout << "写端已关闭,读取数据量为:" << n << " 字节" << std::endl;
break;
}
else if (n < 0)
{
std::cout << "read error" << errno << " , errmsg:" << strerror(errno) << std::endl;
}
}
// 父进程关闭读端
close(pipefd[0]);
// 父进程等待子进程退出
int status = 0;
waitpid(id, &status, 0);
// 通过 status 判断子进程运行情况
if ((status & 0x7F))
{
printf("子进程异常退出,core dump: %d 退出信号:%d\n", (status >> 7) & 1, (status & 0x7F));
}
else
{
printf("子进程正常退出,退出码:%d\n", (status >> 8) & 0xFF);
}
return 0;
结果:OS不会维护无意义,低效率,或者浪费资源的事情,这种行为没有意义!如果关闭了读端,那么证明写端写了也没用,即没有存在的意义,所以OS会杀死一直在写入的进程! OS会通过信号来终止进程,13)SIGPIPE
场景:父进程创建了一批子进程,并通过多条匿名管道与它们链接,父进程选择某个子进程,并通过匿名管道与子进程通信,并下达指定的任务让其执行。
首先创建一批子进程及匿名管道 -> 子进程(读端)阻塞,等待写端写入数据 -> 选择相应的进程,并对其写入任务编号(数据)-> 子进程拿到数据后,执行相应任务:
注意:?因为是创建子进程,所以存在关系重复继承的情况,此时应该统计当前子进程的写端?fd ,在创建下一个进程时,关闭无关的?fd
具体体现为:每次都把 写端?fd?存储起来,在确定关系前 “清理” 干净
const int gnum = 3;
Task t;
class EndPoint
{
private:
static int number;
public:
pid_t _child_id;
int _write_fd;
string processname;
public:
EndPoint(int id, int fd):_child_id(id),_write_fd(fd)
{
char namebuffer[64];
snprintf(namebuffer, sizeof(namebuffer), "process-%d[%d:%d]", number++, _child_id, _write_fd);
processname = namebuffer;
}
const string& name() const
{
return processname;
}
~EndPoint(){}
};
int EndPoint::number = 0;
void creatProcesses(vector<EndPoint>* end_points)
{
for(int i = 0; i < gnum; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
//创建进程
pid_t id = fork();
assert(id != -1);
if(id == 0)
{
//子进程
close(pipefd[1]);
//我们期望,所有的子进程读取“指令”,都从标准输入读取
//输入重定向,也可以不重定向,只要在WaitCommand函数里传参fd就可以了
dup2(pipefd[0], 0);
//子进程开始等待获取命令
WaitCommand();
exit(0);
}
//父进程
close(pipefd[0]);
//将新的子进程和他的管道写端构造对象
end_points->push_back(EndPoint(id, pipefd[1]));
}
}
子进程在创建完成后,需要进入一个?等待阶段?->?读端阻塞,同时当子进程读取到相应的?指令?时,需要执行相应任务,这里将封装成了一个类,并通过对象调用函数
//子进程要执行的方法
void WaitCommand()
{
while(1)
{
char command = 0;
int n = read(0, &command, sizeof(int));
if(n == sizeof(int))
{
t.Execute(command);
}
else if(n == 0)
{
cout << "父进程让我退出,我的pid:" << getpid() << endl;
break;
}
else
{
break;
}
}
}
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
typedef void (*fun_t)();
void Print()
{
std::cout << "pid: " << getpid() << "打印任务正在执行" << std::endl;
}
void InsertMySQL()
{
std::cout << "pid: " << getpid() << "数据库任务正在执行" << std::endl;
}
void NetRequest()
{
std::cout << "pid: " << getpid() << "网络请求任务正在执行" << std::endl;
}
//约定每一个command都是4个字节
#define COMMAND_PRINT 0
#define COMMAND_MYSQL 1
#define COMMAND_REQEUST 2
class Task
{
public:
Task()
{
funcs.push_back(Print);
funcs.push_back(InsertMySQL);
funcs.push_back(NetRequest);
}
void Execute(int command)
{
if(command >= 0 && command < funcs.size())
{
funcs[command];
}
}
~Task(){}
public:
std::vector<fun_t> funcs;
};
当所有子进程都完成注册后(统计至数组中),可以让用户输入下标选择程序、输入任务编号选择任务、或者输入程序退出
注意:?因为当前子进程编号从?1
?开始,所以在进行下标访问时,需要?-1
?避免越界
int ShowBoard()
{
cout << "########################################" << endl;
cout << "### 0.执行打印任务 1.执行数据库任务###" << endl;
cout << "### 2.执行请求任务 3.退出 ###" << endl;
cout << "########################################" << endl;
cout << "请选择" << endl;
int command = 0;
cin >> command;
return command;
}
void ctrlProcess(const vector<EndPoint>& end_points)
{
int num = 0;
int cnt = 0;
while(1)
{
//选择任务
int command = ShowBoard();
if(command == 3) break;
if(command < 0 || command > 2) continue;
//选择进程
int index = cnt++; // 从编号1开始
cnt %= end_points.size();
cout << "选择了进程: " << end_points[index].name() << " | 处理任务: " << command << endl;
//下发任务
write(end_points[index]._write_fd, &command, sizeof(command));
}
}
子进程回收十分简单,因为已经在数组中存储了各个子进程的?PID
,只需要遍历等待回收即可
//子进程要执行的方法
void WaitProcess(const vector<EndPoint>& end_points)
{
for(int end = end_points.size() - 1; end >= 0; --end)
{
cout << "父进程让我退出: " << end_points[end]._child_id << endl;
close(end_points[end]._write_fd);
waitpid(end_points[end]._child_id, nullptr, 0);
cout << "父进程回收了子进程:" << end_points[end]._child_id << endl;
}
sleep(10);
}
其中, 退出进程的循环要在?vector?中从后向前遍历;这是因为我们使用的进程退出方式,是通过关闭父进程写端的文件对象,让OS杀死子进程实现的。而在我们创建子进程与管道的?creatProcesses?函数中,是通过循环一个一个创建的:
这种创建方式在创建第一组管道与子进程时不会有任何问题。但是在之后创建第二、第三组乃至更多时,由于它们的?task_struct?也是父进程的拷贝,就会导致后面子进程的文件描述符表里保留指向前几个管道的指针:?
如果是从前向后依次关闭父进程的写端,那么因为第一个子进程对应的管道的写端不只有父进程一个,还有其他两个子进程,因此第一个子进程就不会被OS杀死,从而在下面执行 waitpid 函数时造成堵塞。
为了真正构建每一个管道的写端都只有父进程一个的构造,则可以改写 creatProcesses 函数的代码,在创建时就直接关闭子进程对应的写端文件:
演示:
总体来说,在使用这个小程序时,以下关键点还是值得多注意的
- 注册子进程信息时,存储的是 写端 fd,目的是为了通过此 fd 向对应的子进程写数据,即使用不同的匿名管道
- 创建管道后,需要关闭父、子进程中不必要的 fd
- 需要特别注意父进程写端 fd 被多次继承的问题,避免因写端没有关干净,而导致读端持续阻塞
- 关闭读端对应的写端后,读端会读到 0,可以借助此特性结束子进程的运行
- 在选择进程 / 任务 时,要做好越界检查
- 等待子进程退出时,需要先关闭写端,子进程才会退出,然后才能正常等待
(三)完整代码
ctrlProc.cc
#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include "Task.hpp"
using namespace std;
const int gnum = 3;
Task t;
class EndPoint
{
private:
static int number;
public:
pid_t _child_id;
int _write_fd;
string processname;
public:
EndPoint(int id, int fd):_child_id(id),_write_fd(fd)
{
char namebuffer[64];
snprintf(namebuffer, sizeof(namebuffer), "process-%d[%d:%d]", number++, _child_id, _write_fd);
processname = namebuffer;
}
const string& name() const
{
return processname;
}
~EndPoint(){}
};
int EndPoint::number = 0;
//子进程要执行的方法
void WaitCommand()
{
while(1)
{
char command = 0;
int n = read(0, &command, sizeof(int));
if(n == sizeof(int))
{
t.Execute(command);
}
else if(n == 0)
{
cout << "父进程让我退出,我的pid:" << getpid() << endl;
break;
}
else
{
break;
}
}
}
void creatProcesses(vector<EndPoint>* end_points)
{
vector<int> fds;
for(int i = 0; i < gnum; i++)
{
//创建管道
int pipefd[2] = {0};
int n = pipe(pipefd);
assert(n == 0);
(void)n;
//创建进程
pid_t id = fork();
assert(id != -1);
if(id == 0)
{
for(auto& fd : fds) close(fd);
//子进程
close(pipefd[1]);
//我们期望,所有的子进程读取“指令”,都从标准输入读取
//输入重定向,也可以不重定向,只要在WaitCommand函数里传参fd就可以了
dup2(pipefd[0], 0);
//子进程开始等待获取命令
WaitCommand();
exit(0);
}
//父进程
close(pipefd[0]);
//将新的子进程和他的管道写端构造对象
end_points->push_back(EndPoint(id, pipefd[1]));
fds.push_back(pipefd[1]);
}
}
int ShowBoard()
{
cout << "########################################" << endl;
cout << "### 0.执行打印任务 1.执行数据库任务###" << endl;
cout << "### 2.执行请求任务 3.退出 ###" << endl;
cout << "########################################" << endl;
cout << "请选择" << endl;
int command = 0;
cin >> command;
return command;
}
void ctrlProcess(const vector<EndPoint>& end_points)
{
int num = 0;
int cnt = 0;
while(1)
{
//选择任务
int command = ShowBoard();
if(command == 3) break;
if(command < 0 || command > 2) continue;
//选择进程
int index = cnt++;// 从编号1开始
cnt %= end_points.size();
cout << "选择了进程: " << end_points[index].name() << " | 处理任务: " << command << endl;
//下发任务
write(end_points[index]._write_fd, &command, sizeof(command));
}
}
// void WaitProcess(const vector<EndPoint>& end_points)
// {
// //让子进程全部退出,通过关闭写端的方式
// for(const auto& ep : end_points) close(ep._write_fd);
// cout << "父进程让所有子进程退出" << endl;
// sleep(5);
// //父进程回收子进程的僵尸状态
// for(const auto& ep : end_points) waitpid(ep._child_id, nullptr, 0);
// cout << "父进程回收了所有的子进程" << endl;
// sleep(1);
// }
void WaitProcess(const vector<EndPoint>& end_points)
{
for(int end = end_points.size() - 1; end >= 0; --end)
{
cout << "父进程让我退出: " << end_points[end]._child_id << endl;
close(end_points[end]._write_fd);
waitpid(end_points[end]._child_id, nullptr, 0);
cout << "父进程回收了子进程:" << end_points[end]._child_id << endl;
}
sleep(10);
}
int main()
{
//先进行构建控制结构,父进程写入,子进程读取
vector<EndPoint> end_points;
creatProcesses(&end_points);
ctrlProcess(end_points);
//处理退出问题
WaitProcess(end_points);
return 0;
}
?Task.hpp
#pragma once
#include <iostream>
#include <vector>
#include <unistd.h>
typedef void (*fun_t)();
void Print()
{
std::cout << "pid: " << getpid() << "打印任务正在执行" << std::endl;
}
void InsertMySQL()
{
std::cout << "pid: " << getpid() << "数据库任务正在执行" << std::endl;
}
void NetRequest()
{
std::cout << "pid: " << getpid() << "网络请求任务正在执行" << std::endl;
}
//约定每一个command都是4个字节
#define COMMAND_PRINT 0
#define COMMAND_MYSQL 1
#define COMMAND_REQEUST 2
class Task
{
public:
Task()
{
funcs.push_back(Print);
funcs.push_back(InsertMySQL);
funcs.push_back(NetRequest);
}
void Execute(int command)
{
if(command >= 0 && command < funcs.size())
{
funcs[command];
}
}
~Task(){}
public:
std::vector<fun_t> funcs;
};
简单,给匿名管道起个名字就变成了命名管道
那么如何给?匿名管道?起名字呢?
可以将命名管道理解为 “挂名” 后的匿名管道,把匿名管道加入文件系统中,但仅仅是挂个名而已,目的就是为了让其他进程也能看到这个文件(文件系统中的文件可以被所有进程看到)
创建命名管道指令:
mkfifo 管道名
命名管道可以从命令行上创建,命令行方法是使用下面这个命令:
注意:因为没有 Data block,所以命名管道这个特殊文件大小为 0
从侧面说明?管道文件就是一个纯纯的内存级文件,有自己的上限,出现在文件系统中,只是单纯挂个名而已。
可以直接在命令行中使用命名管道:
echo
?可以进行数据写入,可以重定向至?fifo
cat
?可以进行数据读取,同样也可以重定向于?fifo
??另一种方式是在代码中创建有名管道:
返回值 int? | 创建成功返回 0,否则 -1 |
const char *pathname | 创建命名管道文件的路径+名字 |
mode_t mode | 创建命名管道文件时的权限 |
/home/xxx/namePipeCode/fifo
,也可以传递相对路径?./fifo
,当然绝对路径更灵活,但也更长mode_t
?其实就是对?unsigned int
?的封装,等价于?uint32_t
,而?mode
?就是创建命名管道时的初始权限,实际权限需要经过?umask
?掩码计算#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
int main()
{
umask(0);
mkfifo("./fifo", 0666);
return 0;
}
生成命名管道文件:?
把视角拉回文件系统:当重复多次打开同一个文件时,并不会费力的打开多次,而且在第一次打开的基础上,对?struct file
?结构体中的引用计数?++
,所以对于同一个文件,不同进程打开了,看到的就是同一个
?因为命名管道适用于独立的进程间?IPC
,所以无论是读端和写端,进程?A
、进程?B
?为其分配的?fd
?是一致的,都是 3?。
思路:
- 创建 服务端 server 和 客户端 client 两个独立的进程,
- 服务端 server 创建并以 读 的方式打开管道文件,客户端 client 以 写 的方式打开管道文件,
- 打开后俩进程可以进程通信,
- 通信结束后,由客户端关闭(写端), 服务端(读端) 读取到 0 后也关闭并删除命令管道文件)
注意:
unlink 命令管道文件名 //删除管道文件
为了让服务端和客户端能享有同一个文件名,可以创建一个公共头文件?common.h
,其中存储 命名管道文件名及默认权限等公有信息
comm.hpp
#pragma once
#include <iostream>
#include <string>
const std::string fifo_name = "./fifo"; //管道名
uint32_t mode = 0666; //权限
服务端 server.cc
#include <iostream>
#include <cerrno>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
int main()
{
// 1. 创建一个管道文件
umask(0);// 这个设置不会影响系统默认设置,只会影响当前进程
int n = mkfifo(fifoname.c_str(), mode);
if (n != 0)
{
std::cerr << "create error:" << errno << " | errmsg:" << strerror(errno) << std::endl;
exit(0);
}
std::cout << "creart fifo file success!" << std::endl;
// 2. 以读的方式打开文件
int rfd = open(fifoname.c_str(), O_RDONLY);
if (rfd < 0)
{
std::cerr << "open error:" << errno << " | errmsg:" << strerror(errno) << std::endl;
exit(0);
}
std::cout << "open fifo file success!" << std::endl;
// 3. 开始通信 - 等待客户端发送的信息
char buff[NUM];
while(true)
{
buff[0] = 0;
ssize_t n = read(rfd, buff, sizeof(buff)-1);
if (n > 0)
{
buff[n] = 0;
std::cout << "message that client sent### " << buff << std::endl;
}
else if (n == 0)
{
std::cout << "client quit, me too" << std::endl;
break;
}
else
{
std::cout << "open error:" << errno << ", errmsg:" << strerror(errno) << std::endl;
break;
}
}
close(rfd);
// 4. 删除管道文件
unlink(fifoname.c_str());
return 0;
}
客户端 client.cc
#include <iostream>
#include <cstdio>
#include <cerrno>
#include <cstring>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "comm.hpp"
int main()
{
// 不需创建管道文件,只需打开即可
int wfd = open(fifoname.c_str(), O_WRONLY);
if (wfd < 0)
{
std::cerr << "open error:" << errno << " | errmsg:" << strerror(errno) << std::endl;
exit(0);
}
// 2. 开始通信 - 写入数据
char buff[NUM];
while(true)
{
std::cout << "client send message### ";
char *msg = fgets(buff, sizeof(buff), stdin);
assert(msg);
(void)msg;
buff[strlen(buff) - 1] = 0; // 去除 '\n'
// abc'\n''\0' ——> abc'\0''\0' <==> abc'\0'
if(strcasecmp("quit", buff) == 0) break;
ssize_t n = write(wfd, buff, strlen(buff));
assert(n > 0);
(void)n;
}
close(wfd);
return 0;
}
我们需先执行服务端,创建管道文件,然后再执行客户端,才能打开管道文件
运行效果:
System V 是一套独立于操作系统外的标准,是一个专门为了通信设计出的内核模块,我们称之为 System V 的 IPC 通信机制。共享内存 全称?System V ?共享内存,是一种进程间通信解决方案,并且是所有解决方案中最快的一个,在通信速度上可以做到一骑绝尘。
先来看看?System V?共享内存的工作原理:
这块公共区域就是?共享内存。
显然,共享内存的目的也是?让不同的进程看到同一份资源(同一个内存)
关于共享区:共享区作为虚拟地址空间中一块缓冲区域,既可作为堆栈生长扩展的区域,也可用来存储各种进程间的公共资源,比如这里的共享内存,以及之前学习的动态库,相关信息都是存储在共享区中
注意:?共享内存块的创建、进程间建立映射都是由?OS
?实际执行的
共享内存也不止存在一份,当出现多块共享内存时,操作系统不可能一一比对进行使用,秉持着高效的原则,操作系统会把已经创建的共享内存组织起来,更好的进行管理。
所以共享内存,不是我们想的那样,只要在内存中开辟空间即可,系统也要为了管理共享内存,构建对应的描述共享内存的结构体对象! 在Linux中这个结构体是struct shmid_ds,这个结构体里面存放了共享内存的各种属性。
所以:共享内存 = 共享内存的内核数据结构(struct shmid_ds) + 真正开辟的内存空间
共享内存不止用于两个进程间通信,所以共享内存必须确保能持续存在,这也就意味着共享内存的生命周期不随进程,而是随操作系统,一旦共享内存被创建,除非被删除,否则将会一直存在,因此?操作系统需要对共享内存的状态加以描述。
注:shm
?表示共享内存
struct shmid_ds
{
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
其中 struct ipc_perm 中存储了?共享内存中的基本信息,具体包含内容如下:
struct ipc_perm
{
__kernel_key_t key;
__kernel_uid_t uid;
__kernel_gid_t gid;
__kernel_uid_t cuid;
__kernel_gid_t cgid;
__kernel_mode_t mode;
unsigned short seq;
};
共享内存虽然属于文件系统,但它的结构是经过特殊设计的,与文件系统中的?inode
?那一套结构逻辑不一样。
组成部分 | 含义 |
返回值 int | 创建成功返回共享内存的 shmid,失败返回 -1 |
参数1 key_t key | 创建共享内存的唯一 key 值,通过 ftok 函数获取 |
参数2 size_t size | 创建共享内存的大小,一般为 4096 |
参数3 int shmflg? | 位图,可以设置共享内存的创建方式及创建权限 |
因为共享内存拥有自己的数据结构,所以?返回值?int?实际就是?shmid,类似于文件系统中的?fd
,用来对不同的共享内存块进行操作。
参数2为创建共享内存的大小,单位是字节,一般设为?4096
?字节(4kb
),与一个?PAGE
?页大小相同,有利于提高?IO
?效率。
参数3是位图结构,类似于 open 函数中的参数3(文件打开方式),常用的标志位有以下几个:
参数1比较特殊,key_t 实际就是对 int?进行了封装,表示一个数字,用来标识不同的共享内存块,可以理解为 inode,因为是标识值,所以必须确保 唯一性,需要使用函数 ftok 根据不同的 项目路径 + 项目编号 + 特殊的算法,生成一个碰撞率低的标识值,供操作系统对共享内存进行区分和调用。
组成部分 | 含义 |
---|---|
返回值 key_t | 返回生成的标识值,等价于?int ?类型 |
参数1 const char *pathname | 项目路径,可使用 绝对 或 相对 路径 |
参数2 int proj_id | 项目编号,可以根据实际情况编写 |
这个key值单独看起来是没有任何作用的,它要与其他的系统调用结合起来才会发挥作用,未来要通信的两个进程就可以在函数?ftok 输入相同的相同的参数,生成相同的key值,然后通过这个key值就能确定它们想要通信的共享内存是哪一个了。
实操代码:
common.hpp
#include <iostream>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/shm.h>
using namespace std;
#define PATHNAME "." // 项目名
#define PROJID 0x0001 // 项目编号
const int gsize = 4096;
const mode_t mode = 0666;
// 将十进制转换成十六进制
string toHex(int x)
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "0x%x", x);
return buffer;
}
// 获取共享内存唯一标识
key_t getKey()
{
key_t key = ftok(PATHNAME, PROJID);
if (key == -1)
{
cerr << errno << ":" << strerror(errno) << endl;
exit(1);
}
return key;
}
// 共享内存助手
int createShmHelper(key_t key, int size, int flag)
{
int shmid = shmget(key, size, flag);
if(shmid == -1)
{
cerr << errno << ":" << strerror(errno) << endl;
exit(2);
}
return shmid;
}
// 创建共享内存
int createShm(key_t key, int size)
{
return createShmHelper(key, size, IPC_CREAT | IPC_EXCL | mode);
}
// 获取共享内存
int getShm(key_t key, int size)
{
return createShmHelper(key, size, IPC_CREAT);
}
server.cc?
#include "common.hpp"
int main()
{
// 创建key
key_t key = getKey();
cout << "server key: " << toHex(key) << endl;
// 服务端创建共享内存
int shmid = createShm(key, gsize);
cout << "server shmid: " << shmid << endl;
return 0;
}
?client.cc
#include "common.hpp"
int main()
{
key_t key = getKey();
cout << "client key: " << toHex(key) << endl;
// 客户端打开内存
int shmid = getShm(key, gsize);
cout << "client shmid: " << shmid << endl;
return 0;
}
通过 shmget 和 ftok 函数获得唯一的 key 和 shmid
查看共享内存资源指令:
ipcs -m
从左到右依次为key值、 shmid、拥有者、权限、大小、挂接数、状态
共享内存 5 就是通过上述代码生成的
注意:?因为共享内存每次都是随机生成的,所以每次生成的 key 和 shmid?都不一样
?当我们再次运行程序时,会出现下面这种情况:
服务端运行失败,原因是 shmget 创建共享内存失败,这是因为服务端创建共享内存时,传递的参数为 IPC_CREAT | IPC_EXCL,其中 IPC_EXCL 注定了当共享内存存在时,创建失败。
而客户端只是单纯的获取共享内存,同时也只传递了 IPC_CREAT 参数,所以运行才会成功。
综上所述,服务端运行失败的根本原因是 待创建的共享内存已存在,如果想要成功运行,需要先将原共享内存释放。
共享内存的释放方式主要有以下两种:
可以直接在命令行中通过指令,根据?shmid
?释放指定共享内存
ipcrm -m shmid
下文会详解这个函数
shmctl(shmid, IPC_RMID, NULL);
server.cc
#include "common.hpp"
int main()
{
// 创建key
key_t key = getKey();
cout << "server key: " << toHex(key) << endl;
// 服务端创建共享内存
int shmid = createShm(key, gsize);
cout << "server shmid: " << shmid << endl;
int n = 5;
while (n)
{
// 运行五秒后删除共享内存
cout << n-- << endl;
sleep(1);
}
// 删除共享内存
shmctl(shmid, IPC_RMID, NULL);
cout << "共享内存" << shmid << "已删除" << endl;
return 0;
}
当服务端运行结束时,自动删除共享内存?:
共享内存在被成功创建后,进程还不 “认识” 它,只有让待通信进程都 “认识” 同一个共享内存后,才能进行正常通信,让进程 “认识” 共享内存这一操作称为?关联。
当进程与共享内存关联后,共享内存才会?通过页表映射至进程的虚拟地址空间中的共享区中。
组成部分 | 含义 |
返回值 void* | 如图malloc一样,返回的是 void* 指针,可以根据需求进行强转 |
参数1 int shmid | 待关联的共享内存 id |
参数2 const void *shmaddr | 共享内存挂接成功后得到的共享内存的虚拟地址的起始地址,一般 设置为?NULL?,让系统自己挂接,可以不用管 |
参数3 int shmflg | 关联后,进程对共享内存的读写属性 |
当进程与共享内存关联后,返回的就是共享内存映射至共享区的起始地址
共享内存映射至共享区时,我们可以指定映射位置(即传递参数2),但我们一般不知道具体地址,所以?可以传递?NULL
,让编译器自动选择位置进行映射。
关于参数3,一般直接设为?0
,表示关联后,共享内存属性为 默认读写权限,更多选项如下所示:
SHM_RND
?若?shmaddr
?不为?NULL
,则关联地址自动向下调整为?SHMLBA
?的整数倍,SHMLBA
?的值为?PAGE_SIZE
,具体调整公式:shmaddr - (shmaddr % SHMLBA)
一般通信数据为字符,所以可以将?shmat
?的返回值强转为?char*
在common.hpp添加一个接口:
// 关联共享内存
char* attachShm(int shmid)
{
char *start = (char*)shmat(shmid, nullptr, 0);
return start;
}
server.cc
#include "common.hpp"
int main()
{
// 创建key
key_t key = getKey();
cout << "server key: " << toHex(key) << endl;
// 服务端创建共享内存
int shmid = createShm(key, gsize);
cout << "server shmid: " << shmid << endl;
sleep(3);
// 将自己与共享内存关联起来
char *start = attachShm(shmid);
sleep(3);
sleep(5);
// 删除共享内存
shmctl(shmid, IPC_RMID, NULL);
cout << "共享内存" << shmid << "已删除" << endl;
return 0;
}
?client.cc
#include "common.hpp"
int main()
{
key_t key = getKey();
cout << "client key: " << toHex(key) << endl;
int shmid = getShm(key, gsize);
cout << "client shmid: " << shmid << endl;
sleep(3);
// 将自己与共享内存关联起来
char *start = attachShm(shmid);
sleep(5);
return 0;
}
共享内存信息中的?nattch
?表示当前共享内存中的进程关联数 。注意:?程序运行结束后,会自动取消关联状态:
结论:共享内存的生命周期是随内核的,即进程退出以后如果没有删除共享内存,则共享内存不会消失!
这个函数使用非常简单,将已关联的共享内存地址传递进行去关联即可、
返回值:去关联成功返回 0,失败返回 -1,并将错误码设置
如同关闭 FILE*、fd、free 等一些列操作一样,当我们关联共享内存,使用结束后,需要进行去关联,否则会造成内存泄漏(指针指向共享内存,访问数据)。
// common.hpp
// 去关联共享内存
void detachShm(char *start)
{
int n = shmdt(start);
assert(n != -1);
(void)n;
}
// server.cc / client.cc
// 去关联
detachShm(start);
注意:
共享内存在被删除后,已成功挂接的进程仍然可以进行正常通信,不过此时无法再挂接其他进程
共享内存被提前删除后,状态?status
?变为 销毁?dest
System V标准中还为共享内存提供了一个控制函数 shmctl , 其原型如下图所示:
组成部分 | 含义 |
返回值 int | 成功返回 0,失败返回 -1 |
参数1 int shmid | 待控制的共享内存 id |
参数2 int cmd | 控制共享内存的具体动作,同样是位图 |
参数3 struct shmid_ds *buf | 用于获取或设置所控制共享内存的具体数据结构, shmid_ds结构中,调用者必须要有读权限。 |
之前在释放共享内存时,我们就已经使用过了 shmctl ,给参数2传入的是 IPC_RMID,表示释放共享内存,调用者必须是共享内存的创建者,或者是特权用户。除此之外,还可以给参数2传递以下动作:
IPC_SET
?在进程有足够权限的前提下,将共享内存的当前关联值设置为?buf
?数据结构中的值buf
?就是共享内存的数据结构,可以使用?IPC_STAT
?获取,也可以使用?IPC_SET
?设置。
获取共享内存的属性时,需要具备相应的权限,这里先设置一下:
const mode_t mode = 0666;
// 创建共享内存
int createShm(key_t key, int size)
{
return createShmHelper(key, size, IPC_CREAT | IPC_EXCL | mode);
}
?server.cc
int main()
{
// 创建key
key_t key = getKey();
cout << "server key: " << toHex(key) << endl;
// 服务端创建共享内存
int shmid = createShm(key, gsize);
cout << "server shmid: " << shmid << endl;
// 将自己与共享内存关联起来
char *start = attachShm(shmid);
// 获取共享内存数据结构信息
struct shmid_ds ds;
int n = shmctl(shmid, IPC_STAT, &ds);
if (n != -1)
{
cout << "perm: " << toHex(ds.shm_perm.__key) << endl;
cout << "create pid: " << ds.shm_cpid << " : " << getpid() << endl;
}
// 去关联
detachShm(start);
// 删除共享内存
//shmctl(shmid, IPC_RMID, NULL);
//cout << "共享内存" << shmid << "已删除" << endl;
return 0;
}
通过程序证明了?共享内存确实有自己的数据结构
证明结论:?共享内存 = 共享内存的内核数据结构(struct shmid_ds) + 真正开辟的内存空间
当两个进程与同一块共享内存成功关联后,可以直接对该区域进行读写操作,就像?父子进程读取同一个数据一样,不过不能进行写入,因为会发生 写时拷贝 机制,拷贝共享数据
但共享内存就不一样了,真·共享,不会发生 写时拷贝。
简单使用共享内存流程如下:
为了使操作更加简洁,可以将 common.h 中的代码封装为一个类,创建、关联、去关联等操作一气呵成。
common.h
#include <iostream>
#include <cerrno>
#include <cstring>
#include <unistd.h>
#include <assert.h>
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/shm.h>
using namespace std;
#define PATHNAME "." // 项目名
#define PROJID 0x0001 // 项目编号
#define SERVER 1
#define CLIENT 0
class Shm
{
public:
Shm(int type):_type(type)
{
// 获取共享内存唯一标识
_key = getKey();
// 根据不同的身份,创建或者打开共享内存
if (_type == SERVER) _shmid = createShm(_key, gsize);
else _shmid = getShm(_key, gsize);
// 关联共享内存
_start = attachShm(_shmid);
}
~Shm()
{
detachShm(_start);
if(_type == SERVER)
{ // 删除共享内存
shmctl(_shmid, IPC_RMID, NULL);
cout << "共享内存" << _shmid << "已删除" << endl;
}
}
char* getStart() { return _start; }
int getShmid() const { return _shmid; }
void *getStart() const { return _start; }
// 获取共享内存唯一标识
key_t getKey()
{
key_t key = ftok(PATHNAME, PROJID);
if (key == -1)
{
cerr << errno << ":" << strerror(errno) << endl;
exit(1);
}
return key;
}
// 创建共享内存
int createShm(key_t key, int size)
{
return createShmHelper(key, size, IPC_CREAT | IPC_EXCL | mode);
}
// 获取共享内存
int getShm(key_t key, int size)
{
return createShmHelper(key, size, IPC_CREAT);
}
// 关联共享内存
char* attachShm(int shmid)
{
char *start = (char*)shmat(shmid, nullptr, 0);
return start;
}
// 去关联共享内存
void detachShm(char *start)
{
int n = shmdt(start);
assert(n != -1);
(void)n;
}
protected:
static const int gsize = 4096;
static const mode_t mode = 0666;
// 将十进制转换成十六进制
string toHex(int x)
{
char buffer[64];
snprintf(buffer, sizeof(buffer), "0x%x", x);
return buffer;
}
// 共享内存助手
int createShmHelper(key_t key, int size, int flag)
{
int shmid = shmget(key, size, flag);
if(shmid == -1)
{
cerr << errno << ":" << strerror(errno) << endl;
exit(2);
}
return shmid;
}
private:
key_t _key;
int _shmid = 0;
char *_start;
int _type; // 身份标识符,用来区分服务端跟客户端
};
?server.cc
#include "common.hpp"
int main()
{
Shm shm(SERVER);
char *start = shm.getStart();
while (true)
{
cout << "client->server#" << start << endl;
// 读取到第26个字母后,退出服务端
if(strlen(start) == 26) break;
sleep(1);
}
return 0;
}
?client.cc
#include "common.hpp"
int main()
{
Shm shm(CLIENT);
char *start = shm.getStart();
// 开始通信
int n = 0;
cout << "client sent#";
fflush(stdout);
while (n < 26)
{
start[n] = ('A' + n);
printf("%c", start[n]);
fflush(stdout);
n++;
start[n] = '\0';
sleep(1);
}
cout << endl;
return 0;
}
注意:
fflush
?手动刷新?printf
?的缓冲区。共享内存通信快的秘籍在于?减少数据拷贝(IO
),IO
是很慢、很影响效率的
比如在使用管道通信时,需要经过以下几个步骤:
A
?中读取数据(IO
)IO
)IO
)B
(IO
)但共享内存就不一样,直接访问同一块内存进行数据读写。
在使用共享内存通信时,只需要经过以下两步:
A
?直接将数据写入共享内存中B
?直接从共享内存中读取数据显然,使用共享内存只需要经过?2
?次?IO
?所以共享内存的秘籍是?减少拷贝(IO
)次数
共享内存这么快,为什么不直接只使用共享内存呢?
因为快是要付出代价的,因为 “快” 导致共享内存有以下缺点:
总的来说,共享内存没有任何的保护机制,不加规则限制的共享内存是不推荐使用的。
当然可以利用其他通信方式,控制共享内存的写入与读取规则
A
?写完数据后,才通知进程?B
?读取B
?读取后,才通知进程?A
?写入假如是多端写入、多端读取的场景,则?可以引入生产者消费者模型,加入互斥锁和条件变量等待工具,控制内存块的读写。
在 System V 通信标准中,还有一种通信方式:消息队列;随着时代的发展,这些陈旧的标准都已经较少使用了,但作为 IPC 中的经典知识,我们可以对其做一个简单了解,扩展?IPC 的知识栈。
消息队列(Message Queuing)是一种比较特殊的通信方式,它不同于管道与共享内存那样借助一块空间进行数据读写,而是?在系统中创建了一个队列,这个队列的节点就是数据块,包含类型和信息。
遍历消息队列时,存数据块?还是?取数据块?取决于?数据块中的类型?type
?
注意:?消息队列跟共享内存一样,是由操作系统创建的,其生命周期不随进程,因此在使用结束后需要删除
同属于?System V?标准,消息队列也有属于自己的数据结构
可以通过 man msgctl 查看函数使用手册,其中就包含了?消息队列?的数据结构信息 。注:msg
?表示?消息队列:
struct msqid_ds
{
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages in queue */
msglen_t msg_qbytes; /* Maximum number of bytes allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};
和?共享内存?一样,其中 struct ipc_perm 中存储了?消息队列的基本信息,具体包含内容如下:
struct ipc_perm
{
key_t __key; /* Key supplied to msgget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
论标准的重要性,消息队列的大小接口风格与共享内存一致,消息队列的使用也涉及很多的系统调用,都是出自?System V?标准。
?关于?msgget 函数
组成部分 | 含义 |
---|---|
返回值 int? | 创建成功返回消息队列的 msgid ,失败返回?-1 |
参数1 key_t key | 创建共享内存时的唯一 key 值,通过函数计算获取 |
参数2 int msgflg | 位图,可以设置消息队列的创建方式及创建权限 |
?与?共享内存?的 shmget 可以说是十分相似了,关于 ftok 函数计算 key 值,这里也不再阐述。
?简单使用函数?msgget 创建?消息队列,并使用 ipcs -q 指令查看资源情况:
#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
using namespace std;
int main()
{
//创建消息队列
int n = msgget(ftok("./", 668), IPC_CREAT | IPC_EXCL | 0666);
if(n == -1)
{
cerr << "msgget fail!" << endl;
exit(1);
}
return 0;
}
程序运行后,创建出了一个 msqid 为?1
?的消息队列
因为此时并?没有使用消息队列进行通信,所以已使用字节?used-bytes
?和 消息数?messages
?都是?0。
注意:
IPC_CREAT
、IPC_EXCL
、权限
?等信息释放指令:ipcrm -q msqid?释放消息队列,其他 System V 通信资源也可以这样释放:
释放函数:msgctl?(msqid, IPC_RMID, NULL)
?释放指定的消息队列,跟 shmctl 删除共享内存一样:
关于?msgctl?函数
组成部分 | 含义 |
---|---|
返回值 int? | 成功返回?0 ,失败返回?-1 |
参数1?int msqid | 待控制的消息队列?id |
参数2?int cmd | 控制消息队列的具体动作,同样是位图 |
参数3 struct msqid_ds *buf | 用于获取或设置所控制消息队列的数据结构 |
?简单回顾下参数2部分可传递参数:
IPC_RMID
?表示删除共享内存IPC_STAT
?用于获取或设置所控制共享内存的数据结构IPC_SET
?在进程有足够权限的前提下,将共享内存的当前关联值设置为?buf
?数据结构中的值同样的,消息队列 = 消息队列的内核数据结构(struct msqid_ds) + 真正开辟的空间
利用消息队列发送信息,即?将信息打包成数据块,入队尾,所使用函数为?msgsnd
关于 msgsnd 函数
参数2 表示待发送的数据块,这显然是一个结构体类型,需要自己定义,结构如下:
struct msgbuf
{
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
mtype 就是传说中数据块类型,据发送方而设定;mtex 是一个比较特殊的东西:柔性数组,其中存储待发送的?信息,因为是?柔性数组,所以可以根据?信息?的大小灵活调整数组的大小。
消息发送后,总得接收吧,既然发送是往队尾中添加数据块,那么接收就是?从队头中取数据块,假设所取数据块为自己发送的,那么就不进行操作,其他情况则取出数据块,使用?msgrcv 函数接收信息:
?关于 msgrcv 函数:
同样的,接收的数据结构如下所示,也包含了?类型?和?柔性数组
struct msgbuf
{
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};
消息队列 的大部分接口都与 共享内存 近似,所以掌握 共享内存 后,即可快速上手 消息队列
但是如你所见,System V 版的 消息队列 使用起来比较麻烦,并且过于陈旧,现在已经较少使用了,所以我们不必对其进行深究,知道个大概就行了。
信号量 (semaphore)一种特殊的工具,主要用于实现?同步和互斥。
在正式学习?信号量?相关知识前,需要先简单了解下?互斥相关四个概念,为后续?多线程中信号量的学习作铺垫(重点)
1、并发
?是指系统中同时存在多个独立的活动单元
2、互斥
?是指同一时刻只允许一个活动单元使用共享资源
3、临界资源 与 临界区,多执行流环境中的共享资源就是 临界资源,涉及 临界资源 操作的代码区间即 临界区
4、原子性:只允许存在 成功 和 失败 两种状态
所以?互斥?是为了解决?临界资源 在多执行流环境中的并发访问问题,需要借助?互斥锁 或 信号量?等工具实现?原子操作,实现?互斥。
关于互斥锁 (mutex) 的相关知识在?多线程?中介绍,现在先来学习?信号量,搞清楚它是如何实现?互斥?的
将整个程序看作现实世界,形色各异的人看作?执行流,电影院 等公共资源看作?临界区,而单场电影的电影票看作?临界资源,主角?信号量?就是电影院中单场电影余票的?计数器,即余票越多,计数器值越大,当有人买票时,计数器?-1
,当有人看完电影时,计数器?+1;
当电影票卖完时,计数器归零,其他想看电影的人也无法购票观看本场电影
下面这些情况应运而生:
信号量?的设计初衷也是如此,就是为了避免 因多执行流对临界资源的并发访问,而导致程序运行出现问题。
因为电影院一次能容纳几十个人,所以可能不太好理解?互斥?这个概念,将场景特殊化,现在有一个?顶级VIP放映室,每天饮料零食随便吃,但?一次只允许一个人看电影,与普通电影院一样,这个?顶级VIP放映室?也有自己的售票系统,其本质同样是?计数器,但此时?计数器初始值为?1。
所以:当一群人都想进这个顶级VIP放映室看电影时,必须等到 计数器 为?1
?时,才能进行抢票,才有资格进去看电影,当然一次只能放一个人进去,同时计数器是否恢复?1
,取决于上一个看电影的人是否出了放映室 -> 看电影结束 -> 计数器?+1
规定:只允许一个人看电影
透过现象看本质,在?顶级VIP看电影?不就是代码中?多个执行流对同一个临界资源的互斥访问吗??此时的?信号量?可以设为?1
,确保?只允许一个执行流进行访问,这种?信号量?被称为?二元信号量,常用来实现 互斥。
综上所述,信号量本质上就是 计数器?count
,所谓的?P
?操作(申请)就是在对?count--
,V
?操作(归还)则是在对?count++。
下面来看看?信号量?的数据结构,通过 man semctl 进行查看
注:sem?表示?信号量
struct semid_ds
{
struct ipc_perm sem_perm; /* Ownership and permissions */
time_t sem_otime; /* Last semop time */
time_t sem_ctime; /* Last change time */
unsigned long sem_nsems; /* No. of semaphores in set */
};
System V
?家族基本规矩,struct ipc_perm 中存储了?信号量的基本信息,具体包含内容如下:
struct ipc_perm
{
key_t __key; /* Key supplied to semget(2) */
uid_t uid; /* Effective UID of owner */
gid_t gid; /* Effective GID of owner */
uid_t cuid; /* Effective UID of creator */
gid_t cgid; /* Effective GID of creator */
unsigned short mode; /* Permissions */
unsigned short __seq; /* Sequence number */
};
显然,无论是 共享内存、消息队列、信号量,它们的 ipc_perm 结构体中的内容都是一模一样的,结构上的统一可以带来管理上的便利,具体原因可以接着往下看
信号量的申请比较特殊,一次可以申请多个信息量,官方称此为?信号量集,所使用函数为 semget
关于 semget?函数
除了参数2,其他基本与另外俩兄弟一模一样,实际传递时,一般传 1,表示只创建一个?信号量
使用函数创建?信号量集,并通过指令 ipcs -s 查看创建的?信号量集?信息:
#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
using namespace std;
int main()
{
//创建一个信号量
int n = semget(ftok("./", 668), 1, IPC_CREAT | IPC_EXCL | 0666);
if(n == -1)
{
cerr << "semget fail!" << endl;
exit(1);
}
return 0;
}
?程序运行后,创建了一个?信号量集,nsems?为?1
,表示在当前?信号量集?中只有一个?信号量
注意:
IPC_CREAT
、IPC_EXCL
、权限
?等信息指令释放:直接通过指令 ipcrm -s semid 释放信号量集:
?通过函数释放:semctl(semid, semnum, IPC_RMID)
,信号量中的控制函数有一点不一样
关于 semctl 函数
注意:
1
?开始编号信号量的操纵比较ex,也比较麻烦,所以仅作了解即可
使用 semop 函数对?信号量?进行诸如?+1
、-1
?的基本操作
关于?semop 函数
组成部分 | 含义 |
---|---|
返回值?int | 成功返回?0 ,失败返回?-1 |
参数1?int semid | 待操作的信号量集?id |
参数2?struct sembuf *sops | 一个比较特殊的参数,需要自己设计结构体 |
参数3?unsigned nsops | 可以简单理解为信号量编号 |
?重点在于参数2,这是一个结构体,具体成员如下:
unsigned short sem_num; /* semaphore number */
short sem_op; /* semaphore operation */
short sem_flg; /* operation flags */
其中包含信号量编号、操作等信息,需要我们自己设计出一个结构体,然后传给 semop 函数使用
可以简单理解为:sem_op 就是要进行的操作,如果将 sem_op 设为 -1,表示信号量 -1(申请),同理 +1 表示信号量 +1(归还)
sem_flg 是设置动作,一般设为默认即可.
当然这些函数我们不必深入去研究,知道个大概就行了
信号量 是实现 互斥 的其中一种方法,具体表现为:资源申请,计数器 -1,资源归还,计数器 +1,只有在计数器不为 0 的情况下,才能进行资源申请,可以设计 二元信号量 实现 互斥
System V 中的 信号量 操作比较麻烦,但 信号量 的思想还是值得一学的,等后面学习 多线程 时,也会使用 POSIX 中的 信号量 实现 互斥,相比之下,POSIX 版的信号量操作要简单得多,同时应用也更为广泛。
因为 信号量 需要被多个独立进程看到,所以 信号量 本身也是 临界资源,不过它是 原子 的,所以可以用于 互斥
多个独立进程看到同一份资源,这就是 IPC 的目标,所以 信号量 被划分至进程间通信中
不难发现,共享内存、消息队列、信号量的数据结构基本一致,并且都有同一个成员?struct ipc_perm
,所以实际对于?操作系统?来说,对?System V
?中各种方式的描述管理只需要这样做:
这样一来,操作系统可以只根据一个地址,灵活访问?两个结构体中的内容,比如?struct ipc_perm shm_perm?和?struct shmid_ds,并且操作系统还把多种不同的对象,描述融合入了一个?ipc_id_arr?指针数组中,真正做到了?高效管理。
注:默认?ipc_id_arr[n]?访问的是?struct ipc_perm?中的成员
注:上述图示只是一个草图,目的是为了辅助理解原理,并非操作系统中真实样貌
操作系统在进行比较判断时,如何判断类型呢?
?通过下标(id
) 访问对象,这与文件系统中的机制不谋而合,不过实现上略有差异,间接导致?System V
?的管理系统被边缘化(历史选择了文件系统)
shmid、msqid 和 semid 都是??ipc_id_arr?的下标,为什么值很大呢?
id % 数组大小
?进行转换,确保不会发生越界,事实上,这个值与开机时间有关,开机越长,值越大,当然到了一定程度后,会重新轮回将内核中的所有 ipc 资源统一以数组的方式进行管理
- 假设想访问具体?ipc 中的资源,可以通过?ipc_id_arr[n]?强转为对应类型指针,再通过?
->
?访问其中的其他资源以上方法就是?多态,通过父类指针,访问成员