【linux】POSIX信号量+基于环形队列的生产消费模型

发布时间:2024年01月09日

POSIX信号量+基于环形队列的生产消费模型

在这里插入图片描述

喜欢的点赞,收藏,关注一下把!在这里插入图片描述

1.POSIX信号量

上篇文章最后我们基于BlockQueue生产者消费者模型写了代码,测试什么的都通过了。最后我们说代码还有一些不足的地方,由这些不足从而引入了我们接下来要学的信号量!

我们在看一看不足的地方
在这里插入图片描述
1.一个线程,在操作临界资源的时候,必须临界资源是满足条件的!
2.可是,公共资源是否满足生产或者消费条件,我们无法直接得知(我们不能事先得知(在没有访问之前无法得知))
3.只能先加锁,再检测,再操作,再解锁
为什么要先加锁呢?因为你要检测的本质也是在访问临界资源!

总而言之就是,因为我们在操作临界资源的时候,有可能不就绪,但是我们无法提前得知,所以只能先加锁,在检测,根据检测结果,决定下一步怎么走!这是我们刚才写代码的逻辑。

那有没有一种方法在实际操作的时候就把对应的资源情况得知呢?
有的,信号量!

关于信号量我们以前学过,今天我们真正的用起来。

首先回答什么是信号量

再说信号量之前在说一个东西
只要我们对资源进行整体加锁,就默认了我们对这个资源整体使用。这点要注意。
但是实际情况可能存在,一份公共资源,但是允许同时访问不同的区域!
就好比我们之前在信号量说过的电影院的例子,电影院在我们现实生活中是由我们共享的,它的工作模式是可以同时让我们去坐到自己的座位上,真正的竞争关系只在于访问同一个座位时存在冲突。

下面说说什么是信号量
a.信号量本质是一把计数器

之前说过电影院的例子是这样说的,每一个想进入电影院看电影的时候,是不是只有你屁股坐到电影院某个座位上这个座位才属于你,是这样的吗?并不是,只要我先买票,票买到了虽然人还没有坐到电影院的某个座位上,即使很长时间没有过去,但是我心里很清楚,只要我票买了这个座位就会给我预留着。所以我们对应申请信号量的动作,起始就相当于购买电影票的概念。也就是说未来想访问这一份公共资源或者公共资源的某一区域,所有线程都必须遵守先申请信号量!只有申请成功的线程,就一定能保证在这份公共资源里一定有一个位置给你预留着让你去访问。那一定有一个位置给你预留让你去访问怎么保证呢?这是由程序员编码保证不同的线程可以并发访问公共资源的不同区域!
什么意思呢?就是说我对一份公共资源进行划分,比如是数组,进来之前先申请信号量,申请成功的线程,程序员通过编码保证让不同的线程能够访问公共资源的不同区域。
因为信号量是一把计数器,它是衡量临界资源中资源数量多少的计数器
所以比如说电影院有100个座位,每一个人进入电影票之前你得先买票,而票最多100张,不管最后100个人最后怎么坐,至少我保证进入电影院的人不会超过100个,所以对100个人经过合理的规划让他们坐到不同的座位上,不就不会出现冲突吗,并且还会并发访问这个资源。
所以在真正访问临界资源前,先申请信号量这是第一点。

b.只要拥有信号量,就在未来一定能够拥有临界资源的一部分,所以申请信号量的本质:对临界资源中特定小块资源的 预定机制

这就对应去电影院买票看电影,你买到票了但你根本就没有去看电影,你很清楚只要我把电影院票买到了,我在未来想看的时候一定有这个座位给我留着。所以买票的本质是对座位的预定机制。

所以未来访问公共资源需要先申请信号量。
先申请信号量,信号量本质是一把计数器,这意味着什么?
是不是通过这两点,有可能我们在访问真正的临界之前,我们其实就可以提前知道临界资源的使用情况!!!

基于这一点就有可能解决以前那种先加锁在检测临界资源,而现在不用给我检测临界资源了,你给我检测信号量就行了,在检测信号量期间这一份公共资源不会被访问。

只要申请成功,就一定有你的资源!
只要申请失败,就说明条件不就绪,你只能等!

所以我们就可以将信号量的申请成功或失败,直接或间接判断资源就绪的情况,所以我们就不再判断了。

