基于阻塞队列下的生产者消费者模型(多线程)

发布时间:2023年12月23日

一、生产者消费者模型

321原则(便于自己记忆)

1.1 为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

1.2 生产者消费者模型优点

1、解耦
2、支持并发(效率高)
3、支持忙闲不均

1.3 基于BlockingQueue的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

1.4 C++用queue模拟阻塞队列的生产消费模型

单生产者,单消费者

1.4.1 makefile

cp:main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f cp

1.4.2 BlockQueue.hpp


#include <iostream>
using namespace std;
#include <pthread.h>

#include <queue>
#include "LockGuard.hpp"

static const size_t DefaultCapacity = 5;

template <class T>
class BlockQueue
{
public:
    BlockQueue(const size_t capacity = DefaultCapacity)
        : _capacity(capacity)
    {
        // 初始化互斥锁和环境变量
        pthread_mutex_init(&_mtx, nullptr);
        pthread_cond_init(&_Full, nullptr);
        pthread_cond_init(&_Empty, nullptr);
    }
    ~BlockQueue()
    {
        // 销毁互斥锁和环境变量
        pthread_mutex_destroy(&_mtx);
        pthread_cond_destroy(&_Full);
        pthread_cond_destroy(&_Empty);
    }

    // void Push(const T &in)
    // {
    //     // 为啥要先加锁,再判断size与_capacity的大小关系,
    //     // 因为判断也是访问临界资源,所以要先加锁,保证安全
    //     pthread_mutex_lock(&_mtx);
    //     // 临界区
    //     // 为什么要while循环判断,因为条件变量可能存在伪唤醒
    //     // 的情况,即可能条件变量并不满足条件,但是由于一些错
    //     // 误导致条件变量被唤醒了,或者pthread_cond_wait
    //     // 函数调用失败了,这时并不是直接往下执行,而是再循环
    //     // 判断条件是否已经满足
    //     while (_bq.size() == _capacity)
    //     {
    //         // 队列满了就到_Full的条件变量下等待,
    //         pthread_cond_wait(&_Full, &_mtx);
    //     }

    //     // 来到这里说明阻塞队列一定满足Push的条件,即阻塞队列
    //     // 一定不是满的,所以可以直接Push
    //     _bq.push(in);

    //     // 既然插入了数据,那么队列就一定有数据,所以这时就可
    //     // 以通知消费者来消费了,消费者是在_Empty的条件变量下
    //     // 等待的,所以此时可以向条件变量_Empty发送就绪信号
    //     pthread_cond_signal(&_Empty);

    //     printf("0x%x",pthread_self());
    //     cout <<":生产者生成了一个任务: " << in._x << " " << in._op 
    //         << " " << in._y << " = ?" << endl;

    //     pthread_mutex_unlock(&_mtx);
    // }


    void Push(const T &in)
    {
        // 为啥要先加锁,再判断size与_capacity的大小关系,
        // 因为判断也是访问临界资源,所以要先加锁,保证安全
        
        //RAII风格的锁
        LockGuard lock(&_mtx);
        // 临界区
        // 为什么要while循环判断,因为条件变量可能存在伪唤醒
        // 的情况,即可能条件变量并不满足条件,但是由于一些错
        // 误导致条件变量被唤醒了,或者pthread_cond_wait
        // 函数调用失败了,这时并不是直接往下执行,而是再循环
        // 判断条件是否已经满足
        while (_bq.size() == _capacity)
        {
            // 队列满了就到_Full的条件变量下等待,
            pthread_cond_wait(&_Full, &_mtx);
        }

        // 来到这里说明阻塞队列一定满足Push的条件,即阻塞队列
        // 一定不是满的,所以可以直接Push
        _bq.push(in);

        // 既然插入了数据,那么队列就一定有数据,所以这时就可
        // 以通知消费者来消费了,消费者是在_Empty的条件变量下
        // 等待的,所以此时可以向条件变量_Empty发送就绪信号
        pthread_cond_signal(&_Empty);

        printf("0x%x",pthread_self());
        cout <<":生产者生成了一个任务: " << in._x << " " << in._op 
            << " " << in._y << " = ?" << endl;

    }


