生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
类似之前我们学过的管道
问:我们怎么知道我们要让一个线程去休眠了那?
答:一定是临界资源不就绪,没错,临界资源也是有状态的!!
问:你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?
答:必须是的,也就是判断必须在加锁之后!!
判断当前线程是否要进入等待,需要判断是否满足临界资源的条件
而我们判断临界资源本质是在访问临界区,所有这个操作只能在加锁之后
#pragma once
#include<iostream>
#include<queue>
#include<pthread.h>
using namespace std;
template<class T>
class BlockQueue{
const static int defaultnum=20;
public:
BlockQueue(int maxcap=defaultnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&c_cond_,nullptr);
pthread_cond_init(&p_cond_,nullptr);
low_water_ = maxcap_/3;//消费到低于到这个数字时,就唤醒生产者生产
high_water_ = (maxcap_*2)/3;//生产到这个数字时,就唤醒消费者过来消费
}
void push(const T&in){
pthread_mutex_lock(&mutex_);
// 我们怎么知道我们要让一个线程去休眠了那?一定是临界资源不就绪,没错,临界资源也是有状态的!!
// 你怎么知道临界资源是就绪还是不就绪的?你判断出来的!判断是访问临界资源吗?必须是的,也就是判断必须在加锁之后!!
if(q.size()==maxcap_){
//满了,则生产者就去休眠
pthread_cond_wait(&p_cond_,&mutex_);
// pthread_cond_wait让线程等待的时候,会自动释放锁!
}
// 1. 队列没满 2.被唤醒
q.push(in);
// 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
if(q .size() > high_water_) pthread_cond_signal(&c_cond_);
//当产品数量大于high的时候唤醒消费者队列,让其过来消费
pthread_mutex_unlock(&mutex_);
}
T pop(){
pthread_mutex_lock(&mutex_);
//判断当前线程是否要进入等待,需要判断是否满足临界资源的条件
//而我们判断临界资源本质是在访问临界区,所有这个操作只能在加锁之后
if(q.size()==0){
pthread_cond_wait(&c_cond_,&mutex_);
}
T out=q.front();
// 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
q.pop();
if(q.size()<low_water_){
pthread_cond_signal(&p_cond_ );
}
pthread_mutex_unlock(&mutex_);
return out;
}
~BlockQueue(){
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
int low_water_;//消费到低于到这个数字时,就唤醒生产者生产
int high_water_;//生产到这个数字时,就唤醒消费者过来消费
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;//消费者的等待队列
pthread_cond_t p_cond_;//生产者的等待队列
queue<T>q;//共享资源
int maxcap_;//阻塞队列的最大容量
};
#include "BlockQueue.hpp"
#include <unistd.h>
#include <time.h>
void *Customer(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
while (true)
{
int n = bq->pop();
cout << "Customer get " << n << endl;
}
}
void *Productor(void *args)
{
BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
srand((unsigned int)time(nullptr));
int n = 0;
// 产生随机数放入队列中
while (true)
{
sleep(1);
n = rand() % 100 + 1;
bq->push(n);
cout << "Productor create n: " << n << endl;
}
}
int main()
{
BlockQueue<int> *bq = new BlockQueue<int>();
// 根据目标所需,创建存储不同类型的阻塞队列
pthread_t c, p;
pthread_create(&c, nullptr, Customer, bq); // 消费者线程
pthread_create(&p, nullptr, Productor, bq); // 生产者线程
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}