总结一些,其实就是我们发现之前的代码根本原因就是你根本就不知道这个临界资源中资源的使用情况所以你只能自己先加锁再检测,能生产就生产不能就把自己挂起,但今天每一个线程都必须先申请信号量,信号量是一把计数器是衡量临界资源数量多少的计数器,它也是对资源的预定机制,所以你先不要着急访问临界资源也不要着急判断,你直接去申请信号量,只要你申请成功这个资源就一定是就绪的你直接生产就行了,申请失败那对不起这个资源并不满足你只能等待,而不管成功还是失败你都没有去访问临界资源,所以通过这个的方式把公共资源的数目先暴露出来,来确定资源有没有就绪。这就是信号量!

什么是信号量:信号量的本质是一把计数器,它是用来对临界资源某一块资源的预定机制。

为什么要有信号量:在访问临界资源之前通过信号量的方式得知资源的使用情况。

按照我们这种说法
线程要访问临界资源中的某一区域 ---- 先申请信号量 ---- 所有人必须要先看到信号量 ---- 信号量本身必须是:公共资源

信号量是一把计数器,那它匹配的一定有 递增 or 递减
伪代码,假设sem_tsem=10;
sem- -(信号量- -的操作) ----> 申请资源 ----> 必须保证操作的原子性 ---->
P操作

sem++(信号量++的操作) ----> 归还资源 ----> 必须保证操作的原子性 ---->
V操作

信号量核心操作:PV原语

接下来看看信号量的基本使用接口(怎么办?)

初始化信号量

sem_t //信号量类型,这是由pthead库为我们维护的信号量

在这里插入图片描述
sem:定义的信号量
pshared:0表示线程间共享,非零表示进程间共享
value:该信号量计数器的初始值,这个值完全取决于临界资源数目的多少

销毁信号量

在这里插入图片描述

等待信号量

对信号量计数器减一,就是信号量对应匹配的P操作
在这里插入图片描述

发布信号量

表示资源使用完毕,可以归还资源了。将信号量值加1。对应信号量匹配的V操作
在这里插入图片描述

2.基于环形队列的生产消费模型

目前关于信号量我们学了50%,下面我们找一个场景写代码来帮我们更深层次的理解它。

在数据结构初阶我们学过环形队列,今天这里我们就不在关注它是如何实现的,而关注点在于基于环形队列的生产消费模型!
在这里插入图片描述
我们也知道环形队列在物理上是一个数组,可以用数组来实现。
在这里插入图片描述
关于环形队列就回顾到这里。

那环形队列在生产者消费者这里我们怎么用它。

单生产单消费为例
既然是一个环形队列,未来一定有个生产者在放数据,有个消费者在拿数据。最开始它们俩一定指向同一个位置。
在这里插入图片描述
对于生产者和消费者而言,它们俩在什么情况也会访问同一个位置呢?

  1. 空的时候
  2. 满的时候
  3. 其他情况,生产者和消费者,根本访问的就是不同的区域!

下面我们举个例子
今天我们想玩一个游戏,一张桌子,桌子有N个空的盘子
在这里插入图片描述
然后你和我手拉手到了这张桌子的旁边,我们走到这个桌子时,桌子上N个空盘子,
在这里插入图片描述

我们倆玩的是你追我跑的游戏,我不断的向盘子里放一个苹果,放一个之后立马就往后跑。你要直接把苹果从盘子上拿走然后你觉得不好吃,然后你继续追我。盘子里不能放两个或两个以上的苹果,你也不能从空盘子里假装拿到了苹果,我们在完追逐游戏的时候,怎么样保证游戏正常的运行呢?我们有如下几个原则!

  1. 你不能超过我
  2. 我不能把你套一个圈以上
  3. 我们俩什么时候,会站在一起?
    a . 盘子全为空
    b. 盘子上全都是苹果(满)
    c.其他情况,我们俩指向的是不同的位置!

盘子全为空我们俩站在一起让谁先运行呢?
我(生产者)
盘子全都是苹果我们俩又站在一起了这次让谁先运行呢?
你(消费者)

根据上面的例子我们有了一个结论:
在环形队列中,大部分情况,单生产者和单消费是可以并发执行的!
只有在满或者空的时候,才会有互斥与同步的问题!!

对应到我们刚刚的说的环形队列中,桌子以及一个个盘子就是我们的环形队列,我叫做生产者线程,你叫做消费者线程。生产者和消费者只有在空满的时候访问同一个位置,其他都访问的是不同区域,所以当然可以并发了。

