目录
1.2. 进程间通信的必要性 --- 为什么要有进程间通信?
进程间通信(Inter-process Communication,简称IPC)是指在操作系统中,不同的进程之间进行信息交换和数据共享的一种机制。在多进程的环境中,通常需要进程之间相互协作,共同完成任务。为了实现进程之间的互相交流和数据传输,操作系统提供了各种的进程间通信技术和机制。
以前我们写的代码都是单进程的,那么也就无法使用并发能力, 更加无法实现多进程协同。注意:进程间通信不是目的,而是我们的手段,通过进程间通信这种手段实现多进程协同这种目的。
具体来说:
进程间通信(IPC)是指不同进程之间进行信息交换和数据共享的机制,它在多进程编程中起着非常重要的作用。以下是一些需要进程间通信的情况和原因:
1. 数据共享:有时候多个进程需要访问和共享同一份数据,通过进程间通信可以实现数据的共享和协作处理。例如,一个进程负责读取数据,另一个进程负责处理数据,它们之间需要进行数据交换和共享。
2. 任务协作:多个进程可以协作完成一个复杂的任务,各个进程负责不同的部分,通过进程间通信来交换任务的进度、结果和状态信息,实现任务的协同工作。
3. 并发控制:在多进程环境下,如果多个进程共享同一资源,需要进行并发控制,避免出现数据冲突和竞争条件。进程间通信可以提供同步和互斥机制,确保多个进程安全地对共享资源进行访问。
4. 提高系统的灵活性和可扩展性:通过进程间通信,不同的进程可以在不同的计算机或服务器上运行,实现分布式计算。这样可以更好地利用计算资源,提高系统的灵活性和可扩展性。
5. 任务分离和模块化设计:将一个大型系统分解为多个独立的进程,每个进程负责不同的模块或功能,通过进程间通信来进行模块间的协调和通信,有助于提高系统的可维护性和可扩展性。
总的来说,进程间通信允许不同的进程之间进行信息交流和数据共享,实现任务的协同工作和共享资源,在多进程编程中起到了至关重要的作用,提高了系统的灵活性、可扩展性和并发性。
我们知道,进程是具有独立性的!而进程的独立性主要体现在进程的地址空间和页表上,确保了进程的内核数据结构、代码和数据的独立性。
具体来说,进程的独立性是通过以下方式实现的:
1. 地址空间隔离:每个进程拥有独立的虚拟地址空间,包括代码区、数据区、堆区和栈区等。每个进程的地址空间都是从零开始的,地址范围不会相互重叠,因此进程的数据和代码互不干扰。
2. 页表映射:进程的地址空间通过页表进行映射,将虚拟地址转换为物理地址。每个进程都有自己的页表,根据进程的需要进行页表的建立和维护。这样每个进程访问内存时,可以根据自己的页表映射进行地址转换,实现进程间的地址隔离。不同进程之间的页表是独立的,因此进程的内核数据结构、代码和数据也具有独立性。
总结起来,进程的独立性是通过地址空间和页表的方式实现的,每个进程都有自己独立的虚拟地址空间和页表映射,使得进程间的内核数据结构、代码和数据都可以独立运行和访问,确保了进程的独立性和隔离性。
进程具有独立性的好处就是:每个进程都有自己的内存空间和资源,互不干扰。如果一个进程崩溃或出现问题,不会影响其他进程的正常运行,提高了系统的稳定性和可靠性。
进程具有独立性的坏处就是:进程间通信成本高。正因为由于进程之间的独立性,要进行数据交换和通信则需要使用额外的机制,如管道、消息队列、共享内存等,这增加了系统的复杂性和开发成本。
进程间通信的前提是: 需要让不同的进程看到同一份资源,这里的资源我们可以简单理解为一段内存空间,这段内存空间是由特定的结构组织的; 那么这个所谓的同一块 "内存空间"隶属于谁呢? 是属于某一个进程吗?答案是:不能隶属于某一个检查。由于进程具有独立性!因此如果这块内存隶属于某一个进程,那么该内存就不能再隶属于其他进程,因此,理论上这块内存不能隶属与任何一个进程,而更应该强调共享!!!
匿名管道:
匿名管道(Pipe):
匿名管道是一种半双工的通信机制,用于具有亲缘关系的父子进程间通信。一个管道有一个读端和一个写端。例如父进程可以将数据写入管道的写端,子进程则可以从管道的读端读取数据。管道使用文件描述符进行操作,通过写入和读取文件描述符对应的管道来实现进程间的通信。命名管道:
命名管道(Named Pipe):
命名管道也是一种半双工的通信机制,但不需要具有亲缘关系的进程间通信。命名管道将数据写入一个特定的文件路径,并可以由其他进程从该文件路径读取数据。该文件路径在文件系统中存在,多个进程可以通过它在命名管道中进行通信。
1.1. System V?共享内存:共享内存允许多个进程共享同一块内存区域,以实现高效的数据共享。进程可以通过将这块内存映射到它们自己的虚拟地址空间中,从而直接读写该内存区域。共享内存的通信速度快,适用于大量数据的高速传输。
原理:进程通过使用 shmget() 系统调用创建一个共享内存标识符,然后使用 shmat() 系统调用将共享内存连接到它们的地址空间。通过对共享内存进行读写操作,进程可以进行数据交换和共享。使用 shmdt() 系统调用将共享内存从进程内部地址空间中分离,最后,通过shmctl() 系统调用将该共享内存进行释放,回收其资源。
1.2. System V 消息队列:消息队列是一种进程间通信机制,通过在队列中传递消息来实现进程间的信息传递。每个消息具有一个特定的类型和相关的数据,接收进程可以按照特定的类型从队列中读取到相应的消息。消息队列适用于不连续的、小量数据的通信。
原理:进程使用 msgget() 系统调用创建或打开一个消息队列,并使用 msgsnd() 将消息发送到队列中,接收进程则使用 msgrcv() 从队列中读取消息。每个消息都有一个标志位以及一个消息类型来确定消息的特性和接收的顺序。使用 msgctl() 进行消息队列的删除和清理。
1.3. System V?信号量:信号量是一种用于进程同步和互斥的机制,在多个进程之间共享一个计数器。通过对计数器的操作,进程可以实现对共享资源的访问控制,避免竞争条件和数据冲突。
原理:进程使用 semget() 系统调用创建或取得一个信号量集合,并使用 semctl() 进行对信号量的控制。进程可以使用 semop() 系统调用来对信号量进行P操作(锁定)和V操作(解锁),从而实现对共享资源的同步和互斥访问。
POSIX 标准的进程间通信,我们在多线程部分详谈,在这里简单了解即可!
消息队列
共享内存
信号量
互斥锁
条件变量
读写锁
管道在生活中是很常见的。例如天然气管道、石油管道、自来水管道等等;管道有一个非常显著的特定:管道是有入口、有出口的!而今天,我们简化一下在计算机领域的管道:该管道只有唯一的入口、唯一的出口!
管道都是单向通信或者单向传输 "资源" 的,简而言之,管道传输的都是资源!
计算机中的管道传输的也是资源!而在计算机领域内,数据是最重要的资源。
计算机通信领域的设计者,设计了一种单向通信的方式,称之为管道!
因此,不要再问为什么管道是单向通信了!因为管道的设计者就是用单向通信的方式设计出管道的!!!
管道通信本质上是进程通过管道这种方式进行通信!进程我们有了一定的了解!每个进程都会有一个PCB,PCB里有一个指针,即struct files_struct* files; 该指针指向一张表(struct files_struct),其表中有一个指针数组,即struct file* fd_array[],该数组中的每一个元素都是指向打开文件结构体(即 struct file?)的指针。如下:
?
通过内核数据结构 struct file 中的 struct address_space* f_mapping,我们可以找到相应的内核缓冲区。在Linux源码的 fs.h 有关于struct file的定义,具体如下:
struct file {
/*
* fu_list becomes invalid after file_free is called and queued via
* fu_rcuhead for RCU freeing
*/
union {
struct list_head fu_list;
struct rcu_head fu_rcuhead;
} f_u;
struct path f_path;
#define f_dentry f_path.dentry
#define f_vfsmnt f_path.mnt
const struct file_operations *f_op;
spinlock_t f_lock; /* f_ep_links, f_flags, no IRQ */
atomic_long_t f_count;
unsigned int f_flags;
fmode_t f_mode;
loff_t f_pos;
struct fown_struct f_owner;
const struct cred *f_cred;
struct file_ra_state f_ra;
u64 f_version;
#ifdef CONFIG_SECURITY
void *f_security;
#endif
/* needed for tty driver, and maybe others */
void *private_data;
#ifdef CONFIG_EPOLL
/* Used by fs/eventpoll.c to link all the hooks to this file */
struct list_head f_ep_links;
#endif /* #ifdef CONFIG_EPOLL */
struct address_space *f_mapping;
#ifdef CONFIG_DEBUG_WRITECOUNT
unsigned long f_mnt_write_state;
#endif
};
1、该进程分别以读写方式打开同一个文件!!!那么如下:
注:为了简化图,将0、1、2文件描述符相关联的文件就省略了。
2、fork创建子进程!!!?那么如下:
对上图的解读:
众所周知,进程是具有独立性的;因此,当 fork 创建子进程后,子进程会以父进程为模板创建与进程相关的内核数据结构 (包含PCB、地址空间、页表等等);而代码和数据会以写实拷贝的方式与父进程共享!!!
具体到上图,当 fork?创建子进程后,子进程需要有自己的内核数据结构,包含PCB、地址空间、页表等等,而我们也说了,PCB中有一个指针即 struct files_struct* files,该指针指向一张表,该表中有一个指针数组 fd_array ,该数组的每一个元素都是一个 struct file*,struct file 是一个内核数据结构,其用于描述一个被打开的文件。因此,当fork后,这张表,即 struct files_struct 也需要子进程独有一份,因此子进程也会以父进程为模板创建该表,该表的内容与父进程一致(子进程创建该结构时内容是一致的,后序就有可能不一致了)!
而对于 struct file 来说,子进程需要创建一份自己独有吗??? 答案是:不需要,因为 fork 是创建子进程,子进程只会创建与进程强相关的内核数据结构,而 struct file 该内核结构是属于文件管理模块的,并不归属于进程管理模块。因此此时 struct file 是不会被拷贝的,此时父子进程指向的就是同一个文件对象!!!
3、规定方向,达到单向数据通道的目的,即父子进程关闭自己不需要的文件描述符;例如:假如我要求,父进程读数据、子进程写数据;那么我们就需要关闭掉父进程的写端,关闭掉子进程的读端!!!如下:
经过上面的操作,此时就可以让不同的进程看到同一份 "资源"?,即同一份内存空间!通过让一个进程读,让另一个进程写,以达到单向通信的目的。而这种通信方案就是管道!!!
经过上述描述,你是不是想告诉我,管道就是文件呢??? 没错,管道就是文件!!!众所周知,Linux下一切皆文件,因此管道本质上就是一个文件!!两进程在通信的时候,需不需要将管道文件的将数据刷新到磁盘呢 ? 答案是:完全没有必要。将数据刷新到磁盘这种动作我们称之为落盘或者持久化!但这个动作对于通信双方的进程而言,没有任何意义!这还不是最主要的,最关键的是磁盘是外设,相较于CPU和内存而言,处理数据的效率太低了,因此,管道通信是不会将管道文件的数据刷新到磁盘上的,管道通信是一个内存级别通信的!几乎所有的通信方式都是内存级别的通信,因为这样效率是最高的!!!Linux 系统会在内存中维护一个管道缓冲区,管道的读取和写入操作都是对这个缓冲区进行的,数据传输完毕之后,缓冲区中的数据就会被释放。
在OS中,可以创建一个文件对象,这个文件对象不需要名字,也不需要在磁盘上的文件实体,可以吗??? 当然是可以的!!因此在内存中的 struct file 内核结构对象不一定在磁盘上有对应的实体。struct file 内核数据结构都是OS帮助你创建的,因此操作系统中可以创建一个没有名字、没有对应磁盘上文件实体的文件对象。它是在内存中打开的,仅存在于操作系统的内部数据结构中,即匿名管道本质上是一个内存级别的文件,没有在磁盘上对应的实体。
首先,我们看看以下 pipe 这个系统调用:
man 2 pipe
NAME
pipe - create pipe
SYNOPSIS
#include <unistd.h>
int pipe(int pipefd[2]);
DESCRIPTION
pipe() creates a pipe, a unidirectional data channel
that can be used for interprocess communication
pipefd[0] refers to the read end of the pipe
pipefd[1] refers to the write end of the pipe.
RETURN VALUE
On success, zero is returned.
On error, -1 is returned, and errno is set appropriately.
通过匿名管道实现进程间通信的示例代码:
void Test1(void)
{
// create pipe
int pipefd[2] = {0};
int ret = pipe(pipefd);
assert(ret != 1);
(void)ret;
#ifdef DEBUG
std::cout << "pipefd[0] : " << pipefd[0] << std::endl; // expected value 3
std::cout << "pipefd[1] : " << pipefd[1] << std::endl; // expected value 4
#endif
// create a child process
pid_t id =fork();
assert(id != -1);
// If i want the child process to read data
// and the parent process to write data
// then i should close the write end of the child
// process and the read end of the parent process
if(id == 0)
{
// child process
close(pipefd[1]); // close the write end of the child process
std::string str;
char buffer[1024] = {0};
while(true)
{
ssize_t read_ret = read(pipefd[0],buffer,sizeof buffer - 1);
assert(read_ret != -1);
buffer[read_ret] = 0;
std::cout << "child process[ " << getpid() << " ] get a message of parent process [ " << getppid() << " ]: " << buffer << std::endl;
}
close(pipefd[0]);
exit(0);
}
// parent process
close(pipefd[0]); // close the read end of the parent process
char message[1024]= {0};
int num = 1;
int size_all = 0;
const char* str = "hello,you can see me";
while(true)
{
snprintf(message,sizeof message,
" [%d:%d] : %s",num++,size_all,str);
ssize_t write_ret = write(pipefd[1],message,strlen(message));
assert(write_ret != -1);
size_all += write_ret;
sleep(1);
}
waitpid(id,NULL,0);
close(pipefd[1]);
}
1、 匿名管道是用来进行具有血缘关系的进程的进程间通信,常用于父子通信。
2、 管道实现了进程间协同,提供了访问控制。例如,子进程读取管道数据,父进程向管道写入数据时,如果此时管道没有数据,那么子进程会等待父进程向管道文件写入数据 (子进程会在 read() 系统调用内部阻塞),同理,如果父进程向管道中写入数据时,如果管道已满,父进程会暂停写入,等待子进程读取数据后再继续写入 (父进程会在 write() 系统调用内部阻塞)!这种机制确保了进程间的同步和数据的可靠传输。
3、?管道提供的是面向字节流的通信服务。管道中传输的数据是以字节流的形式进行的,没有记录边界。在管道中,数据被视为一系列字节,按照先入先出(FIFO)的顺序进行传输。无论是多长的数据还是多少个数据分组,在传输过程中都被视为连续的字节流。这种面向字节流的特性意味着管道中的数据没有固定的消息边界,接收方需要根据自己的逻辑进行字节的拼接和解析,从而还原出有意义的消息。
4、?管道是基于文件系统的,文件的生命周期是随进程的,管道的生命周期是随(父子)进程的。
5、 管道是单向通信的,就是半双工通信的一种特殊情况。具体来说,当一个进程向管道中写入数据时,另一个进程可以从管道中读取数据,但它不能再向管道中写入数据,否则就会产生错误。反之,当一个进程从管道中读取数据时,另一个进程可以继续向管道中写入数据,但它不能再从管道中读取数据。这种通信我们称之为半双工通信!!!
写端快,读端慢这种情况是基于管道提供的访问控制。当匿名管道被写满的时候,那么此时写端就会阻塞,需要等待读端读取管道数据!验证这个现象,我们还可以顺便测试管道的大小!思路就是,让写端一直写,统计写的字节个数,当写满后,写端会阻塞等待,等待读端读取管道数据。而当写端阻塞等待后,其统计的字节个数就是管道的大小,如下:
void Test2(void)
{
// create pipe
int pipefd[2] = {0};
int ret = pipe(pipefd);
assert(ret != 1);
(void)ret;
// create a child process
pid_t id =fork();
assert(id != -1);
// If i want the child process to read data
// and the parent process to write data
// then i should close the write end of the child
// process and the read end of the parent process
if(id == 0)
{
// child process
close(pipefd[1]); // close the write end of the child process
std::string str;
char buffer[1024] = {0};
while(true)
{
sleep(1000);
ssize_t read_ret = read(pipefd[0],buffer,sizeof buffer - 1);
assert(read_ret != -1);
buffer[read_ret] = 0;
std::cout << "child process[ " << getpid() << " ] get a message of parent process [ " << getppid() << " ]: " << buffer << std::endl;
}
close(pipefd[0]);
exit(0);
}
// parent process
close(pipefd[0]); // close the read end of the parent process
int size_all = 0;
int old_size = 0;
const char* str = "X";
while(true)
{
// 每次写一个字节
old_size = size_all;
// 当将管道文件写满后,此时会在write内部阻塞
ssize_t write_ret = write(pipefd[1],str,strlen(str));
assert(write_ret != -1);
size_all += write_ret;
// 记录写入的总字节数
std::cout << "total size: " << size_all << std::endl;
}
waitpid(id,NULL,0);
close(pipefd[1]);
}
现象如下:?
可以看到,当管道被写满后,此时写端进程就会被阻塞在 write()系统调用 内部,因此我们得出管道文件的大小就是64KB(具体大小可能会略有差异,取决于不同的内核版本和系统设置)。关于 Pipe的容量大小 我们也可以查一下 man 手册,如下:
man 7 PIPE
NAME
pipe - overview of pipes and FIFOs
在man手册里面,有如下内容:
并且最重要的是我们发现此时写端的进程是处于一种阻塞状态!!!事实上,当管道被写满时,写入端的进程会在?
write()
?系统调用内部阻塞,直到管道中的数据被读取并且有足够的空间可以写入新的数据。这是因为在管道中,数据从写入端流入,流出读取端,如果读取端没有及时读取数据,那么写入端只能等待,否则就会导致管道溢出,并丢失后续写入的数据。在这种情况下,管道使用的是一种先进先出(FIFO)的数据缓存机制,一旦缓存池被填满后,写入端就会被阻塞,直到有足够的空间可以继续写入数据。当读取端读取数据后,写入端才可以在管道中继续写入数据,并且?
write()
?系统调用会返回写入的数据量。
写端慢,读端快这种情况也是基于管道提供的访问控制。当读端将管道数据读取完后,那么此时读端就会等待,等待写端向管道写入数据!!!测试代码如下:
void Test3(void)
{
// create pipe
int pipefd[2] = {0};
int ret = pipe(pipefd);
assert(ret != 1);
(void)ret;
// create a child process
pid_t id =fork();
assert(id != -1);
// If i want the child process to read data
// and the parent process to write data
// then i should close the write end of the child
// process and the read end of the parent process
if(id == 0)
{
// child process
close(pipefd[1]); // close the write end of the child process
std::string str;
char buffer[1024] = {0};
while(true)
{
std::cout << "---------------" << std::endl;
ssize_t read_ret = read(pipefd[0],buffer,sizeof buffer - 1);
assert(read_ret != -1);
buffer[read_ret] = 0;
std::cout << "child process[ " << getpid() << " ] get a message of parent process [ " << getppid() << " ]: " << buffer << std::endl;
}
close(pipefd[0]);
exit(0);
}
// parent process
close(pipefd[0]); // close the read end of the parent process
const char* str = "X";
while(true)
{
write(pipefd[1],str,strlen(str));
sleep(10);
}
waitpid(id,NULL,0);
close(pipefd[1]);
}
现象如下:
可以看到,当读端将管道数据读取完后,如果写入端没有关闭写入管道,但没有新的数据可供读取,那么?
read()
?系统调用将一直阻塞,直到有数据可供读取或者写入管道被关闭。
如果所有管道写端对应的文件描述符被关闭,那么读端中的?
read()
?系统调用将返回 0,表示读端进程已经读取完毕。这种情况通常用于标识管道的结束。当写入端关闭写入管道时,读取端可以通过检测?
read()
?系统调用返回值为 0,来判断管道中的数据是否已经读取完毕,从而终止读取操作。需要注意的是,读取端只有在读到管道被关闭的信号时,才能获取到返回值为 0。如果写入端没有关闭写入管道,而是暂时没有数据可供读取,读取端的?
read()
?系统调用会阻塞,而不会返回 0。
void Test4(void)
{
// create pipe
int pipefd[2] = {0};
int ret = pipe(pipefd);
assert(ret != 1);
(void)ret;
// create a child process
pid_t id =fork();
assert(id != -1);
// If i want the child process to read data
// and the parent process to write data
// then i should close the write end of the child
// process and the read end of the parent process
if(id == 0)
{
// child process
close(pipefd[1]); // close the write end of the child process
std::string str;
char buffer[1024] = {0};
while(true)
{
std::cout << "---------------" << std::endl;
ssize_t read_ret = read(pipefd[0],buffer,sizeof buffer - 1);
if(read_ret == 0)
{
std::cout << "write end close pipe,read return 0,read completed\n";
break;
}
assert(read_ret != -1);
buffer[read_ret] = 0;
std::cout << "child process[ " << getpid() << " ] get a message of parent process [ " << getppid() << " ]: " << buffer << std::endl;
}
close(pipefd[0]);
exit(0);
}
// parent process
close(pipefd[0]); // close the read end of the parent process
const char* str = "X";
while(true)
{
write(pipefd[1],str,strlen(str));
sleep(5);
close(pipefd[1]);
break;
}
waitpid(id,NULL,0);
close(pipefd[1]);
}
现象如下:?
?
如果所有管道读端对应的文件描述符被关闭,则 write() 系统调用会产生信号 SIGPIPE ,进而可能导致写端进程退出。
void Test5(void)
{
// create pipe
int pipefd[2] = {0};
int ret = pipe(pipefd);
assert(ret != 1);
(void)ret;
// create a child process
pid_t id =fork();
assert(id != -1);
// If i want the child process to read data
// and the parent process to write data
// then i should close the write end of the child
// process and the read end of the parent process
if(id == 0)
{
// child process
// close the write end of the child process
close(pipefd[1]);
// close the read end of the child process
close(pipefd[0]);
exit(0);
}
// parent process
close(pipefd[0]); // close the read end of the parent process
const char* str = "X";
while(true)
{
write(pipefd[1],str,strlen(str));
std::cout << "--------------------\n";
sleep(5);
}
waitpid(id,NULL,0);
close(pipefd[1]);
}
现象如下:?
从结果来看,当读端进程关闭了读端,且写端进程未关闭写端,那么此时OS会终止掉写端进程!至于OS为什么要杀掉该进程,我们也能理解,当读端进程关闭了读端,那么也就意味着管道的数据没有进程去读取了,那么也就意味着管道的数据也就没意义了,因此,此时OS终止掉写端进程也在情理之中了。
进程池(Process Pool)是一种并发编程的模型,通常用于提高多任务处理的效率。进程池是在应用程序启动时,或预先分配一定数量的进程并将它们保留在内存中等待使用。当需要创建新的进程时,可以直接从进程池中获取一个可用进程,这样可以避免重复创建和销毁进程所带来的性能消耗。
通常,进程池模型由管理进程和工作者进程两部分组成。管理进程负责创建和维护一定数量的工作者进程,并向工作者进程分配任务。工作者进程在接收到任务后执行任务,并将结果返回给管理进程。任务的分配和结果的收集通常通过进程间通信(IPC)机制来实现。
进程池模型可以提高多任务处理的效率,避免重复创建和销毁进程所产生的开销,并可通过调整进程池中进程的数量来控制系统的负载和并发度。因此,进程池常被使用在高并发、任务密集型或需要大量计算的应用场景,如网络服务器、数据处理、模型训练等。
代码如下:?
my_proc:ProcessPool.cc
g++ -o $@ $^ -std=gnu++11
.PHONY:clean
clean:
rm -ff my_proc
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <map>
#include <vector>
#include <utility>
#include <cassert>
#include <cstring>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/types.h>
enum MAX
{
PROCESS = 5
};
std::vector<std::function<void()>> BusinessLogic;
std::map<int,std::string> ShowMethodMap;
int ReadEndWaitCommand(int read_fd)
{
uint32_t command = 0;
// Require read-only four bytes
ssize_t read_ret = read(read_fd,&command,sizeof(uint32_t));
if(read_ret == 0)
{
return -1; // Read end exit
}
else if(read_ret == sizeof(uint32_t))
{
return command; // Read success
}
else
{
return -2; // Read failure
}
}
void ReadMySQL(void)
{
std::cout << "new process [" << getpid() << " ] : Execute the task of accessing the database \n";
}
void ExecuteUrl(void)
{
std::cout << "new process [" << getpid() << " ] : Execute Url parsing task \n";
}
void Cal(void)
{
std::cout << "new process [" << getpid() << " ] : Execute encryption task \n";
}
void Save(void)
{
std::cout << "new process [" << getpid() << " ] : Execute data persistence taks \n";
}
void LoadDataProcessLogic(void)
{
ShowMethodMap[BusinessLogic.size()] = "ReadMySQL"; // 0 ---> ReadMySQL
BusinessLogic.push_back(ReadMySQL);
ShowMethodMap[BusinessLogic.size()] = "ExecuteUrl"; // 1 ---> ExecuteUrl
BusinessLogic.push_back(ExecuteUrl);
ShowMethodMap[BusinessLogic.size()] = "Cal"; // 2 ---> Cal
BusinessLogic.push_back(Cal);
ShowMethodMap[BusinessLogic.size()] = "Save"; // 3 ---> Save
BusinessLogic.push_back(Save);
}
void ShowMethod(void)
{
for(auto& my_map : ShowMethodMap)
{
std::cout << my_map.first << " : " << my_map.second << std::endl;
}
}
#include "./Task.hpp"
std::vector<std::pair<pid_t,int>> ChildProc_Fd;
std::vector<int> read_fd;
void meau(void)
{
std::cout << "--------------------------------------" << std::endl;
std::cout << "--------- 1. ShowMethod -------" << std::endl;
std::cout << "--------- 2. ExecuteMethod -------" << std::endl;
std::cout << "--------- 0. Exit -------" << std::endl;
std::cout << "----------------------------------------" << std::endl;
}
void DispatchTask(void)
{
while(true)
{
sleep(1);
meau();
int option = 0;
std::cout << "input option> ";
std::cin >> option;
if(option == 1)
{
ShowMethod();
}
else if(option == 2)
{
ShowMethod();
//uint32_t Method = 0;
//std::cout << "input Method> ";
//std::cin >> Method;
//int ProcessNum = rand() % ChildProc_Fd.size();
//write(ChildProc_Fd[ProcessNum].second,&Method,sizeof(uint32_t));
// 单机版的负载均衡
// 采用随机数的方式选择一个管道,让对应的子进程执行相应业务逻辑
// 这种方式我们称之为 随机数方式的负载均衡!
int ProcessNum = rand() % ChildProc_Fd.size();
uint32_t Method = rand() % ShowMethodMap.size();
write(ChildProc_Fd[ProcessNum].second,&Method,sizeof(uint32_t));
}
else if(option == 0)
{
return ;
}
else
{
std::cout << "Illeage option" << std::endl;
}
}
}
void ProcessPool(void)
{
LoadDataProcessLogic();
for(size_t i = 0; i < PROCESS; ++i)
{
// Create pipe
int pipefd[2] = {0};
int pipe_ret = pipe(pipefd);
assert(pipe_ret == 0);
(void)pipe_ret;
// Create child process
pid_t id = fork();
// Child process
// Require child process to read data
if(id == 0)
{
// Build a unidirectional data channel
close(pipefd[1]);
// Save the read end file descriptor in advance
read_fd.push_back(pipefd[0]);
while(true)
{
int command = ReadEndWaitCommand(pipefd[0]);
if(command == -1)
{
// Write end exit,so that read end can also exit now
for(auto& fd : read_fd)
{
std::cout << "read_fd: " << fd << std::endl;
close(fd);
}
//std::cout << "read end exit" << std::endl;
exit(-1); // Write end exit lead to read end exit
}
else if(command >= 0 && command < static_cast<int>(BusinessLogic.size()))
{
// Child process execute command
BusinessLogic[command]();
}
else if(command == -2)
{
std::cout << "Illegal byte read" << std::endl;
break;
}
else
{
std::cout << "Illegal command : " << command << std::endl;
break;
}
}
close(pipefd[0]);
exit(0); // Read end normal exit
}
// Parent process write data to the pipe
close(pipefd[0]); // close read end
// Record the mapping relationship between the write end file descriptor and the child process
ChildProc_Fd.push_back({id,pipefd[1]});
}
// Parent process dispatch task
DispatchTask();
// Close all write side file descriptors
for(size_t i = 0; i < ChildProc_Fd.size(); ++i)
{
close(ChildProc_Fd[i].second);
}
// Recycle all child processes
for(size_t i = 0; i < ChildProc_Fd.size(); ++i)
{
waitpid(ChildProc_Fd[i].first,nullptr,0);
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid());
ProcessPool();
}
通过上面的理解,我们现在知道匿名管道是一个内存级别的文件,其在磁盘上没有对应的实体,也没有名字。匿名管道是父进程通过创建子进程达到通信的目的,这种通信方式有一个非常明显的局限性:即只允许父子进程进行通信,那么如果我想让没有任何关系的两个进程进行通信呢???
首先,按照我们对管道这种通信方式的理解,进程间要通信,那么前提必须是让不同的进程看到同一份资源!因此,我们需要有一个简单的认识,不管你是匿名管道亦或者是我们接下来要说的命名管道,你们通信的前提是一样的,即让不同的进程看到同一份资源;而不同之处就在于让不同的进程看到同一份资源的手段是不一样的。匿名管道是通过创建子进程,通过子进程继承的方式达到通信目的。那么命名管道是如何做到的呢?答案是:命名管道是通过让不同的进程打开同一路径下的文件的方式!!!
在磁盘上创建一个命名管道,该文件可以被多个进程打开,但不会将内存数据持久化到磁盘。这种文件就被称之为命名管道文件。
该命名管道文件有名字,因此该文件一定在系统路径中。我们知道路径具有唯一性,通信双方进程就可以通过该管道文件的路径,看到同一份资源。
命名管道文件也是一个内存级别的文件,只不过命名管道在磁盘上是有实体的,这个磁盘是的实体是有文件属性的,但就是没有文件内容。也就是说,这个命名管道文件里的数据是内存级别的数据,存在于内核的管道缓冲区中,该文件并不会将内存数据持久化到磁盘上!而不同的进程通过路径的方式锁定同一个命名管道文件(路径具有唯一性),通过双方进程读写管道的数据,进而达到进程间协同的目的。
那么如何创建命名管道呢?
man 1 mkfifo
NAME
mkfifo - make FIFOs (named pipes)
SYNOPSIS
mkfifo [OPTION]... NAME...mkfifo [OPTION]... NAME...
DESCRIPTION
Create named pipes (FIFOs) with the given NAMEs.
Mandatory arguments to long options are mandatory for short options too.
示例如下:
可以清楚的看到,此时写入端的进程即 echo 这个进程此时是处于阻塞状态的!这很好理解,因为此时命名管道中还有 echo 写入的数据,因此,需要我们读取命名管道数据后,写入端才会继续被调度,进而被终止该进程!
当读取端将命名管道的数据读取后,此时写入端就不会被阻塞了,写入端和读取端退出。?
man 3 mkfifo
NAME
mkfifo - make a FIFO special file (a named pipe)
SYNOPSIS
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
// pathname: 管道文件所在的路径
// mode: 管道文件的初始权限
RETURN VALUE
On success mkfifo() returns 0.
In the case of an error, -1 is returned
(in which case, errno is set appropriately).
man 2 unlink
NAME
unlink - delete a name and possibly the file it refers to
SYNOPSIS
#include <unistd.h>
int unlink(const char *pathname);
RETURN VALUE
On success, zero is returned.
On error, -1 is returned, and errno is set appropriately.
示例:
.PHONY:all
all:server client
server:server.cxx
g++ -o $@ $^ -std=gnu++11
client:client.cxx
g++ -o $@ $^ -std=gnu++11
.PHONY:clean
clean:
rm -f server client
#include "name_pipe.hpp"
void client_ipc_task(void)
{
int fd = open(NamePipePath.c_str(),O_WRONLY);
assert(fd != -1);
// 起始时间 1970_0_0 00:00:00 ---> 0
// 现在时间 ~~~~~~~~~~~~~ ---> time(nullptr)
std::string message; // 信息
std::string Date; // 当前时间
while(true)
{
getline(std::cin,message);
Date = TimeStampToDate(); // 获取时间的字符串
Date += "\n";
Date += message;
ssize_t write_size = write(fd, Date.c_str(), Date.size());
assert(write_size > 0);
(void)write_size;
}
// Close related file descriptors
close(fd);
}
int main()
{
client_ipc_task();
}
#include "name_pipe.hpp"
void server_ipc_task(void)
{
int ret = mkfifo(NamePipePath.c_str(),START_MODE);
assert(ret == 0);
(void)ret;
std::cout << "step 1: create a name pipe success" << std::endl;
#ifdef DEBUG_ONE
std::cout << "ret : " << ret << std::endl;
#endif
// Normal file operation
// This process serves as the read end of the pipe
int fd = open(NamePipePath.c_str(), O_RDONLY);
assert(fd != -1);
std::cout << "step 2: open name pipe success\n";
#ifdef DEBUG_TWO
std::cout << "read_fd: " << fd << std::endl;
#endif
char buffer[READ_BUFFER_MAX] = {0};
while(true)
{
ssize_t read_size = read(fd, buffer, READ_BUFFER_MAX - 1);
assert(read_size != -1);
if(read_size == 0)
{
// Write end exit lead to read end exit
break;
}
buffer[read_size] = 0;
std::cout << buffer << std::endl;
}
// Close related file descriptors
close(fd);
std::cout << "step 3: close read end file descriptors\n";
// man 2 unlink --- delete a name and possibly the file it refers to
unlink(NamePipePath.c_str());
std::cout << "step 4: delete the name pipe\n";
}
int main()
{
server_ipc_task();
}
#pragma once
#include <iostream>
#include <string>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define START_MODE 0666
const int READ_BUFFER_MAX = 1024;
std::string NamePipePath = "/home/Xq/2023_12/12_21/name_pipe";
class Date
{
public:
static int month_day[13];
Date(int year = 1970,int month = 1,int day = 1)
:_year(year)
,_month(month)
,_day(day)
{}
bool if_leap_year(int year)
{
if(year % 400 == 0 || (year % 4 == 0 && year % 100 != 0))
return true;
else
return false;
}
int get_month_day()
{
int ret = month_day[_month];
if(_month == 2 && if_leap_year(_year))
ret++;
return ret;
}
Date& operator++()
{
++_day;
int cur_month_day = get_month_day();
if(_day > cur_month_day)
{
++_month;
if(_month > 12)
{
_month = 1;
++_year;
}
_day = 1;
}
return *this;
}
Date& operator+=(int num)
{
while(num--)
{
operator++();
}
return *this;
}
std::string print()
{
std::string Date;
Date += std::to_string(_year);
Date += "年";
Date += std::to_string(_month);
Date += "月";
Date += std::to_string(_day);
Date += "日";
return Date;
}
private:
int _year;
int _month;
int _day;
};
int Date::month_day[13] = {0,31,28,31,30,31,30,31,31,30,31,30,31};
// 天数 : 剩余的秒数
std::pair<int,long long> TimeStampToDay(long long Time)
{
int day = Time / 86400;
long long second = Time % 86400;
return std::pair<int,long long>(day,second);
}
std::string TimeStampToDate()
{
Date StartDate(1970,1,1);
std::pair<int, long long> DaySecond = TimeStampToDay(time(nullptr) + 28800);
StartDate+=DaySecond.first;
std::string CurDate = StartDate.print();
int hour = DaySecond.second / 3600;
int min = (DaySecond.second - hour * 3600) / 60;
int second = DaySecond.second - hour * 3600 - min * 60;
CurDate += std::to_string(hour);
CurDate += ":";
CurDate += std::to_string(min);
CurDate += ":";
CurDate += std::to_string(second);
return CurDate;
}
可以看到,当 server 服务端创建命名管道后,此时是处于一种阻塞状态的 (阻塞在 open() 内部),需要等待 client 客户端打开该命名管道文件后,server 才会结束阻塞状态,进而打开管道文件,过程如下:
当? client 服务端打开管道文件后,此时 server 服务端就需要阻塞等待 client 服务端向管道文件写入数据,也就是说,此时的 server服务端会在 read() 内部阻塞,当命名管道有数据后,server 才会继续执行 read() ,过程如下:
当与该管道有关的所有写入端都被关闭了,那么此时读取端调用 read() 返回0,即代表读取结束,读取端进程退出,如下:
这就是我们的命名管道,如果要加深理解,建议还是自己多去尝试。
命名管道(Named Pipe)是一种特殊的进程间通信机制,它允许多个进程通过一个命名的文件来进行通信。
总结一下命名管道的特点:
1. 文件系统中的命名管道是基于文件的进程间通信机制,不同于匿名管道(局限于父子进程间的通信)。命名管道通过在文件系统中创建一个具有唯一名称的文件来实现。
2. 命名管道可以由多个进程同时打开,允许这些进程通过管道进行通信和数据传输。
3. 命名管道在创建时需要指定一个路径和管道文件名,该路径在文件系统中唯一标识了该管道,使不同的进程可以通过相同的路径来访问同一个命名管道,即路径具有唯一性。
4. 命名管道使用 FIFO(先进先出)的方式来传递数据。一个进程写入数据到管道时,另一个进程可以从管道中读取这些数据。数据在管道中是顺序传递的,遵循先进先出的原则。
5. 命名管道使用的是内存中的缓冲区作为数据传输的中介,在内核中维护了一个缓冲区来缓存数据。当管道为空或已满时,读写操作会被阻塞,直到有数据可读或缓冲区有空间可写。
6. 命名管道的数据不会持久化到磁盘,仅存在于内存中,因此只适用于临时的进程通信和数据传输,而不适合用于长期存储或持久性数据。
总体而言,命名管道提供了一种简单而有效的进程间通信方式,适用于需要多个进程之间进行数据传输和通信的场景。
进程要通信,那么前提条件就是:通信双方进程需要看到同一份资源!!!
在进程的地址空间中,堆栈之间,有一段区域,我们称之为共享区!操作系统在物理内存申请一段空间( 该空间的大小可以由用户决定 ),将该空间由通信双方进程各自页表映射到各自进程中的地址空间的共享区里,并将映射到地址空间后的起始地址返回给上层用户。那么上层可以通过访问虚拟地址空间的地址,通过各自页表映射,映射到同一块物理内存空间。此时进程间通信的前提条就具备了,而我们将这种通信方式称之为共享内存。如图所示:
那我们申请的这个共享内存,属于哪一个通信的进程呢???
答案是:这块共享内存不属于通信双方的任何一个进程!其一,众所周知,进程具有独立性,如果该共享内存属于了某一个通信的进程,那么也就是说这块资源就不能再隶属于其他的进程了,那么通过该资源进行通信就是无稽之谈了( 通信的前提:进程看到同一份公共资源 );其二,这块共享内存的提供者是操作系统。换句话说,这块共享内存属于操作系统并由操作系统对其进行管理。
共享内存这种通信方式是由操作系统为进程间通信专门设计的!
假设有很多进程通过共享内存的方式进行进程间通信,那么带来的结果就是操作系统中会存在大量的共享内存,那么操作系统需不需要管理这些共享内存呢?
答案是: 当然要管理共享内存!
那么如何管理呢? 答案是: 先描述再组织!
既然要管理,当然且必须要有相应的内核数据结构! 尽管我们自始至终没见过,但我们是可以预见的,而这就是理念的力量!
因此,你以为我们申请共享内存,操作系统只给我们创建一个共享内存块吗? 答案是:当然不是!这种理解是非常肤浅的,因此我们需要重新理解共享内存:
操作系统为了管理共享内存,因此一定需要先描述再组织!因此共享内存在内核中,除了共享内存块的内存空间,也必须要有描述共享内存的那段内存空间即对应的内核数据结构,具体为 struct shamid_ds ,也就是说,共享内存 =? 共享内存块 + 对应的共享内存的内核数据结构。
在 Linux内核源码中的 <shm.h> 有如下定义:
struct shmid_ds {
struct ipc_perm shm_perm; /* operation perms */
int shm_segsz; /* size of segment (bytes) */
__kernel_time_t shm_atime; /* last attach time */
__kernel_time_t shm_dtime; /* last detach time */
__kernel_time_t shm_ctime; /* last change time */
__kernel_ipc_pid_t shm_cpid; /* pid of creator */
__kernel_ipc_pid_t shm_lpid; /* pid of last operator */
unsigned short shm_nattch; /* no. of current attaches */
unsigned short shm_unused; /* compatibility */
void *shm_unused2; /* ditto - used by DIPC */
void *shm_unused3; /* unused */
};
shmid_ds
?结构体包含了共享内存区域的详细信息,包括:
shm_perm
:共享内存的访问权限,即读、写和执行权限。shm_segsz
:共享内存的大小,以字节为单位。shm_lpid
:上次操作此共享内存的进程的 PID。shm_cpid
:创建此共享内存的进程的 PID。shm_nattch
:当前附加的进程数。shm_atime
:上次附加时间。shm_dtime
:上次分离时间。shm_ctime
:上次更改时间。?
而这就是共享内存的前置性认识!
shmget()?系统调用会创建一个新的共享内存区域并返回该共享内存区域的标识符 (用户层) ,或者获取一个已经存在的共享内存区域的标识符 (用户层) 。具体如下:
man 2 shmget
NAME
shmget - allocates a System V shared memory segment
SYNOPSIS
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
RETURN VALUE
On success, a valid shared memory identifier is returned.
On errir, -1 is returned, and errno is set to indicate the error.
key:是一个键值,用于标识共享内存区域(内核层)。通常可以使用?
ftok
?函数将文件路径和一个整数值转换为一个唯一的键值。size:是要创建的共享内存的大小,以字节为单位。
shmflg:?是标志参数,用于指定创建共享内存的权限和行为。
shmget
?函数的返回值是一个整数,表示共享内存的标识符 (用户层)。标识符在后续的操作中用于访问和控制共享内存。函数的行为取决于?
shmflg
?参数的设置。常用的?shmflg
?参数选项包括:
IPC_CREAT
:如果指定的键值对应的共享内存不存在,则创建新的共享内存。如果共享内存已经存在,则返回该共享内存的标识符 (用户层) 。IPC_EXCL
: 与?IPC_CREAT
?一起使用,表示如果共享内存未存在,那么创建并返回该共享内存的标识符 (用户层) 。如果共享内存已经存在,则返回错误。- 权限标志:可以通过使用?
IPC_CREAT
?等标志与权限标志进行组合。例如,可以使用?0666
?表示读写权限为所有用户。可能的错误包括:
EEXIST
:使用了?IPC_CREAT | IPC_EXCL
,但共享内存已经存在。ENOENT
:使用了?IPC_CREAT
,但没有找到对应的键值。EINVAL
:参数无效或不符合要求。ENOMEM
:无法分配足够的内存空间。
我们之前说过,操作系统中是有可能存在很多个共享内存的,那么通信的双方进程,怎么保证双方能看到并且看到的就是同一个共享内存呢?
答案是:在内核层面,是以 key 值作为共享内存的标识的。因此,我们必须让 key 具有唯一性,即通信双方的进程使用同一个 key 值,就可以找到同一个共享内存了。换句话说,key 是几不重要,只需要保证它的唯一性即可,只要 key 值相同,就代表看到的是同一个共享内存。
通信双方进程使用同样的算法规则,形成这个唯一值 key,类似于字符串 Hash 函数!
那么如何形成这个唯一值 key 呢?
ftok() 函数用于将给定的文件路径名和项目ID转换为一个用于创建唯一键值(key)的值,该键值通常用于共享内存、消息队列或信号量的创建和访问。它是 System V IPC(进程间通信)的一部分。?具体如下:
man 3 ftok
NAME
ftok - convert a pathname and a project identifier to a System V IPC key
SYNOPSIS
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
RETRUN VALUE
On success, the generated key_t value is returned.
On failure -1 is returned
ftok()?内部不调用任何系统调用,内部是一套算法,将这两个参数经过各种运算形成一个唯一值。
pathname:表示一个可访问的文件路径。并且该路径名应该是一个已存在且具有访问权限的文件。
proj_id:是一个整数参数,代表项目标识符。在?
ftok()
?函数中,proj_id
?用于生成 System V IPC key 的一部分。它的最低有效8位会与文件路径名的标识符结合,生成一个唯一的 IPC key。proj_id
?必须是一个非零的值。
在 /usr/include/sys/ipc.h?中查找 key_t:
grep -ER 'key_t' /usr/include/sys/ipc.h
结果如下:
#ifndef __key_t_defined
typedef __key_t key_t;
# define __key_t_defined
在 /usr/include/bits/types.h 中查找 __key_t:
grep -ER '__key_t' /usr/include/bits/types.h
结果如下:
__STD_TYPE __KEY_T_TYPE __key_t; /* Type of an IPC key. */
在 /usr/include/bits/typesizes.h? 中查找 __KEY_T_TYPE
grep -ER '__KEY_T_TYPE' /usr/include/bits/typesizes.h
结果如下:?
#define __KEY_T_TYPE __S32_TYPE
在 /usr/include/bits/types.h?中查找 __S32_TYPE
grep -ER '__S32_TYPE' /usr/include/bits/types.h
结果如下:?
#define __S32_TYPE int
因此本质上 key_t 的类型就是 int。
size:代表这个共享内存块的大小!
该参数是有由我们自己设置的,但是,共享内存块的大小,最好是页(PAGE:4096)的整数倍。因为操作系统管理物理内存的时候,是以页的大小即4KB(4096 bytes)为单位的。
假如你申请了 4097 bytes,那么操作系统会按照两页大小给你分配空间,那么也就意味着剩余的4095个字节你访问不了,但是又由于这段空间被申请了,那么也就意味着这份空间被浪费了。
也就是说,共享内存的大小是按照页的倍数 (向上取整) 即最小倍数来进行申请的。可是当我们将 size 设置为 4097字节,那么此时共享内存的大小并不是2页,而是4097字节,这是为什么呢? 注意:操作系统为我们申请了两页的大小,但并不代表你有权利使用这两页空间。你要 4097 bytes,那么操作系统就申请2页的大小,并将其中的4097个字节给你。因此,我们申请共享内存的空间,最好是页的整数倍,有利于避免空间的浪费,提高空间的使用率。
shmflg 最常见的两个选项: IPC_CREAT 、 IPC_EXCL
单独使用 IPC_CREAT,代表:创建共享内存,如果底层已经存在,那么获取之,并且返回其标识符 (用户层);如果不存在,创建之,并返回标识符 (用户层)。
单独使用 IPC_EXCL 是没有意义的。
一起使用 IPC_EXCL 、IPC_CREAT 即??IPC_EXCL | IPC_CREAT 代表:
如果底层不存在,创建之,并返回标识符 (用户层) ;如果底层存在,出错返回。
如果此时返回成功,那么代表着一定是一个全新的共享内存。
注意:创建一个全新的共享内存时,该共享内存块的起始访问权限也要设置。例如:
IPC_EXCL | IPC_CREAT | 0666;0666?代表所有用户具有读写访问权限。
shmget() 的返回值代表着:共享内存的用户层标识符。
而我们刚刚还说过,key 也是共享内存的标识符,那么 key 和 这个函数的返回值 有什么区别呢?
key 值 是在系统层面上标识共享内存的唯一性!
shmget() 返回值即 shmid?是在用户层面上标识共享内存的唯一性!
ipcs -m 查看共享内存的各种信息;如下
ipcs 命令用于显示系统上的 IPC(进程间通信)资源信息;-m 参数用于显式共享内存信息。
shmid: 共享内存的用户标识符。
owner:拥有该共享内存区域的进程的UID。
perms:共享内存区域的访问权限。包括读、写和执行权限,用八进制表示。
bytes: 该共享内存区域的大小,单位为字节。
nattch:当前与该共享内存区域相关联的进程数量。换句话说,目前有多少个进程正在使用该共享内存区域。 n --- 个数, attach --- 关联;
status: 共享内存区域的状态信息;
#include <iostream>
#include <cassert>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <unistd.h>
#define PATHNAME "/home/Xq/2023_12/12_24/"
#define PROJID 128
#define SHMSIZE 4096
std::string TransToHex(key_t key)
{
char value[32] = {0};
snprintf(value,32,"0x%x",key);
return value;
}
void shared_memory(void)
{
// 获取唯一值 key
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
std::cout << "get a unique value: " << TransToHex(key) << std::endl;
// 申请共享内存
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
std::cout << "create a shared memory, shmid: " << shmid << std::endl;
sleep(3);
}
现象如下:?
结论:共享内存的生命周期不是随进程的,而是随内核的。
ipcrm -m "用户层的共享内存的标识符,也就是 shmid "
注意:ipcrm -m "对应的共享内存的ID号,即shmid",删除用的是 shmid 而不是 key 值。例如:
?补充:
ipcs -q? ?查看消息队列的信息
ipcrm -q "msqid" 删除对应的消息队列
key
:?key
?是一个用于唯一标识消息队列的键值。它是一个整数值,用于在系统范围内唯一地标识一个特定的消息队列。多个进程可以使用相同的?key
?值来访问同一个消息队列。
msqid
:?msqid
?是消息队列的标识符,用于内部标识一个特定的消息队列。它在系统范围内是唯一的,用于识别操作系统内核中的消息队列。
owner
:?owner
?表示消息队列的拥有者,即创建该消息队列的进程的用户标识符或名称。它用于确定哪个进程有权访问和操作该消息队列。
perms
:?perms
?表示消息队列的权限,用于控制对消息队列的访问。它是一个数字,通常使用八进制表示,在 Linux 系统中,常见的权限包括读(4)、写(2)和执行(1)权限。例如,权限为?644
?表示拥有者可读写,其他用户只可读。
used-bytes
:?used-bytes
?表示当前消息队列中已使用的字节数。它表示消息队列中已经存储的消息所占用的空间大小。
message
:?message
?是消息队列中包含的实际消息内容。它可以是一个文本字符串、结构体或任何其他类型的数据。每个消息都有一个唯一的标识符,并按照一定的顺序存储在消息队列中,可以通过读取和写入消息队列来进行进程间的通信和数据交换。
ipcs -s 查看信号量的信息
ipcrm -s "semid" 删除对应的信号量
key
:?key
?是一个用于唯一标识信号量集的键值。它是一个整数值,用于在系统范围内唯一地标识一个特定的信号量集。多个进程可以使用相同的?key
?值来访问同一个信号量集。
semid
:?semid
?是信号量集的标识符,用于内部标识一个特定的信号量集。它在系统范围内是唯一的,用于识别操作系统内核中的信号量集。
owner
:?owner
?表示信号量集的拥有者,即创建该信号量集的进程的用户标识符或名称。它用于确定哪个进程有权访问和操作该信号量集。
perms
:?perms
?表示信号量集的权限,用于控制对信号量集的访问。它是一个数字,通常使用八进制表示,在Linux系统中,常见的权限包括读(4)、写(2)和执行(1)权限。例如,权限为?644
?表示拥有者可读写,其他用户只可读。
nsems
:?nsems
?表示信号量集中包含的信号量数量。信号量集是一组相关联的信号量,nsems
?指定了信号量集中的信号量个数,用于同步和互斥进程间的访问和操作
ipcrm -m 可以达到删除共享内存的目的,可是这种删除方式有点太麻烦了,如果我想用代码进行删除呢?? ?
shmctl() 函数用于操作和控制共享内存区域,具体如下:
man 2 shmctl
NAME
shmctl - System V shared memory control
SYNOPSIS
#include <sys/ipc.h>
#include <sys/shm.h>
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
RETURN VALUE
On success, return 0;
On error, -1 is returned, and errno is set appropriately.
shmid:共享内存的标识符 (用户层) 。
cmd:IPC控制命令。
buf:是一个指向 shmid_ds 结构体的指针,用于传递和接收共享内存区域的属性信息。
shmctl
?函数支持的命令包括:
IPC_STAT
:获取共享内存的属性信息,并将其写入到?shmid_ds
?结构体中。IPC_SET
:设置共享内存的属性,使用?shmid_ds
?结构体中的值作为属性值。IPC_RMID
:将共享内存从系统中删除,该命令特别强势,即便是有进程和当下的共享内存挂接,依旧删除共享内存。
代码实例如下:
#include <iostream>
#include <cassert>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <unistd.h>
#define PATHNAME "/home/Xq/2023_12/12_24/"
#define PROJID 128
#define SHMSIZE 4096
std::string TransToHex(key_t key)
{
char value[32] = {0};
snprintf(value,32,"0x%x",key);
return value;
}
void shared_memory(void)
{
// 获取唯一值 key
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
std::cout << "get a unique value: " << TransToHex(key) << std::endl;
// 申请共享内存
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
std::cout << "create a shared memory, shmid: " << shmid << std::endl;
sleep(3);
// 删除共享内存
// 在这里我们不需要传递和接收共享内存区域的属性信息
// 因此将 buf 这个参数设置为 nullptr
int ctl_ret = shmctl(shmid, IPC_RMID, nullptr);
assert(ctl_ret != -1);
std::cout << "delete shared memory success, ctl_ret: " << ctl_ret << std::endl;
}
现象如下:
这就是用代码删除共享内存的简单实例。
shmat (shared memory attach) 函数用于将指定的共享内存区域挂接(附加)到调用进程的地址空间。
man 2 shmat
NAME
shmat - System V shared memory operation
SYNOPSIS
#include <sys/types.h>
#include <sys/shm.h>
void *shmat(int shmid, const void *shmaddr, int shmflg);
RETURN VALUE
On success shmat() returns the address of the attached shared memory segment;
on error (void *) -1 is returned, and errno is set to indicate the cause of the error.
Retrun Value:
On success:返回你挂接的这个共享内存块的虚拟地址(该虚拟地址经过页表映射到物理空间这个共享内存块)。
On failure:return(void*)-1;
通过该返回值作为用户访问共享内存的起始地址!这个地址是虚拟地址。进程借助该虚拟地址通过页表映射可以访问到对应的物理内存上的共享内存块!!!
那么这段内存空间的大小我知道吗?我当然知道,你不是创建共享内存的时候设置了大小吗?那么此时我就可以通过虚拟地址 + 偏移量这种寻址发方案就可以访问到共享内存区域中的不同数据位置。
shmid:要挂接哪一个共享内存 ,即共享内存的标识符 (用户层);
shmaddr:是一个指向共享内存地址的指针。
- 如果?
shmaddr
?的值为?NULL
,系统会自动选择一个合适的虚拟地址用于附加共享内存。- 如果?
shmaddr
?的值不为?NULL
,那么系统将尝试将共享内存附加到指定的虚拟地址,但实际是否能成功,还受到操作系统和进程地址空间的约束。shmflg:是标志参数,用于指定附加共享内存的行为。
SHM_RDONLY
:以只读方式附加共享内存。这意味着对该内存区域的写操作将导致错误。SHM_RND
:将?shmaddr
?转换为系统页面大小的整数倍。这可以避免一些地址对齐问题。SHM_REMAP
(Linux 特定标志):重新映射共享内存,即使它已经被附加到调用进程的地址空间中。这个标志在改变共享内存的访问模式或调整地址空间布局时很有用。一般情况下我们会将?
shmflg
?设置为 0 表示以默认的方式附加共享内存。具体来说,当?shmflg
?参数为 0 时,shmat
?函数将使用默认的行为来附加共享内存。默认的行为是在可读可写的方式下附加共享内存,这意味着调用进程可以读取和修改共享内存区域中的数据。此外,默认的行为还会尝试将共享内存附加到一个合适的地址上,如果?
shmaddr
?参数为?NULL
。这样可以确保共享内存被正确地映射到调用进程的地址空间。设置?
shmflg
?为 0 是使用?shmat
?函数时最常见的用法,因为它会使用默认的行为附加共享内存,这通常满足了大多数情况下对共享内存的操作需求。
挂接实例代码如下:
#include <iostream>
#include <cassert>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <unistd.h>
#define PATHNAME "/home/Xq/2023_12/12_24/"
#define PROJID 128
#define SHMSIZE 4096
std::string TransToHex(key_t key)
{
char value[32] = {0};
snprintf(value,32,"0x%x",key);
return value;
}
void shared_memory(void)
{
// 获取唯一值 key
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
std::cout << "get a unique value: " << TransToHex(key) << std::endl;
// 申请共享内存
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
std::cout << "create a shared memory, shmid: " << shmid << std::endl;
sleep(1);
// 挂接共享内存
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
std::cout << "attach shared memory success" << std::endl;
sleep(3);
// 删除共享内存
int ctl_ret = shmctl(shmid, IPC_RMID, nullptr);
assert(ctl_ret != -1);
std::cout << "delete shared memory success, ctl_ret: " << ctl_ret << std::endl;
}
现象如下:
上面的过程就是将共享内存挂接到通信进程的地址空间中。
shmdt
?(shared memory detach) 函数用于分离共享内存,将共享内存从当前进程的地址空间中移除。
man 2 shmdt
NAME
shmdt - System V shared memory operations
SYNOPSIS
#include <sys/types.h>
#include <sys/shm.h>
int shmdt(const void *shmaddr);
RETURN VALUE
On success shmdt() returns 0; on error -1 is returned.
and errno is set to indicate the cause of the error.
shmaddr:是一个指针(地址空间上的虚拟地址) ,指向共享内存区域的起始地址。
调用?
shmdt
?函数会将指定的共享内存从当前进程的地址空间中分离,但并不会销毁或释放共享内存。当共享内存分离后,当前进程将不再能够直接访问该共享内存区域。
shmdt
?函数的返回值是一个整数,表示函数执行的结果。一般情况下,如果共享内存成功分离,则返回值为 0;如果发生错误,则返回值为 -1,并设置相应的错误码。可能的错误包括:
EINVAL
:无效的共享内存地址(shmaddr
?参数不是共享内存的起始地址)。ENOMEM
:无法解除指定共享内存的映射。需要注意的是,当不再需要使用共享内存时,调用?
shmdt
?函数来分离是很重要的。这样可以确保共享内存资源被正确释放,以免造成资源泄露或其他不可预见的问题。
分离共享内存代码实例:
#include <iostream>
#include <cassert>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <unistd.h>
#define PATHNAME "/home/Xq/2023_12/12_24/"
#define PROJID 128
#define SHMSIZE 4096
std::string TransToHex(key_t key)
{
char value[32] = {0};
snprintf(value,32,"0x%x",key);
return value;
}
void shared_memory(void)
{
// 获取唯一值 key
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
std::cout << "get a unique value: " << TransToHex(key) << std::endl;
// 申请共享内存
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
std::cout << "create a shared memory, shmid: " << shmid << std::endl;
sleep(1);
// 挂接共享内存
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
std::cout << "attach shared memory success" << std::endl;
sleep(2);
// 分离共享内存
int dt_ret = shmdt(shmaddr);
assert(dt_ret != -1);
(void)dt_ret;
std::cout << "detach shared memory success" << std::endl;
sleep(2);
// 删除共享内存
int ctl_ret = shmctl(shmid, IPC_RMID, nullptr);
assert(ctl_ret != -1);
std::cout << "delete shared memory success, ctl_ret: " << ctl_ret << std::endl;
}
现象如下:?
上述过程就是分离共享内存的过程。
有了上述步骤的理解,我们就有了利用共享内存这种通信方式实现进程间通信的思路。
System V 共享内存通信逻辑如下:
- 创建公共的 key 值 --- ftok。
- 创建共享内存,注意创建时要设定好该共享内存的起始权限?---?shmget。
- 将指定的共享内存,挂接到通信进程的地址空间 ---?shmat。
- 通信逻辑。
- 将指定的共享内存,从相关进程的地址空间中去关联,即分离共享内存?--- shmdt。
- 删除共享内存?--- shmctl。
可以发现,上面的步骤,我们只差一步,即通信逻辑的完善!而要完善通信逻辑,我们需要理解一些准备,具体如下:
在学习进程时,地址空间我们说过,在32位机器下,地址空间的大小为4GB,0 - 3GB 我们称之为用户空间,3 - 4GB 我们称之为内核空间。
用户空间是给用户进程使用的,其中包含了用户程序的代码、数据、堆栈、共享区等。用户进程可以直接访问用户空间,但不能直接访问内核空间,需要通过相关系统调用接口访问内核空间。
内核空间是给操作系统内核使用的,包含操作系统的核心代码、数据结构、驱动程序等。内核空间通常是特权级的,只能被操作系统内核及其相关模块访问,并且可以访问整个地址空间,包括用户空间和内核空间自身。
地址空间32位机器下的地址空间如图所示:
我们之前也说过,当用 shmget() 系统调用创建共享内存后,并通过 shmat() 系统调用将共享内存映射 (挂接) 到通信进程的地址空间 (具体是共享区内)中,而从图中我们可以非常清晰地看到,共享内存所在的共享区是位于堆栈之间的,而这块空间是属于用户空间的,而非内核空间。
而我们之前又说了,用户进程是可以直接访问用户空间的,那么也就是说,这段空间 (共享内存) 不用通过系统调用就可以直接访问。换句话说,共享内存一旦映射到了进程的地址空间,那么双方进程如果要通信,直接对这块共享内存进行内存级别的读写即可 (不需要进行系统调用)。?
可是问题又来了,为什么我们之前学的 pipe (匿名管道) 、fifo (命名管道) 都要通过 read() 、write() 系统调用进行通信呢?
众所周知,管道 (匿名管道 / 命名管道) 都是基于文件系统的,但是,管道的缓冲区并不存储于文件系统中,而是存储于内核空间中的缓冲区中。 换句话说,管道中的内存级别的数据是存储于内核空间中的缓冲区中的。因此,与管道相关的资源是由操作系统维护的,属于操作系统的内核空间,用户不能直接访问这些资源,只能通过系统调用进行访问。
而共享内存与管道不同,一旦共享内存由 shmat () 系统调用挂接到了通信进程的地址空间中的共享区内,而共享区是属于用户空间的,用户可以直接进行读写访问,不需要经过系统调用。
有了这些只是的铺垫,我们就可以用共享内存的方式实现进程间通信了!具体如下:
#ifndef _LOG_HPP__
#include <iostream>
#include <string>
#include <ctime>
#include <cstring>
#endif
const std::string error_level[4] = {
"Success",
"Debug",
"Warning",
"Error"
};
std::string TransToTime()
{
unsigned int ret = (unsigned int)time(nullptr);
ret += 8 * 3600;
int hour = ret % (24 * 3600) / 3600;
int min = (ret % (24 * 3600) - (hour * 3600)) / 60;
int second = (ret % (24 * 3600)) - hour * 3600 - min * 60;
std::string cur_time;
cur_time += std::to_string(hour);
cur_time += ":";
cur_time += std::to_string(min);
cur_time += ":";
cur_time += std::to_string(second);
return cur_time;
}
std::string TransToHex(key_t key)
{
char buffer[64] = {0};
snprintf(buffer,64,"0x%x",key);
return buffer;
}
std::ostream& Log(const std::string& message, int level)
{
std::cout << "| " << TransToTime() << " | " << message << " | " << error_level[level] << " |";
return std::cout;
}
#ifndef _SHM_HPP__
#include "Log.hpp"
#include <cassert>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <unistd.h>
#endif
#define PATHNAME "/home/Xq/2023_12/12_25"
#define PROJID 128
#define SHMSIZE 4096
#include "Shm.hpp"
void shm_ipc_client(void)
{
// step 1: get a unqiue key
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
Log("get a unique key",0) << " key: " << TransToHex(key) << std::endl;
// step 2: get a shared memory
int shmid = shmget(key, SHMSIZE, IPC_CREAT);
assert(shmid != -1);
Log("get a shared memory",0) << " shmid: " << shmid << std::endl;
// step 3: attach shared memory
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
Log("attach shared memory",0) << std::endl;
// step 4: communication logic
std::string message;
while(true)
{
message.clear();
getline(std::cin,message);
snprintf(static_cast<char*>(shmaddr), SHMSIZE, "%s",message.c_str());
if(strcmp(message.c_str(),"quit") == 0)
{
break;
}
sleep(1);
}
Log("communication logic end",0) << std::endl;
// step 5: detach shared memory
int dt_ret = shmdt(shmaddr);
assert(dt_ret != -1);
(void)dt_ret;
Log("detach shared memory",0) << std::endl;
}
int main()
{
shm_ipc_client();
return 0;
}
#include "Shm.hpp"
void shm_ipc_server(void)
{
// step 1: create unique value
key_t key = ftok(PATHNAME,PROJID);
assert(key != -1);
Log("create unqiue value",0) << " key: " << TransToHex(key) << std::endl;
// step 2: create new shared memory
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
Log("create new shared memory",0) << " shmid: " << shmid << std::endl;
// step 3: attach shared memory
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
Log("attach shared memory",0) << std::endl;
// step 4: communication logic
while(true)
{
printf("Client say:> %s\n",static_cast<char*>(shmaddr));
if(strcmp("quit",static_cast<char*>(shmaddr)) == 0)
{
break;
}
sleep(1);
}
Log("communication logic end",0) << std::endl;
// step 5: detach shared memory
int dt_ret = shmdt(shmaddr);
assert(dt_ret != -1);
(void)dt_ret;
Log("detach shared memory",0) << std::endl;
// step 6: delete shared memory
int ctl_ret = shmctl(shmid, IPC_RMID, nullptr);
assert(ctl_ret != -1);
(void)ctl_ret;
Log("delete shared memory",0) << std::endl;
}
int main()
{
shm_ipc_server();
return 0;
}
.PHONY:all
all:ShmServer ShmClient
ShmServer:ShmServer.cc
g++ -o $@ $^ -std=gnu++11
ShmClient:ShmClient.cc
g++ -o $@ $^ -std=gnu++11
.PHONY:clean
clean:
rm -f ShmClient ShmServer
单独将 ShmServer 进程跑起来的时候,现象如下:
第一个现象:我们发现此时 ShmServer 服务端读取的内容是空的,换句话说共享内存被创建之后,默认情况下,内容会被清空为全0;
将 ShmClient 客户端 和 ShmServer 服务端 一起运行,ShmClient 客户端向共享内存写入数据,ShmServer 服务端从共享内存读取数据:
我们发现一个现象,当通信双方进程通过共享内存进行进程间通信时,写端向共享内存写入数据,读取端可以立马看到,该现象体现了低延迟、速度快的特性。
共享内存通常是所有进程间通信方式中速度最快的一种,这是因为共享内存允许多个进程直接在用户空间访问共享内存,而不需要通过内核进行数据的拷贝和传输。因此,共享内存的数据访问速度较快,可以实现高效的数据交换和共享。
在这里,我们将管道和共享内存做一个简单的比较。
管道如下图所示:
共享内存如下图所示:?
总结,上面的图,我们是将 从键盘文件读取数据 和 将数据写入显示器文件?分别算了一次拷贝,如果这些不考虑的话,那么管道通信会有两次拷贝,共享内存通信一次拷贝都没有,具体分析如下:
对于管道,一般情况下会涉及两次数据拷贝:一次是将数据从写入端进程的用户空间拷贝到内核空间的缓冲区,另一次是将数据从内核空间的缓冲区拷贝到读取端进程的用户空间。
对于共享内存,通常情况下没有数据拷贝,因为通信进程直接在用户空间的共享内存区域进行读写操作,不需要将数据写入到内核中 。
需要注意的是,在使用共享内存和管道的过程中,其他调用可能会涉及更多的数据拷贝操作,这取决于具体的使用情况和实现方式。同时,为了保护数据的一致性和安全性,进程在访问共享内存或管道时通常也需要使用同步机制或者互斥操作,这也可能会引入额外的开销。
在学习管道的时候我们说过,当管道没有数据的时候,读端进程被阻塞在 read() 系统调用内部,当管道数据被写满的时候,写端进程会被阻塞在 write() 系统调用内部,我们将这种现象称之为访问控制!即管道提供了访问控制!
可是对于共享内存来说,共享内存缺乏访问控制!共享内存天然的就是为了让我们以较高效率进行通信!因此内部没有提供任何的访问控制!甚至,读取端和写入端压根不知道对方的存在!读取端不管你共享内存中有没有数据,它会一直读取共享内存中的数据;如果写端将共享内存写满后继续向共享内存写入新的数据,数据可能会覆盖之前的数据,导致数据的丢失和数据的不一致性!
共享内存缺乏访问控制会导致并发问题。由于多个进程可以直接访问共享内存,当没有适当的访问控制措施时,可能会出现以下并发问题:
竞态条件(Race Condition):当多个进程同时尝试读取或写入共享内存中的数据时,由于访问没有得到同步,可能会导致数据的不确定性和不一致性。
冲突和数据覆盖:如果多个进程同时尝试写入共享内存中的数据,没有合适的同步机制,可能会导致数据冲突和数据覆盖的问题,最终导致数据的不正确和丢失。
死锁(Deadlock):当多个进程需要使用共享内存进行读写操作时,如果它们没有正确进行同步,可能会导致死锁情况的发生,即多个进程互相等待对方释放资源而无法继续执行。
共享内存缺乏访问控制,但如果我就想提供一定的访问控制,该如何实现呢?
我们说过,管道具有访问控制,即管道具有同步能力,那么能不能以管道为辅助让共享内存具有访问控制呢? 答案:可以的。
#ifndef _LOG_HPP__
#include <iostream>
#include <string>
#include <ctime>
#include <cstring>
#endif
const std::string error_level[5] = {
"Success",
"Notice",
"Debug",
"Warning",
"Error"
};
std::string TransToTime()
{
unsigned int ret = (unsigned int)time(nullptr);
ret += 8 * 3600;
int hour = ret % (24 * 3600) / 3600;
int min = (ret % (24 * 3600) - (hour * 3600)) / 60;
int second = (ret % (24 * 3600)) - hour * 3600 - min * 60;
std::string cur_time;
cur_time += std::to_string(hour);
cur_time += ":";
cur_time += std::to_string(min);
cur_time += ":";
cur_time += std::to_string(second);
return cur_time;
}
std::string TransToHex(key_t key)
{
char buffer[64] = {0};
snprintf(buffer,64,"0x%x",key);
return buffer;
}
std::ostream& Log(const std::string& message, int level)
{
std::cout << "| " << TransToTime() << " | " << message << " | " << error_level[level] << " |";
return std::cout;
}
#ifndef _SHM_HPP__
#include "Log.hpp"
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/stat.h>
#include <fcntl.h>
#endif
namespace Xq
{
#define SHMPATHNAME "/home/Xq/2023_12/12_26"
#define PROJID 128
#define SHMSIZE 4096
#define FIFOPATHNAME "/home/Xq/2023_12/12_26/fifo"
#define FIFOSTARTMODE 0666
#define WRITE O_WRONLY
#define READ O_RDONLY
class named_fifo
{
public:
named_fifo()
{
int ret = mkfifo(FIFOPATHNAME,FIFOSTARTMODE);
assert(ret != -1);
(void)ret;
}
~named_fifo()
{
int ret = unlink(FIFOPATHNAME);
assert(ret != -1);
(void)ret;
}
};
int open(const char* pathname,int flags)
{
int fd = ::open(pathname,flags);
assert(fd != -1);
return fd;
}
void wait(int fd)
{
u_int32_t signal = 0;
int read_ret = read(fd, &signal,sizeof(u_int32_t));
assert(read_ret != -1);
(void)read_ret;
}
void put_signal(int fd)
{
u_int32_t signal = 0;
int write_ret = write(fd,&signal,sizeof(u_int32_t));
assert(write_ret != -1);
(void)write_ret;
}
}
.PHONY:all
all:ShmClient ShmServer
ShmClient:ShmClient.cc
g++ -o $@ $^ -std=gnu++11
ShmServer:ShmServer.cc
g++ -o $@ $^ -std=gnu++11
.PHONY:clean
clean:
rm -f ShmClient ShmServer
#include "Shm.hpp"
void shm_ipc_client(void)
{
// step 1: get a unqiue key
key_t key = ftok(SHMPATHNAME,PROJID);
assert(key != -1);
Log("get a unique key",0) << " key: " << TransToHex(key) << std::endl;
// step 2: get a shared memory
int shmid = shmget(key, SHMSIZE, IPC_CREAT);
assert(shmid != -1);
Log("get a shared memory",0) << " shmid: " << shmid << std::endl;
// step 3: attach shared memory
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
Log("attach shared memory",0) << std::endl;
// step 4: communication logic
int fd = Xq::open(FIFOPATHNAME, WRITE);
std::string message;
while(true)
{
message.clear();
std::cout << "Please input message:> ";
getline(std::cin,message);
snprintf(static_cast<char*>(shmaddr), SHMSIZE, "%s",message.c_str());
// 当写端(client 客户端)将数据完全写入共享内存后
// 在发送信号,让读端(server 服务端)不要再阻塞,进而读取共享内存的数据
Xq::put_signal(fd);
if(strcmp(message.c_str(),"quit") == 0)
break;
}
Log("communication logic end",0) << std::endl;
// step 5: detach shared memory
int dt_ret = shmdt(shmaddr);
assert(dt_ret != -1);
(void)dt_ret;
Log("detach shared memory",0) << std::endl;
}
int main()
{
shm_ipc_client();
return 0;
}
#include "Shm.hpp"
void shm_ipc_server(void)
{
// step 1: create unique value
key_t key = ftok(SHMPATHNAME,PROJID);
assert(key != -1);
Log("create unqiue value",0) << " key: " << TransToHex(key) << std::endl;
// step 2: create new shared memory
int shmid = shmget(key, SHMSIZE, IPC_CREAT | IPC_EXCL | 0666);
assert(shmid != -1);
Log("create new shared memory",0) << " shmid: " << shmid << std::endl;
// step 3: attach shared memory
void* shmaddr = shmat(shmid, nullptr, 0);
assert(shmaddr != (void*)-1);
Log("attach shared memory",0) << std::endl;
// step 4: communication logic
int fd = Xq::open(FIFOPATHNAME, READ);
while(true)
{
Log("wait data",1) << std::endl;
// 读端(server 服务单)阻塞 (实质上是在 Xq::wait() 中的 read() 系统调用内部阻塞)
// 等待写端(client 客户端)发送信号,在进行读取共享内存的数据
Xq::wait(fd);
printf("Client say:> %s\n",static_cast<char*>(shmaddr));
if(strcmp("quit",static_cast<char*>(shmaddr)) == 0)
break;
}
Log("communication logic end",0) << std::endl;
// step 5: detach shared memory
int dt_ret = shmdt(shmaddr);
assert(dt_ret != -1);
(void)dt_ret;
Log("detach shared memory",0) << std::endl;
// step 6: delete shared memory
int ctl_ret = shmctl(shmid, IPC_RMID, nullptr);
assert(ctl_ret != -1);
(void)ctl_ret;
Log("delete shared memory",0) << std::endl;
}
int main()
{
Xq::named_fifo fifo_target;
shm_ipc_server();
return 0;
}
当 server 服务端跑起来的时候,我们发现它此时处于阻塞状态,实际上此时 server 服务端是阻塞在 open() 系统调用的内部 (即阻塞在打开管道文件的内部),等待 client 客户端打开管道文件。
当 client 客户端跑起来的时候,并将管道文件也打开了之后,此时 server 端就会继续被调度,当 server 服务端走到了 Xq::wait 这个函数,此时会继续被阻塞,实质上是阻塞在了 read() 系统调用内部,等待 client 客户端向管道写入数据后,server 服务端才会继续被调度。
可以清楚的看到,当 client 客户端向共享内存写入数据后 (在这个过程中会向管道发送一个信号 --- 实质上就是向管道写入了数据,写入之后,管道就有了数据 ,因此 server 服务端不在阻塞,进行读取共享内存的数据),此时 server 服务端就可以将共享内存的数据读取出来,读取完 server 服务端继续被阻塞,重复上述过程,而这就是我们通过管道让共享内存具有访问控制的处理方案。
我们在进程间通信主题简单理解一下信号量的概念,到了多线程,我们具体细说信号量。
基于对于管道和共享内存的理解,如果我们想让进程间进行通信,那么首先要解决的问题就是:如何让不同的进程看到同一份资源!无论是管道,还是共享内存,它们都是以不同的方式让不同的进程看到了同一份资源,为进程通信提供了技术保障!可是,我们知道,在编程领域内,当我们用一个方式解决了一个问题,那么这个方式往往就会带来新的问题,这是一种被称为“问题解决导致新问题”的现象。比如共享内存这种方通信式,它虽然可以让不同的进程看到同一份资源,为进程间通信提供了技术保证,但是,它也带来了新的问题,例如时序问题,容易造成数据不一致问题!
时序问题:是指在多个进程同时访问共享内存时,无法确定它们的执行顺序。这可能导致不同进程对共享数据的读写操作发生冲突,产生竞争条件(race condition)和不确定的结果。
数据一致性问题:是指当多个进程同时修改共享数据时,由于缺乏同步机制,可能导致数据的不一致性。例如,一个进程修改了共享数据的部分内容,而另一个进程在读取该数据时可能得到不正确或过期的值。
而在这里我们有一批结论,在进程间通信这个主题,我们是记忆加理解,为多线程做准备,具体如下:
- 我们将多个进程(执行流) 看到的公共的一份资源 --- 称之为临界资源。
- 我们把自己的进程,访问临界资源的代码 --- 称之为临界区。所以,多个执行流,互相运行的时候互相干扰,主要是我们在临界区不加保护的访问了同样的资源(临界资源),在非临界区多个执行流互相是不影响的。
- 为了更好地进行临界区的保护,可以让多执行流在任何时刻,都只有一个进程进入临界区进行访问临界资源 ??????? --- 这种我们称之为互斥。
- 原子性: 要么完全不执行???????,要么完全执行???????,没有中间状态,我们称之为原子性。在编程领域中,对于并发和多线程环境下的操作,我们通常将具有原子性的操作称之为是线程安全的,而不具有原子性的操作则可能是线程不安全的。
为了更好的理解信号量,我们在这里形象化地讲一个故事:
在生活中,我们或多或少的去过电影院看过电影!我们可以将多个执行流访问临界资源想象成人们去电影院看电影!其中多个执行流对应的就是看电影的人,临界资源对应的就是电影院的座位,临界区对应的就是电影院!
看电影的过程,实际上是在电影院这个地点,在电影放映过程中,某个人在占用某个座位,对应到计算机,即某个进程在临界区访问临界资源!!!
在现实生活中,去电影院看电影,难道是人们直接去电影院随便找一个座位看电影吗?答案是:当然不是,如果这样随机处理的话,可能会出现各种问题,这是看电影的人和电影院的管理者都不愿意看见的场景!事实上,看电影,前提并不是去抢占座位,而是应该先去买票!
只要你买了票,那么这个票映射的座位在整个电影放映期间是只属于你的!即便你不去看了,这个座位在放映期间依旧只属于你!
因此买票的本质 : 对座位 (临界资源) 的预定机制。
同样的,当某一个进程进入临界区,想去访问临界资源中的一部分,不能让进程直接去访问临界资源 (对应到电影院:不能让用户直接去电影院内部占座位),而是应该先申请信号量 (你得先买票!!)
同样的,我们也知道,电影院的座位实际上是有限的。假设一个电影院中座位的有效数量是100,那么买票本质上就是将这个数量进行--,即被占用了一个有效座位,此时还剩下99个有效座位!对应到计算机,当某一个进程进入临界区访问某一个临界资源时,此时这个进程应该先去申请信号量(对应到电影院这个故事,应该先去买票,即座位数--),那么此时这个信号量就类似于这个有效座位的个数,即临界资源的有效个数,简而言之,这个信号量我们可以做个简单的定义:它是一个计数器!!!
进程访问临界资源,并不是直接去访问,而是应该先去申请信号量!
先申请信号量!
- 申请信号量的本质:让信号量计数器 --
- 只要申请信号量成功,临界资源内部,一定给你预留了你想要的资源 --- 信号量本质其实是对临界资源的一种预定机制!!!
只要信号量申请成功,就可以访问临界资源,访问临界资源本质上就是进程执行自己的临界区代码。
释放信号量我们可以理解为信号量计数器++的操作!
既然你说了,信号量是一个计数器,那么可以用一个全局变量表示该信号量吗?答案是:不可以!我们知道,进程是具有独立性的 (这句话我们都说烂了) ,就算是父子进程,对于代码和数据的处理都是也写实拷贝的方案处理的,更别说其他的进程了!
但是,我们在这里做一个假设,假设让多个进程看到同一个全局变量( int n ),大家都进行申请信号量,即做 n-- 的操作;
而我们知道,在冯诺依曼体系中,只有CPU具有计算能力,计算要在CPU内,可是数据 (变量 n)在内存啊,那么在整个计算过程中,我们可以简单的看成三个过程:
- ?将内存中的数据加载到CPU内的寄存器中(读指令)。
- ?n--(分析 / 执行指令)。
- ?将CPU修改外完毕的 n 写回内存 (写回结果)。
可是我们知道,执行流在被CPU调度的时候,是具有时间片的,即进程在被调度的过程中,随时都可能被切换!同样,我们以前也说过,进程被调度的时候,会产生临时数据,保存在CPU内部的寄存器中,当进程被切换时,这些临时数据也必然要被保存,下次被调度时又需要将这些临时数据在加载到CPU内部的寄存器中,可是寄存器只有一套啊,被所有的执行流共享,但是寄存器里面的数据是你这个进程的啊,因此当发生进程切换的时候,临时数据需要保存在进程的PCB中。而我们将寄存器中的数据称之为执行流的上下文数据!进程被切换的时候, 需要上下文保护和上下文恢复!!!
因此,当某一个执行流执行 n-- 的过程中,由于 n-- 这个操作并不是一步就完成的,是存在中间状态的,也就是说 n-- 这个操作,因为时序问题,会导致 n 有中间状态!可能导致数据不一致问题!此时这个 n-- 就是不安全的。
如果一个操作是原子的,那么就是安全的,而 n-- 这个操作是非原子的,也就意味着这个 n-- 操作是不安全的!?
每个进程想要通过信号量得到一个临界资源,也就是说信号量首先就需要被所有进程看到,即信号量自身就天然是一个临界资源。
而你这个信号量 (计数器) 的目的是:保护其他的临界资源,可是你自己就是一个临界资源!那么谁保证你的安全呢?
因此,这个计数器必须优先保证自己是安全的。所以,信号量的申请和释放操作必须是安全的,那么如何保证呢?
申请信号量 ---> 计数器--? ----> P操作 ---> 必须是原子的。
释放信号量 ---> 计数器++ ---> V操作 ---> 必须是原子的。
也就是说,信号量的申请和释放操作必须是原子的,这样才能保证信号量自身是安全的。
实话实说,关于信号量尽管我们说了一些概念,但是,我们可能对它的理解并不深刻!因此我们今天在进程间通信这个主题关于信号量的目的就是:对信号量有一个简单的认识,为多线程关于信号量的详解做前置性认识!
至此,进程间通信到此为止。当然,后期如果又有新收获,会增添(改正)内容,希望自己能够以空杯心态继续前进,共勉。