上一篇文章记录了如何实现函数assert()和panic(),为了方便我们在运行过程中随时发现错误。那么现在我们有了这两个工具,这篇文章就开始记录如何实现进程间通信(IPC)机制。
目录
IPC是 Inter-Process Communication 的缩写,直译为进程间通信,所白了就是进程间发消息。我们在开始的文章中把这种消息传递比作邮政系统,但实际上这种比喻并不全对。有的消息机制是很像收发邮件的,这种叫做异步IPC,意思是说,发信这发完就去干别的了,收信者也一样,看看信箱里没信,也不坐在旁边傻等。而另一种消息机制正好相反,被称为同步IPC,它不像邮寄,倒像接力赛,发送者一直等到接收者收到消息才肯放手,接收者也一样,接不到就一直等着,不干别的。
在这里,书上选择了同步IPC。同步IPC有若干好处,比如:
这些特性可能无法一下子全都明白,随着我们接下来写完代码,可能会清楚一些。
Minix的IPC机制我们已经明白了,它的核心在于“int SYSVEC”这个软中断以及与之对应的sys_call()这个函数。增加一个系统调用对我们来讲已经不是什么困难的事情了,按照”《ORANGE’S:一个操作系统的实现》读书笔记(二十二)输入输出系统(四)”文章中记录的步骤来做就好了。我们把这个新的系统调用起名为sendrec。sendrec和sys_sendrec的函数体如下代码所示。
代码 kernel/syscall.asm,sendrec。
_NR_sendrec equ 0
...
global sendrec
...
sendrec:
mov eax, _NR_sendrec
mov ebx, [esp + 4] ; function
mov ecx, [esp + 8] ; src_dest
mov edx, [esp + 12] ; p_msg
int INT_VECTOR_SYS_CALL
ret
代码 kernel/proc.c,sys_sendrec。
/**
* <Ring 0> The core routine of system call 'sendrec()'.
*
* @param function SEND or RECEIVE
* @param src_dest To/From whom the message is transferred.
* @param m Ptr to the MESSAGE body.
* @param p The caller proc.
*
* @return Zero if success.
*/
PUBLIC int sys_sendrec(int function, int src_dest, MESSAGE* m, PROCESS* p)
{
assert(k_reenter == 0); /* make sure we are not in ring0 */
assert((src_dest >= 0 && src_dest < NR_TASKS + NR_PROCS) || src_dest == ANY || src_dest == INTERRUPT);
int ret = 0;
int caller = proc2pid(p);
MESSAGE* mla = (MESSAGE*)va2la(caller, m);
mla->source = caller;
/**
* Actually we have the third message type: BOTH. However, it is not
* allowed to be passed to the kernel directly. Kernel doesn't know
* it at all. It is transformed into a SEND followed by a RECEVIE
* by 'send_recv()'
*/
if (function == SEND) {
ret = msg_send(p, src_dest, m);
if (ret != 0) {
return ret;
}
} else if (function == RECEIVE) {
ret = msg_recevie(p, src_dest, m);
if (ret != 0) {
return ret;
}
} else {
panic("{sys_sendrec} invalid function: %d (SEND:%d, RECEIVE:%d).", function, SEND, RECEIVE);
}
return 0;
}
sys_sendrec()这个函数设计的相当简单,可以描述为:把SEND消息交给msg_send()处理,把RECEIVE消息交给msg_receive()处理。函数中使用的宏如定义在const.h中,具体如下所示。
代码 include/const.h,宏定义。
/* ipc */
#define SEND 1
#define RECEIVE 2
#define BOTH 3 /* BOTH = (SEND | RECEIVE) */
/* tasks */
#define INTERRUPT -10
#define ANY (NR_TASKS + NR_PROCS + 10)
结构体MESSAGE定义在tty.h中,这里借用了Minix的MESSAGE结构体。因为书上并没有对此进行说明,我在网上还没有找到相应的文档,所以先拿来用,如果你知道的话,请告诉我,谢谢。
代码 include/tty.h,MESSAGE结构体。
/**
* MESSAGE mechanism is borrowed from MINIX
*/
struct mess1 {
int m1i1;
int m1i2;
int m1i3;
int m1i4;
};
struct mess2 {
void* m2p1;
void* m2p2;
void* m2p3;
void* m2p4;
};
struct mess3 {
int m3i1;
int m3i2;
int m3i3;
int m3i4;
u64 m3l1;
u64 m3l2;
void* m3p1;
void* m3p2;
};
typedef struct {
int source;
int type;
union {
struct mess1 m1;
struct mess2 m2;
struct mess3 m3;
} u;
} MESSAGE;
接下来,我们修改一下global.c中的sys_call_table[],将sys_sendrec放入数组中,代码如下所示:
PUBLIC system_call sys_call_table[NR_SYS_CALL] = {sys_sendrec, sys_printx};
sys_sendrec()函数核心是msg_send()和msg_receive()两个函数,它们是IPC的核心代码。现在我们来看一下它们是如何实现的。
代码 kernel/proc.c,msg_send 和 msg_receive。
/**
* <Ring 0~1> Calculate the linear address of a certain segment of a given proc.
*
* @param p Whose (the proc ptr).
* @param idx Whick (one proc has more than one segments).
*
* @return The required linear address.
*/
PUBLIC int ldt_seg_linear(PROCESS *p, int idx)
{
DESCRIPTOR *d = &p->ldts[idx];
return d->base_high << 24 | d->base_mid << 16 | d->base_low;
}
/**
* <Ring 0~1> Virtual addr --> Linear addr.
*
* @param pid PID of the proc whose address is to be calculated.
* @param va Virtual address.
*
* @return The liner address for the given virtual address.
*/
PUBLIC void* va2la(int pid, void* va)
{
PROCESS *p = &proc_table[pid];
u32 seg_base = ldt_seg_linear(p, INDEX_LDT_RW);
u32 la = seg_base + (u32)va;
if (pid < NR_TASKS + NR_PROCS) {
assert(la == (u32)va);
}
return (void*)la;
}
/**
* <Ring 0~3> Clear up a MESSAGE by setting each byte to 0.
*
* @param p The message to be cleared.
*/
PUBLIC void reset_msg(MESSAGE* p)
{
memset(p, 0, sizeof(MESSAGE));
}
/**
* <Ring 0> This routine is called after 'p_flags' has been set (!= 0), it
* calls 'schedule()' to choose another proc as the 'proc_ready'.
*
* @attention This routine does not change 'p_flags'. Make sure the 'p_flags'
* of the proc to be blocked has been set properly.
*
* @param p The proc to be blocked.
*/
PRIVATE void block(PROCESS* p)
{
assert(p->p_flags);
scheduld();
}
/**
* <Ring 0> This is a dummy routine. It does nothing actually. When it is
* called, the 'p_flags' should have been cleared (== 0).
*
* @param p The unblocked proc.
*/
PRIVATE void unblock(PROCESS* p)
{
assert(p->p_flags == 0);
}
/**
* <Ring 0> Check whether it is safe to send a message from src to dest.
* The routine will detect if the messaging graph contains a cycle. For
* instance, if we have procs trying to send messages like this:
* A -> B -> C -> A, then a deadlock occurs, because all of then will
* wait forever. If no cycles detected, it is considered as safe.
*
* @param src Who wants to send message.
* @param dest To whom the message is sent.
*
* @return Zero if success.
*/
PRIVATE int deadlock(int src, int dest)
{
PROCESS* p = proc_table + dest;
while (1) {
if (p->p_flags & SENDING) {
if (p->p_sendto == src) {
/* print the chain */
p = proc_table + dest;
printl("=_=%s", p->p_name);
do {
assert(p->p_msg);
p = proc_table + p->p_sendto;
printl("->%s", p->p_name);
} while (p != proc_table + src);
printl("=_=");
return 1;
}
p = proc_table + p->p_sendto;
} else {
break;
}
}
return 0;
}
/**
* <Ring 0> Send a message to the dest proc. If dest is blocked waiting for
* the message, copy the message to it and unblock dest. Otherwise the caller
* will be blocked and appended to the dest's sending queue.
*
* @param current The caller, the sender.
* @param dest To whom the message is sent.
* @param m The message.
*
* @return Zero if success.
*/
PRIVATE int msg_send(PROCESS* current, int dest, MESSAGE* m)
{
PROCESS* sender = current;
PROCESS* p_dest = proc_table + dest; /* proc dest */
assert(proc2pid(sender) != dest);
/* check for deadlock here */
if (deadlock(proc2pid(sender), dest)) {
panic(">>DEADLOCK<< %s->%s", sender->p_name, p_dest->p_name);
}
if ((p_dest->p_flags & RECEIVING) && /* dest is waiting for the msg */
(p_dest->p_recvfrom == proc2pid(sender) ||
p_dest->p_recvfrom == ANY)) {
assert(p_dest->p_msg);
assert(m);
phys_copy(va2la(dest, p_dest->p_msg), va2la(proc2pid(sender), m), sizeof(MESSAGE));
p_dest->p_msg = 0;
p_dest->p_flags &= ~RECEIVING; /* dest has received the msg */
p_dest->p_recvfrom = NO_TASK;
unblock(p_dest);
assert(p_dest->p_flags == 0);
assert(p_dest->p_msg == 0);
assert(p_dest->p_recvfrom == NO_TASK);
assert(p_dest->p_sendto == NO_TASK);
assert(sender->p_flags == 0);
assert(sender->p_msg == 0);
assert(sender->p_recvfrom == NO_TASK);
assert(sender->p_sendto == NO_TASK);
} else { /* dest is not waiting for the msg */
sender->p_flags |= SENDING;
assert(sender->p_flags == SENDING);
sender->p_sendto = dest;
sender->p_msg = m;
/* append to the sending queue */
PROCESS* p;
if (p_dest->q_sending) {
p = p_dest->q_sending;
while (p->next_sending) {
p = p->next_sending;
}
p->next_sending = sender;
} else {
p_dest->q_sending = sender;
}
sender->next_sending = 0;
block(sender);
assert(sender->p_flags == SENDING);
assert(sender->p_msg != 0);
assert(sender->p_recvfrom == NO_TASK);
assert(sender->p_sendto == dest);
}
return 0;
}
/**
* <Ring 0> Try to get a message from the src proc. If src is blocked sending
* the message, copy message from it and unblock src. Otherwise the caller
* will be blocked.
*
* @param current The caller, the proc who wanna receive.
* @param src From whom the message will be received.
* @param m THe message ptr to accept the message.
*
* @return Zero if success.
*/
PRIVATE int msg_recevie(PROCESS* current, int src, MESSAGE* m)
{
PROCESS* p_who_wanna_recv = current; /**
* This name if a little bit
* wierd, but it makes me
* think cleary, so I keep it.
*/
PROCESS* p_from = 0; /* from which the message will be fetched */
PROCESS* prev = 0;
int copyok = 0;
assert(proc2pid(p_who_wanna_recv) != src);
if ((p_who_wanna_recv->has_int_msg) && ((src == ANY) || (src == INTERRUPT))) {
/* There is an interrupt needs p_who_wanna_recv's handling and p_who_wanna_recv is ready to handler it. */
MESSAGE msg;
reset_msg(&msg);
msg.source = INTERRUPT;
msg.type = HARD_INT;
assert(m);
phys_copy(va2la(proc2pid(p_who_wanna_recv), m), &msg, sizeof(MESSAGE));
p_who_wanna_recv->has_int_msg = 0;
assert(p_who_wanna_recv->p_flags == 0);
assert(p_who_wanna_recv->p_msg == 0);
assert(p_who_wanna_recv->p_sendto == NO_TASK);
assert(p_who_wanna_recv->has_int_msg == 0);
return 0;
}
/* Arrives here if no interrupt for p_who_wanna_recv. */
if (src == ANY) {
/* p_who_wanna_recv is ready to receive messages from
* ANY proc, we'll check the sending queue and pick the
* first proc in it.
*/
if (p_who_wanna_recv->q_sending) {
p_from = p_who_wanna_recv->q_sending;
copyok = 1;
assert(p_who_wanna_recv->p_flags == 0);
assert(p_who_wanna_recv->p_msg == 0);
assert(p_who_wanna_recv->p_recvfrom == NO_TASK);
assert(p_who_wanna_recv->p_sendto == NO_TASK);
assert(p_who_wanna_recv->q_sending != 0);
assert(p_from->p_flags == SENDING);
assert(p_from->p_msg != 0);
assert(p_from->p_recvfrom == NO_TASK);
assert(p_from->p_sendto == proc2pid(p_who_wanna_recv));
}
} else {
/* p_who_wanna_recv wants to receive a message from
* a certain proc: src.
*/
p_from = &proc_table[src];
if ((p_from->p_flags & SENDING) && (p_from->p_sendto == proc2pid(p_who_wanna_recv))) {
/* Perfect, src is sending a message to
* p_who_wanna_recv
*/
copyok = 1;
PROCESS* p = p_who_wanna_recv->q_sending;
assert(p); /* p_from must have been appended to the
* queue, so the queue must not be NULL
*/
while (p) {
assert(p_from->p_flags & SENDING);
if (proc2pid(p) == src) { /* if p is the one */
p_from = p;
break;
}
prev = p;
p = p->next_sending;
}
assert(p_who_wanna_recv->p_flags == 0);
assert(p_who_wanna_recv->p_msg == 0);
assert(p_who_wanna_recv->p_recvfrom == NO_TASK);
assert(p_who_wanna_recv->p_sendto == NO_TASK);
assert(p_who_wanna_recv->q_sending != 0);
assert(p_from->p_flags == SENDING);
assert(p_from->p_msg != 0);
assert(p_from->p_recvfrom == NO_TASK);
assert(p_from->p_sendto == proc2pid(p_who_wanna_recv));
}
}
if (copyok) {
/* It's determined from which proc the message will
* be copied. Note that this proc must have been
* waiting for this moment in this queue, so we should
* remove it from the queue.
*/
if (p_from == p_who_wanna_recv->q_sending) { /* the 1st one */
assert(prev == 0);
p_who_wanna_recv->q_sending = p_from->next_sending;
p_from->next_sending = 0;
} else {
assert(prev);
prev->next_sending = p_from->next_sending;
p_from->next_sending = 0;
}
assert(m);
assert(p_from->p_msg);
/* copy the message */
phys_copy(va2la(proc2pid(p_who_wanna_recv), m), va2la(proc2pid(p_from), p_from->p_msg), sizeof(MESSAGE));
p_from->p_msg = 0;
p_from->p_sendto = NO_TASK;
p_from->p_flags &= ~SENDING;
unblock(p_from);
} else { /* nobody's sending any msg */
/* Set p_flags so that p_who_wanna_recv will not
* be scheduled until it is unblocked.
*/
p_who_wanna_recv->p_flags |= RECEIVING;
p_who_wanna_recv->p_msg = m;
if (src == ANY) {
p_who_wanna_recv->p_recvfrom = ANY;
} else {
p_who_wanna_recv->p_recvfrom = proc2pid(p_from);
}
block(p_who_wanna_recv);
assert(p_who_wanna_recv->p_flags == RECEIVING);
assert(p_who_wanna_recv->p_msg != 0);
assert(p_who_wanna_recv->p_recvfrom != NO_TASK);
assert(p_who_wanna_recv->p_sendto == NO_TASK);
assert(p_who_wanna_recv->has_int_msg == 0);
}
return 0;
}
围绕msg_send()和msg_receive(),代码中还列出了其它几个必要的函数,它们是:
在block()、unblock()和deadlock()中,都出现了PROCESS这个结构体的一个新成员:p_flag。除了该成员外,其实还增加了其它几个成员,具体代码如下所示。
代码 include/proc.h,进程表的新成员。
/* 进程结构体 */
typedef struct s_proc {
STACK_FRAME regs; /* process registers saved in stack frame */
u16 ldt_sel; /* gdt selector giving ldt base and limit */
DESCRIPTOR ldts[LDT_SIZE]; /* local descriptors for code and data */
int ticks; /* remained ticks */
int priority;
u32 pid; /* process id passed in from MM */
char p_name[16]; /* name of the process */
int p_flags; /* process flags. A proc is runnable iff p_flags == 0 */
MESSAGE* p_msg;
int p_recvfrom;
int p_sendto;
int has_int_msg; /**
* nonzero if an INTERRUPT occurred when
* the task is not ready to deal with it.
*/
struct s_proc* q_sending; /* queue of procs sending messages to this proc */
struct s_proc* next_sending; /* next proc in the sending queue (q_sending) */
int nr_tty;
}PROCESS;
所有增加的这些成员都是跟消息机制有关的。
假设有进程A想要向进程B发送消息M,那么过程将会是这样的:
假设有进程B想要接收消息(来自特定的进程、中断或者任意进程),那么过程将会是这样的:
值得说明的是,不管是接收方还是发送方,都各自维护一个消息结构体,只不过发送方的结构体是携带了消息内容的而接收方是空的。由于我们使用同步IPC,一方的需求——发送或接收——只有被满足之后才会继续运行,所以操作系统不需要维护任何的消息缓冲,实现起来也就相对简单。
代码中使用的宏定义在const.h中,具体如下所示:
/* Process */
#define SENDING 0x02 /* set when proc trying to send */
#define RECEIVING 0x04 /* set when proc trying to recv */
#define NO_TASK (NR_TASKS + NR_PROCS + 20)
代码中使用了一个phys_copy,它其实是一个宏被定义成memcpy的宏,该宏定义在string.h中,具体如下所示:
#define phys_copy memcpy
由于我们在进程结构体PROCESS中新添加了成员,所以要对新添加的成员进程初始化,初始化的代码放在main.c中。
代码 kernel/main.c,初始化PROCESS新添加的成员。
PUBLIC int kernel_main()
{
...
for(i = 0; i < NR_TASKS + NR_PROCS; i++){
...
p_proc->p_flags = 0;
p_proc->p_msg = 0;
p_proc->p_recvfrom = NO_TASK;
p_proc->p_sendto = NO_TASK;
p_proc->has_int_msg = 0;
p_proc->q_sending = 0;
p_proc->next_sending = 0;
...
}
...
proc_table[0].ticks = proc_table[0].priority = 15;
proc_table[1].ticks = proc_table[1].priority = 5;
proc_table[2].ticks = proc_table[2].priority = 3;
proc_table[3].ticks = proc_table[3].priority = 1;
proc_table[4].ticks = proc_table[3].priority = 1;
proc_table[1].nr_tty = 0;
proc_table[2].nr_tty = 0;
proc_table[3].nr_tty = 1;
proc_table[4].nr_tty = 1;
...
}
现在每个进程增加了两种可能的状态:SENDING和RECEIVING。相应的,我们需要在进程调度的时候区别对待了。凡是处于SENDING或RECEIVING状态的进程,我们就不再让它们获得CPU了,也就是说,将它们“阻塞”了。这样说明了为什么block()和unblock()两个函数本质上没有做任何工作——一个进程是否阻塞,已经由进程表中的p_flags项决定,我们不需要额外做什么工作。不过我们还是应该保留这两个函数,一方面将来可能扩展它们,另一方面它们也有助于理清编程的思路。
那么现在我们来修改一下调度函数。
代码 kernel/proc.c,增加消息机制之后的进程调度。
PUBLIC void scheduld()
{
PROCESS* p;
int greatest_ticks = 0;
while (!greatest_ticks) {
for (p = &FIRST_PROC; p <= &LAST_PROC; p++) {
if (p->p_flags == 0) {
if (p->ticks > greatest_ticks) {
greatest_ticks = p->ticks;
p_proc_ready = p;
}
}
}
if (!greatest_ticks) {
for (p = &FIRST_PROC; p < &LAST_PROC; p++) {
if (p->p_flags == 0) {
p->ticks = p->priority;
}
}
}
}
}
可以看到,当且仅当p_flags为零时,一个进程才可能获得运行的机会。
为了验证消息机制是否正常工作,我们还是从最简单的工作入手,删掉原先的系统调用get_ticks,用收发消息的方法重新实现之。
既然是收发消息,必然有两方参与。我们需要一个系统进程来接收用户进程的消息,并且返回ticks的值。我们来建立一个新的系统进程,就叫它“SYSTASK”。
添加一个任务的工作还是按照之前的步骤进行,这里就不再进行说明了。为了完成这个任务,我们新建一个文件systask.c,代码如下。
/**
* include/const.h
* #define TASK_SYS 1
*/
/**
* <Ring 1> The main loop of TASK SYS.
*/
PUBLIC void task_sys()
{
MESSAGE msg;
while (1) {
send_recv(RECEIVE, ANY, &msg);
int src = msg.source;
switch (msg.type) {
case GET_TICKS:
msg.RETVAL = ticks;
send_recv(SEND, src, &msg);
break;
default:
panic("unknown msg type");
break;
}
}
}
代码不复杂,不过要留心一下其中用到的函数send_recv(),它其实是把sendrec这个系统调用给封装了一下。
代码 kernel/proc.c,send_recv。
/**
* <Ring 1~3> IPC syscall.
*
* It is an encapsulation of `sendrec',
* invoking `sendrec' directly should be avoided
*
* @param function SEND, RECEIVE or BOTH
* @param src_dest The caller's proc_nr
* @param msg Pointer to the MESSAGE struct
*
* @return always 0.
*****************************************************************************/
PUBLIC int send_recv(int function, int src_dest, MESSAGE* msg)
{
int ret = 0;
if (function == RECEIVE)
memset(msg, 0, sizeof(MESSAGE));
switch (function) {
case BOTH:
ret = sendrec(SEND, src_dest, msg);
if (ret == 0)
ret = sendrec(RECEIVE, src_dest, msg);
break;
case SEND:
case RECEIVE:
ret = sendrec(function, src_dest, msg);
break;
default:
assert((function == BOTH) ||
(function == SEND) || (function == RECEIVE));
break;
}
return ret;
}
我们知道,一个完整的系统调用需要一个来回,那就是用户进程向内核请求一个东西,然后内核返回给它。我们用消息机制来实现这个过程同样需要一个来回,这意味着用户进程发送一个消息之后需要马上等待接收一个消息,以便收到内核(其实是某个系统任务)给它的返回值。这个发送然后马上接收的行为被send_recv()这个函数包装了一下,并在SEND和RECEIVE之外又提供了一个叫做BOTH的消息类型。今后我们想要收发消息时,就直接使用这个send_recv(),而不再直接使用系统调用sendrec。
好了,系统进程SYSTASK已经就绪,下面就来修改一下函数get_ticks。
代码 kernel/main.c,get_ticks。
/**
* include/const.h
* #define RETVAL u.m3.m3i1
*/
PUBLIC int get_ticks()
{
MESSAGE msg;
reset_msg(&msg);
msg.type = GET_TICKS;
send_recv(BOTH, TASK_SYS, &msg);
return msg.RETVAL;
}
...
void TestA()
{
while(1) {
printf("<Tricks:%x>", get_ticks());
milli_delay(2000);
}
}
我们以GET_TICKS为消息类型,不带其它任何信息地传递给SYSTASK,SYSTASK接收到这个消息之后,把当前的ticks值放入消息并发送给用户进程,用户进程会接收到它,完成整个任务。
好了,到此为止,我们完成了使用IPC来替换系统调用get_ticks,现在我们可以make并运行看一下效果了,不过不要忘记,由于我们新建了一个systask.c文件,所以需要更改Makefile。运行的效果如下图所示。
成功了!进程TestA调用get_ticks之后,成功打印出了它的值,这表明我们的消息机制工作良好。
虽然运行结果没有很大改变,但是如今我们的操作系统已经确立了微内核的路线,并且成功地实现了IPC,即便这算不上一个质的飞跃,至少我们已经走上了另一个台阶。
欢迎关注我的公众号