所以为了完成环形队列生产消费问题,我们要做的核心工作是什么?
你—>消费者
我—>生产者

  1. 你不能超过我
  2. 我不能把你套一个圈以上
  3. 我们俩什么时候,会站在一起?

那如何保证上面的性质呢?
就用我们刚学的信号量!
信号量是用来衡量临界资源中资源数量的!

以前我们从来没有谈论过资源的问题。即使在阻塞队列中我们所说的资源就是这个队列,我们是把队列当做整体来使用的。因为我们加了锁所以我们认为队列是资源。但你在想想资源这个东西是不是各花入各眼。

1.对于生产者而言,看中的是什么?
队列中剩余的空间
2.对于消费者而言,看中的是什么?
放入队列中的数据

所以为了更好衡量生产者和消费者,我们给空间资源定义一个信号量,给数据资源定义一个信号量
有了这个信号量空间资源有多少剩余多少,数据资源有多少剩余多少,计数器给你确定,要生成和消费都先申请信号量,空间信号量只要申请成功环形队列一定有个位置给你生产,同样资源信号量只要申请成功环形队列一定有个数据给你消费。所以去访问就行了,至于访问那一个由我们程序员写代码来确定。

接下来写一些伪代码帮助理解
在这里插入图片描述
生产者你想进入环形队列,对不起先不要着急,你先进行P操作,申请信号量。
申请成功,你就可以继续向下运行。
申请失败,当前执行流,阻塞在申请处
在这里插入图片描述
申请成功并且把数据放到队列中之后,就要进行V操作,这里有一个问题生产者把数据生产到队列中,它要走了,那么它曾经申请的信号量在它生产结束之后归还了吗?答案是并没有!因为把数据生产到队列中你走了但你生产者生产的数据依旧在你曾经申请的资源空格里面放着,你人走了数据还在,数据还在格子就依旧被占用。 可是我还是做一下V操作啊,虽然我走了但是毕竟环形队列中多了一个数据,多了一个数据不就是多了一个消费者看中的资源吗,所以此时V不应该V的prodoctor_sem的信号量而是consumer_sem的信号量!
在这里插入图片描述
此时完成了一个生产过程。

对于消费者而言你首先也要进行P操作,申请信号量资源,你要申请的是消费者看中的数据资源。申请成功往下走从事消费活动。申请失败阻塞同样阻塞在申请处。
在这里插入图片描述
从事消费活动之后也要进行V操作,当消费者把队列中数据消费了走了数据已经不在了,那消费者申请的信号量数据有还吗?并没有!但是消费者把队列中数据拿走了,队列中的位置不就空出来一个吗。这意味着消费者消费一个数据那可以供我们生产的位置资源不就多了一个吗。所以消费者V的是prodoctor_sem
在这里插入图片描述

最开始环形队列是空的,生产者线程和消费者线程并发谁先运行呢?
答案是谁先运行不确定,但一定是谁申请成功谁往下运行!
当队列为空的是,一定是生产者申请成功继续往下执行代码。因为消费者最开始资源计数器为0只能是申请失败阻塞等待!当生产者执行完V操作之后,消费者consumer_sem一定由0->1。那消费者P操作一定成功继续往后运行。
所以当队列为空的时候,一定能保证只有一个执行流进来 ,一定只能保证生产者先运行!

假设极端场景,消费者就不消费,那么生产者能不能把消费者套一个圈,绕过消费者在继续生产呢?
不可以!因为信号量是一个计数器,生产者连续消费prodoctor_sem一直在减减,虽然consumer_sem一直在加加但是先不消费,一直生产生产最后prodoctor_sem减到0 ,它还想在生产还能申请到信号量吗?并不能!所以即便是消费者不消费,生产者随便生产,对不起你生产满了你在申请空间资源的时候你申请不到了,你就会阻塞挂起!所以生产者不能把消费者套一个圈

假设现在已经生产满了,
在这里插入图片描述
此时生产线程和消费线程谁先运行我们也不确定,但是生产一定先阻塞在申请处,消费申请资源成功继续往下运行。
所以满的时候生产和消费同时到来,生产一定申请不成功,进不到临界资源去访问。消费一定先运行!
生产者一直不生产,那有没有可能消费者把数据消费完了最终超过生产者呢?
并不会,消费完数据了已经没有数据可以消费了。消费者再去申请时只能阻塞挂起了!只能等生产者生产。所以消费者不能超过生产者