    void Print(const T& out)
    {
        int x = out._x;
        int y = out._y;
        int z = 0;
        switch (out._op)
        {
        case '+':
        {
            z = x + y;
            break;
        }
        case '-':
        {
            z = x - y;
            break;
        }
        case '*':
        {
            z = x * y;
            break;
        }
        case '/':
        {
            z = x / y;
            break;
        }
        case '%':
        {
            z = x % y;
            break;
        }
        default:
        {
            cout << "运算符错误!!!" << endl;
            exit(1);
        }
        }
        printf("0x%x",pthread_self());
        cout <<":消费者消费了一个任务 : " << out._x 
        << " " << out._op << " " << out._y << " = " << z << endl;
    }

    // T Pop()
    // {
    //     pthread_mutex_lock(&_mtx);
    //     // 循环判断,原因用上面的Push
    //     while (_bq.empty())
    //     {
    //         pthread_cond_wait(&_Empty, &_mtx);
    //     }

    //     // 来到这里说明队列一定不为空,此时就可以Pop一个数据了
    //     T out = _bq.front();
    //     _bq.pop();
    //     Print(out);

    //     // 因为我们刚刚pop了一个数据,所以阻塞队列一定不是满
    //     // 的,此时可以通知生产者来生产数据了
    //     pthread_cond_signal(&_Full);

    //     pthread_mutex_unlock(&_mtx);

    //     return out;
    // }

    T Pop()
    {
        //RAII风格的锁
        LockGuard lock(&_mtx);
        // 循环判断,原因用上面的Push
        while (_bq.empty())
        {
            pthread_cond_wait(&_Empty, &_mtx);
        }

        // 来到这里说明队列一定不为空,此时就可以Pop一个数据了
        T out = _bq.front();
        _bq.pop();
        Print(out);

        // 因为我们刚刚pop了一个数据,所以阻塞队列一定不是满
        // 的,此时可以通知生产者来生产数据了
        pthread_cond_signal(&_Full);
        return out;
    }

private:
    queue<T> _bq;          // 阻塞队列
    size_t _capacity;      // 阻塞队列的容量
    pthread_mutex_t _mtx;  // 互斥锁
    pthread_cond_t _Full;  // 表示阻塞队列已经满了的条件变量
    pthread_cond_t _Empty; // 表示阻塞队列已经空了的条件变量
};

1.4.3 LockGuard.hpp

#include <pthread.h>


class Mtx
{
public:
    Mtx(pthread_mutex_t* mtx)
        :_mtx(mtx)
    {}

    ~Mtx()
    {}

    void Lock()
    {
        pthread_mutex_lock(_mtx);
    }

    void Unlock()
    {
        pthread_mutex_unlock(_mtx);
    }

private:
    pthread_mutex_t* _mtx;
};

//RAII风格的锁
//创建这个对象就是上锁,析构这个对象就是解锁
class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mtx)
        :_mtx(mtx)//单参数的构造函数支持隐式类型的转换
    {
        _mtx.Lock();
    }
    ~LockGuard()
    {
        _mtx.Unlock();
    }

private:
    Mtx _mtx;
};

1.4.4 Task.hpp

class Task
{
public:
    Task(const int x,const int y,const char op)
        :_x(x)
        ,_y(y)
        ,_op(op)
    {}

public:
    int _x;
    int _y;
    char _op;
};

1.4.5 main.cc

#include <iostream>
using namespace std;
#include <ctime>
#include <unistd.h>

#include "BlockQueue.hpp"
#include "Task.hpp"

void *producter(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        int x = 0;
        int y = 0;
        char op;
        cout << "Please input the first num : ";
        cin >> x;
        cout << "Please input the second num : ";
        cin >> y;
        cout << "Please input the operator : ";
        cin >> op;

        Task t(x, y, op);
        bq->Push(t);
        usleep(1000);

    }
    return nullptr;
}

void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while (true)
    {
        sleep(1);
        Task out = bq->Pop();
    }
    return nullptr;
}

int main()
{
    srand((unsigned int)time(nullptr));
    pthread_t c, p;
    BlockQueue<int> bq;

    pthread_create(&p, nullptr, producter, &bq);
    pthread_create(&c, nullptr, consumer, &bq);

    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

二、Linux多线程内容一览图

在这里插入图片描述

以上就是基于阻塞队列下实现生产者消费者模型啦,你学会了吗?如果感觉到有所收获的话,那就点点小心心,点点关注呗,后期还会持续更新Linux系统编程的相关知识哦,我们下期见!!!

文章来源:https://blog.csdn.net/weixin_70056514/article/details/135174992
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。