只有空满的时候我们俩会在同一个位置,你不能超过我,我不能把你套一个圈以上,除此之外生产者和消费者都是并发执行访问不同位置!

这些都是由信号量计数器来保证的!

未来,生产和消费的位置我们要想清楚。
1.其实就是队列中的下标
2.一定是两个下标
3.为空或者为满,下标相同

通过下标让生产和消费访问不同的位置,另外通过下标给我们的线程指派要访问的资源。

接下来我们写代码实现环形队列生产消费模型

上层调用逻辑大的框架

#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>

void* productor(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        
    }
}

void* consumer(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        
    }
}

int main()
{
    //随机数种子,这里为了更随机
    srand((unsigned int)time(nullptr)^getpid());
    ringqueue<int>* rq=new ringqueue<int>();

    pthread_t p,c;
    pthread_create(&p,nullptr,productor,rq);
    pthread_create(&c,nullptr,consumer,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);

    return 0;
}

环形队列大的逻辑框架

#pragma once

#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>

using namespace std;

const int maxcapacity=5;

template<class T>
class ringqueue
{
public:
    ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
    {
        //空间资源初始化为环形队列大小
        sem_init(&_spacesem,0,_capacity);
        //数据资源初始化为0
        sem_init(&_datasem,0,0);
        
        _pstep=_cstep=0;
    }
	
	//生产
    void push(const T& in)
    {

    }
	
	//消费
    void pop(T* out)
    {

    }

    ~ringqueue()
    {
        //销毁
        sem_destroy(&_spacesem);
        sem_destroy(&_datasem);
    }


private:
    vector<T> _queue;//模拟环形队列
    int _capacity;//队列的大小,不能无线扩容
    sem_t _spacesem;//生产者生产看中的空间资源(信号量)
    sem_t _datasem;//消费者消费看中的数据资源(信号量)
    int _pstep;//生产者下标
    int _cstep;//消费者下标
};

接下来我们把代码补充完整

#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>

void* productor(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        //生产活动
        //version1
        int data=rand()&10+1;
        rq->push(data);
        cout<<"生产完成,生产的数据是: "<<data<<endl;
    }
}

void* consumer(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        //消费活动
        //version1
        int data;
        rq->pop(&data);
        cout<<"消费完成,消费的数据是: "<<data<<endl;
    }
}

int main()
{
    //随机数种子,这里为了更随机
    srand((unsigned int)time(nullptr)^getpid());
    ringqueue<int>* rq=new ringqueue<int>();

    pthread_t p,c;
    pthread_create(&p,nullptr,productor,rq);
    pthread_create(&c,nullptr,consumer,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);

     return 0;
}

现在有个问题未来生产者生产满了还能不能让生产者生产吗?
不能,消费者把数据消费完了也不能在消费了。所以理论上它俩也有同步的过程,满了不能再生产空了不能在消费。这样是不是阻塞队列很像啊,但是它和阻塞队列不同的点是,如果生产和消费不是同一个位置,它们是可以同时生产和消费的。

#pragma once

#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
#include<cassert>

using namespace std;

const int maxcapacity=5;

template<class T>
class ringqueue
{
public:
    ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
    {
        //空间资源初始化为环形队列大小
        int n=sem_init(&_spacesem,0,_capacity);
        assert(n == 0);
        //数据资源初始化为0
        n=sem_init(&_datasem,0,0);
        assert(n == 0);

        _pstep=_cstep=0;
    }

    void push(const T& in)
    {
        //1.申请空间资源P操作, 成功意味着我一定能进行正常的生产,失败阻塞挂起
        P(_spacesem);
        //2.往对应生产下标处生产
        _queue[_pstep++]=in;
        _pstep%=_capacity;//为了是一个环形的
        //3.环形队列多了一个消费资源
        V(_datasem);  
    }

    void pop(T* out)
    {
        P(_datasem);//申请成功,意味着一定能进行正常的消费
        *out=_queue[_cstep++];//从对应的消费下标处消费
        _cstep%=_capacity;
        V(_spacesem);//环形队列多了一个空间资源

    }

    ~ringqueue()
    {
        //销毁
        sem_destroy(&_spacesem);
        sem_destroy(&_datasem);
    }
private:
    void P(sem_t& sem)//对信号量做--
    {
        int n=sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }

    void V(sem_t& sem)//对信号量做++
    {
        int n=sem_post(&sem);
        assert(n == 0);
        (void)n;
    }


private:
    vector<T> _queue;//模拟环形队列
    int _capacity;//队列的大小,不能无线扩容
    sem_t _spacesem;//生产者生产看中的空间资源(信号量)
    sem_t _datasem;//消费者消费看中的数据资源(信号量)
    int _pstep;//生产者下标
    int _cstep;//消费者下标
};

未来生产和消费谁先运行不知道,但是在初始时我们把空间资源信号量置为最大值,数据资源信号量置为0。最开始生产者和消费者同时到来 ,对不起只有生产者P操作成功,消费者P操作失败所以消费者只能等。所以为空只能是生产者先生产。而满的时候,只能是消费者消费生产者只能等。

void* productor(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        //生产活动
        //version1
        int data=rand()&10+1;
        rq->push(data);
        cout<<"生产完成,生产的数据是: "<<data<<endl;
        sleep(1);//生产慢一点
    }
}

在这里插入图片描述

void* consumer(void* args)
{
    ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);

    while(true)
    {
        //消费活动
        //version1
        int data;
        rq->pop(&data);
        cout<<"消费完成,消费的数据是: "<<data<<endl;
        sleep(1);//消费慢一点
    }

在这里插入图片描述
环形队列生产和消费的过程一定是可以并行的,因为只有当空和满的时候才会只有一个执行流。不过我们这里看不出来。

下面我们再把在阻塞队列写的任务拿过去,让环形队列跑看看效果

#pragma once
#include <iostream>
#include <functional>
#include <string>
using namespace std;

class Task
{
    typedef function<int(int, int, char)> func_t;

public:
    Task() {}

    Task(int x, int y, char op, func_t func) : _x(x), _y(y), _op(op), _callback(func)
    {
    }

    // 把任务返回去可以看到
    string operator()()
    {
        int result = _callback(_x, _y, _op);
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);
        return buffer;
    }

    // 把生产的任务也打印出来
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);
        return buffer;
    }

private:
    int _x;
    int _y;
    char _op;         // 对应+-*/%操作
    func_t _callback; // 回调函数
};

string oper = "+-*/%";

// 回调函数
int mymath(int x, int y, char op)
{
    int result = 0;
    switch (op)
    {
    case '+':
        result = x + y;
        break;
    case '-':
        result = x - y;
        break;
    case '*':
        result = x * y;
        break;
    case '/':
    {
        if (y == 0)
        {
            cout << "div zero error" << endl;
            result = -1;
        }
        else
        {
            result = x / y;
        }
    }
    break;
    case '%':
    {
        if (y == 0)
        {
            cout << "mod zero error" << endl;
            result = -1;
        }
        else
        {
            result = x % y;
        }
    }
    break;
    default:
        break;
    }
    return result;
}
#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"Task.hpp"

void* productor(void* args)
{
    //ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
    ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);


    while(true)
    {
        //生产活动
        //version1
        // int data=rand()&10+1;
        // rq->push(data);
        // cout<<"生产完成,生产的数据是: "<<data<<endl;
        //sleep(1);//生产慢一点

        //version2
        //构建or获取任务
        int x=rand()%10+1;
        int y=rand()%5;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        rq->push(t);
        //输出提示
        cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
}

void* consumer(void* args)
{
    //ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
    ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);

    while(true)
    {
        //消费活动
        //version1
        // int data;
        // rq->pop(&data);
        // cout<<"消费完成,消费的数据是: "<<data<<endl;
        // sleep(1);//消费慢一点

        //vecrsion2
        Task t;
        //消费任务
        rq->pop(&t);
        cout<<"消费者消费了一个任务: "<<t()<<endl;
    }
}

int main()
{
    //随机数种子,这里为了更随机
    srand((unsigned int)time(nullptr)^getpid());
    ringqueue<Task>* rq=new ringqueue<Task>();

    pthread_t p,c;
    pthread_create(&p,nullptr,productor,rq);
    pthread_create(&c,nullptr,consumer,rq);

    pthread_join(p,nullptr);
    pthread_join(c,nullptr);

    return 0;
}

在这里插入图片描述
目前我们写的代码都是单生成单消费,那多生产多消费该怎么写呢?
根据321原则,生产与生产互斥关系,消费与消费互斥关系,生产与消费互斥同步,目前只维护了生产和消费的互斥与同步关系。那生产与生产,消费与消费之前关系该如何维护呢?不然就写不出多生产多消费!

在以前我们的阻塞队列中无论是多个生产者还是多少消费者无论在任何时刻只有一个线程在阻塞队列中访问临界资源。今天我们是环形队列的生产消费模型,生产生产消费消费也是互斥关系,所以无论无何在任何时刻是不是只允许一个生产者或者一个消费者先进入到临界区里面,就是让生产者之间先竞争选出一个胜利者,消费者之间竞争选出一个胜利者,然后生产者和消费者胜利的这个人在结合队列是不是空的慢的,能不能并发,然后确定如何生产消费。
所以说在环形队列中多生产多消费,只要保证,最终进入临界区的一个是生产,一个是消费就行了。

所以我们需要加锁,并且是两把锁,一把生产者竞争用的锁,一把消费者竞争用的锁。你们内部之间先竞争处一个胜出者然后在进行临界资源的访问。

#pragma once

#include<iostream>
#include<pthread.h>
#include<vector>
#include<semaphore.h>
#include<cassert>

using namespace std;

const int maxcapacity=5;

template<class T>
class ringqueue
{
public:
    ringqueue(int capacity=maxcapacity):_queue(capacity),_capacity(capacity)
    {
        //空间资源初始化为环形队列大小
        int n=sem_init(&_spacesem,0,_capacity);
        assert(n == 0);
        //数据资源初始化为0
        n=sem_init(&_datasem,0,0);
        assert(n == 0);
        pthread_mutex_init(&_plock,nullptr);
        pthread_mutex_init(&_clock,nullptr);

        _pstep=_cstep=0;
    }

    void push(const T& in)
    {
        pthread_mutex_lock(&_plock);
        //1.申请空间资源P操作, 成功意味着我一定能进行正常的生产,失败阻塞挂起
        P(_spacesem);
        //2.往对应生产下标处生产
        _queue[_pstep++]=in;
        _pstep%=_capacity;//为了是一个环形的
        //3.环形队列多了一个消费资源
        V(_datasem);  
        pthread_mutex_unlock(&_plock);
    }

    void pop(T* out)
    {
        pthread_mutex_lock(&_clock);
        P(_datasem);//申请成功,意味着一定能进行正常的消费
        *out=_queue[_cstep++];//从对应的消费下标处消费
        _cstep%=_capacity;
        V(_spacesem);//环形队列多了一个空间资源
        pthread_mutex_unlock(&_clock);

    }

    ~ringqueue()
    {
        //销毁
        sem_destroy(&_spacesem);
        sem_destroy(&_datasem);
        pthread_mutex_destroy(&_plock);
        pthread_mutex_destroy(&_clock);
    }
private:
    void P(sem_t& sem)//对信号量做--
    {
        int n=sem_wait(&sem);
        assert(n == 0);
        (void)n;
    }

    void V(sem_t& sem)//对信号量做++
    {
        int n=sem_post(&sem);
        assert(n == 0);
        (void)n;
    }
    
private:
    vector<T> _queue;//模拟环形队列
    int _capacity;//队列的大小,不能无线扩容
    sem_t _spacesem;//生产者生产看中的空间资源(信号量)
    sem_t _datasem;//消费者消费看中的数据资源(信号量)
    int _pstep;//生产者下标
    int _cstep;//消费者下标
    pthread_mutex_t _plock;//生产者的锁
    pthread_mutex_t _clock;//消费者的锁
};

我们先测试一下再说

#include"ringqueue.hpp"
#include<ctime>
#include<sys/types.h>
#include<unistd.h>
#include"Task.hpp"

string Selfname()
{
    char name[64];
    snprintf(name,sizeof name,"thread[0x%x]",pthread_self());
    return name;
}

void* productor(void* args)
{
    //ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
    ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);


    while(true)
    {
        //生产活动
        //version1
        // int data=rand()&10+1;
        // rq->push(data);
        // cout<<"生产完成,生产的数据是: "<<data<<endl;
        //sleep(1);//生产慢一点

        //version2
        //构建or获取任务

        int x=rand()%10+1;
        int y=rand()%5;
        char op=oper[rand()%oper.size()];
        Task t(x,y,op,mymath);
        //生产任务
        rq->push(t);
        //输出提示
        //cout<<"生产者派发了一个任务: "<<t.toTaskString()<<endl;
        cout<<Selfname()<<" ,生产者派发了一个任务: "<<t.toTaskString()<<endl;

        sleep(1);

    }
}

void* consumer(void* args)
{
    //ringqueue<int>* rq=static_cast<ringqueue<int>*>(args);
    ringqueue<Task>* rq=static_cast<ringqueue<Task>*>(args);

    while(true)
    {
        //消费活动
        //version1
        // int data;
        // rq->pop(&data);
        // cout<<"消费完成,消费的数据是: "<<data<<endl;
        // sleep(1);//消费慢一点

        //vecrsion2
        Task t;
        //消费任务
        rq->pop(&t);
        //cout<<"消费者消费了一个任务: "<<t()<<endl;
        cout<<Selfname()<<" ,消费者消费了一个任务: "<<t()<<endl;
    }
}

int main()
{
    //随机数种子,这里为了更随机
    srand((unsigned int)time(nullptr)^getpid());
    //ringqueue<int>* rq=new ringqueue<int>();
    ringqueue<Task>* rq=new ringqueue<Task>();

    pthread_t p[4],c[2];
    for(int i=0;i<4;++i)
    {
        pthread_create(p+i,nullptr,productor,rq);
    }
    
    for(int i=0;i<2;++i)
    {
        pthread_create(c+i,nullptr,consumer,rq);
    }

    for(int i=0;i<4;++i)
    {
        pthread_join(p[i],nullptr);
    }
    
    for(int i=0;i<2;++i)
    {
        pthread_join(c[i],nullptr);        
    }

    return 0;
}

在这里插入图片描述
效果虽然不明显,但是确实是多生产多消费。

但是我们的加锁逻辑有一些问题,这个加锁有没有优化的可能
在这里插入图片描述
你认为:先加锁,后申请信号量,还是先申请信号量,在加锁?
我们维护的互斥关系是为了保护信号量吗?信号量大手一挥瞥了瞥眼同情的看这这把锁,你保护我干什么,我的申请本来就是原子的,我申请又不会出问题你保护我干啥呢?
我们是先申请锁然后在申请信号量等到走完之后才释放了锁,而另一个生产者也只能是先申请锁在申请信号量,那么在你持有锁之间我可不可以先把信号量指派给不同的线程呢?

这种先申请锁在申请信号量一定比较低,因为先申请锁意味着后序的线程,只能在锁上等,当持有锁线程把锁释放了,后序线程才能去申请信号量。

因此正确写法如下

    void push(const T& in)
    {
        //1.申请空间资源P操作, 成功意味着我一定能进行正常的生产,失败阻塞挂起
        P(_spacesem);
        pthread_mutex_lock(&_plock);
        //2.往对应生产下标处生产
        _queue[_pstep++]=in;
        _pstep%=_capacity;//为了是一个环形的
        //3.环形队列多了一个消费资源
        pthread_mutex_unlock(&_plock);
        V(_datasem);  
    }

    void pop(T* out)
    {
        P(_datasem);//申请成功,意味着一定能进行正常的消费
        pthread_mutex_lock(&_clock);
        *out=_queue[_cstep++];//从对应的消费下标处消费
        _cstep%=_capacity;
        pthread_mutex_unlock(&_clock);
        V(_spacesem);//环形队列多了一个空间资源

    }

这样反过来,就相当于让每个线程先去领取任务,领到任务成功之后进去就一个个进。但是在你一个线程在里面做访问的时候,我可以在外面不断的派发任务,这样加锁就是一个线程进去的时候其他线程不是干等,还有信号量吗我拿一个,还有信号量吗我拿一个。因为是串行的,这样进去是一个一个进去的并且访问的是不同的下标。

问:
多线程访问环形队列时,进入队列最多几个线程?最少几个线程?
最多有两个线程,生产者和消费者同时并发的访问。
最少只有一个线程,队列为空为满的时候。
但无论什么时候,最多只有一个生产者一个消费者进入这个队列中。

问:多生产多消费的效率高在哪里?(阻塞队列说过)
获取or构建任务这个是要花时间,并不像我们写的那样随便来个随机数就行了,将来可能会从外设,网络,文件,数据库里读取数据的。
消费的时候也是要花时间的,并不是把任务从环形队列中拿到就完了,后面可能大量的时间在处理这个任务。
所以多生产多效果高效在生产之前消费之后可以并发执行获取任务和处理任务!

至此我们的环形队列结束了。下一篇我们根据在线程这块所学的知识写一个线程池!

文章来源:https://blog.csdn.net/fight_p/article/details/135392450
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。