目录
基于BlockingQueue(阻塞队列)的生产者消费者模型
生产者消费者模型是多线程同步与互斥的一个经典场景,我们生活中也有许多的消费者与生产者模型,下面我们来举一个生活中的生产者消费者模型的例子——超市
那么超市的作用是什么呢?
下面我们再来分析一下生产者、消费者之间有什么关系?
为什么消费者与消费者之间是互斥关系?
假如说现在是世界末日,超市里面现在就只有一包方便面,然后你和另外一个人正好要去买这瓶水,这是不是就是互斥关系了呢?所以消费者与消费者之间是存在互斥关系的。
为什么生产者与消费者之间存在同步关系?
生产者和消费者在操作上需要相互协调和配合,以确保生产的顺利进行。例如,在生产线上,生产者需要等待上一道工序完成后才能进行下一道工序,而消费者则需要等待生产者完成生产后才能进行消费。在这种情况下,生产者和消费者需要在时间上保持同步,以确保整个生产的协调和稳定。
了解了生产者消费者模型之后,下面我来教大家一个方法让大家快速记住它。
321原则
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列区别:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
template <class T>
class BlockQueue
{
static const int defaultnum = 4;
public:
BlockQueue(int maxcap = defaultnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&c_cond_,nullptr);
pthread_cond_init(&p_cond_,nullptr);
}
// 谁来唤醒呢?
T pop()
{
pthread_mutex_lock(&mutex_);
if(q_.size() == 0)
{
pthread_cond_wait(&c_cond_,&mutex_);
}
T out = q_.front();// 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
q_.pop();
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T& num)
{
pthread_mutex_lock(&mutex_);
if(q_.size() == maxcap_)
{
pthread_cond_wait(&p_cond_,&mutex_);
}
//1.队列没有满 2.被唤醒
q_.push(num);// 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
};
- 我们实现的是单生产者、单消费者的生产者-消费者模型,所以我们主要关注生产者和消费者之间的同步与互斥关系。模型中不需要维护生产者与生产者、消费者与消费者之间的关系,因为只有一个生产者和一个消费者。
- 为了方便使用,我们采用了模板化的数据存储在阻塞队列中。这个阻塞队列是临界资源,因为它会被生产者和消费者同时访问。为了确保数据的一致性和正确性,我们使用互斥锁来保护这个临界资源。
- 当生产者想要向队列中Push数据时,它首先需要检查队列是否还有空间。如果没有空间,生产者会挂起等待,直到队列中有可用空间。同样,消费者在尝试从队列中Pop数据时,也会先检查队列是否为空。如果为空,消费者会挂起等待,直到队列中有数据。
- 因此在这里我们需要用到两个条件变量,一个条件变量用来描述队列为空,另一个条件变量用来描述队列已满。当阻塞队列满了的时候,要进行生产的生产者线程就应该在full条件变量下进行等待;当阻塞队列为空的时候,要进行消费的消费者线程就应该在empty条件变量下进行等待。
- 在这个模型中,无论是生产者还是消费者,它们在进入临界区之前都会先申请锁。如果条件不满足(如队列满或队列空),对应的线程会挂起。但此时该线程是拿着锁的,为了避免死锁问题,当线程调用
pthread_cond_wait
函数时,就需要传入当前线程手中的互斥锁,当该线程被挂起时它会释放手中的互斥锁。当线程被唤醒时,它会重新获取这个互斥锁。这样,即使在多线程环境中,也能保证对临界资源的正确访问和数据的完整性。- 谁来唤醒?谁最清楚队列里面有没有数据——生产者!所以我们用生产者来唤醒消费者消费数据。谁最清楚队列里面有没有空间——消费者!所以我们用消费者来唤醒生产者。
主函数:
#include "BlockQueue.hpp"
#include <unistd.h>
void* Consumer(void* args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
//消费者不断进行生产
while (true)
{
int data = bq->pop();
cout << "消费者消费了一个数据: " << data <<endl;
sleep(1);
}
}
void* Productor(void* args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
//生产者不断进行生产
while (true)
{
int data = rand()%100;
bq->push(data);
cout << "生产者生产了一个数据: " << data << endl;
sleep(1);
}
}
int main()
{
//种一颗随机数种子
srand((long long)time(nullptr));
BlockQueue<int>* bq = new BlockQueue<int>;
//创建生产者线程和消费者线程
pthread_t c,p;
pthread_create(&c,nullptr,Consumer,bq);
pthread_create(&p,nullptr,Productor,bq);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
delete bq;
return 0;
}
- 阻塞队列要让生产者线程向队列中Push数据,让消费者线程从队列中Pop数据,因此这个阻塞队列必须要让这两个线程同时看到,所以我们在创建生产者线程和消费者线程时,需要将该阻塞队列作为线程执行例程的参数进行传入。
代码运行后我们可以看到生产者和消费者的执行步调是一致的。
运行结果:
我们让生产者每隔一秒生产一个数据,消费者每隔两秒消费一个数据。过了一段时间后,阻塞队列被塞满了数据,此时生产者要进行等待同时通知消费者来消费,此时消费者消费一个数据,然后生产者被唤醒进而继续生产数据。此时就会变成生产者每生成一个数据,消费者消费就会消费一个数据,所以后面我们就看到了生产者和消费者步调又一致了。
运行结果:
我们让生产者每隔两秒生产一个数据,消费者每隔一秒消费一个数据。由于生产者生产的慢,此时阻塞队列里面没数据,所以消费者就需要进行等待,直达到有数据了才可以进行消费,因此消费者它的步调会随着生产者走。?
我们设置了两个参数:low_water_ 和?int high_water_,当队列中的数据小于low_water_时,消费者唤醒生产者生产数据;而当队列中的数据大于high_water_时,生产者唤醒消费者进行消费数据。
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
template <class T>
class BlockQueue
{
static const int defaultnum = 20;
public:
BlockQueue(int maxcap = defaultnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&c_cond_,nullptr);
pthread_cond_init(&p_cond_,nullptr);
low_water_ = maxcap_/3;
high_water_ = (maxcap_*2)/3;
}
// 谁来唤醒呢?
T pop()
{
pthread_mutex_lock(&mutex_);
if(q_.size() == 0)
{
pthread_cond_wait(&c_cond_,&mutex_);
}
T out = q_.front();
q_.pop();
if(q_.size() < low_water_)
{
pthread_cond_signal(&p_cond_);
}
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T& num)
{
pthread_mutex_lock(&mutex_);
if(q_.size() == maxcap_)
{
pthread_cond_wait(&p_cond_,&mutex_);
}
//1.队列没有满 2.被唤醒
q_.push(num);
if(q_.size() > high_water_)
{
pthread_cond_signal(&c_cond_);
}
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源
//int mincap_;
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
int low_water_;
int high_water_;
};
运行结果:
可以看到我们这一次只有在队列的数据小于low_water_才会通知生产者进行生产数据,只有在队列的数据大于high_water_的一半时才会通知消费者进行消费数据。?
我们的生产者消费者模型可不是只能像上面那样生产者生成一个数据,消费者消费一个数据仅仅打印一个数字而已。
我们还可以自己定义一个Task的类,然后实现一个基于计算任务的生产者消费者模型,生产者生产任务,消费者去处理这个任务然后计算出答案。
Task.hpp
#pragma once
#include <iostream>
#include <string>
using namespace std;
string opers = "+-*/%";
enum{
DivZero = 1,
ModZero,
Unknown
};
class Task
{
public:
Task(int x,int y,char op)
:data1_(x),data2_(y),oper_(op),result_(0),exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
}
break;
default:
exitcode_ = Unknown;
break;
}
}
void operator()()
{
run();
}
string GetResult()
{
string r = to_string(data1_);
r+=oper_;
r+=to_string(data2_);
r+="=";
r+=to_string(result_);
r+="[code:";
r+=to_string(exitcode_);
r+="]";
return r;
}
string GetTask()
{
string r = to_string(data1_);
r+=oper_;
r+=to_string(data2_);
r+="=?";
return r;
}
~Task()
{}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
BlockQueue.hpp:
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
using namespace std;
template <class T>
class BlockQueue
{
static const int defaultnum = 20;
public:
BlockQueue(int maxcap = defaultnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_,nullptr);
pthread_cond_init(&c_cond_,nullptr);
pthread_cond_init(&p_cond_,nullptr);
// low_water_ = maxcap_/3;
// high_water_ = (maxcap_*2)/3;
}
// 谁来唤醒呢?
T pop()
{
pthread_mutex_lock(&mutex_);
while(q_.size() == 0) // 因为判断临界资源调试是否满足,也是在访问临界资源!判断资源是否就绪,是通过再临界资源内部判断的。
{
// 如果线程wait时,被误唤醒了呢??
pthread_cond_wait(&c_cond_,&mutex_); // 你是持有锁的!!1. 调用的时候,自动释放锁,因为唤醒而返回的时候,重新持有锁
}
T out = q_.front(); // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足
q_.pop();
// if(q_.size() < low_water_) pthread_cond_signal(&p_cond_);
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T& num)
{
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_)// 这里用while作为判断条件,做到防止线程被伪唤醒的情况
{
// 伪唤醒情况?
pthread_cond_wait(&p_cond_,&mutex_);//1. 调用的时候,自动释放锁 2.?
}
//1.队列没有满 2.被唤醒
q_.push(num);// 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
// int low_water_;
// int high_water_;
};
main.c:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
void* Consumer(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while (true)
{
//消费
Task t = bq->pop();
//计算
// t.run();
t();
cout << "处理任务:" << t.GetTask() << "处理任务的结果是:" << t.GetResult() << "thread_id: " << pthread_self() <<endl;
// sleep(1);
}
}
void* Productor(void* args)
{
int len = opers.size();
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
int x = 10,y = 20;
while (true)
{
//模拟生产者生产数据
int data1 = rand()%10+1;// [1,10]
usleep(10);
int data2 = rand()%10;// [0,9]
char op = opers[rand()%len];
Task t(data1,data2,op);
//生产
bq->push(t);
cout << "生产者生产了一个任务: " << t.GetTask() << "thread id:" << pthread_self() << endl;
sleep(1);
}
}
int main()
{
//种一颗随机数种子
srand((long long)time(nullptr));
BlockQueue<Task>* bq = new BlockQueue<Task>;
pthread_t c[3],p[5];
for(int i = 0;i < 3;++i)
{
pthread_create(c+i,nullptr,Consumer,bq);
}
for(int i = 0;i < 5;++i)
{
pthread_create(p+i,nullptr,Productor,bq);
}
for(int i = 0;i < 3;++i)
{
pthread_join(c[i],nullptr);
}
for(int i = 0;i < 5;++i)
{
pthread_join(p[i],nullptr);
}
delete bq;
return 0;
}
运行结果:
可以看到如此以来,我们的生产者不断的往阻塞队列里面放一个又一个的Task对象,然后消费者拿到一个又一个的Task对象之后对他们进行处理从而得到运行结果。
我们从代码中可以看到生产者和消费者进行生产或者消费数据时,都要进行加锁,所以这个生产和消费在访问仓库的过程之中,本身就是串行的,这样的话生产消费还能高效吗?答案是可以的!这是因为生产消费不只是存数据和获取数据的过程,他还有前置和后置的工作要进行处理!下面我们来具体说明一下:
我们看下面这张图,如果我们只看红色方框里面的内容,生产者和消费者就是生产者生产数据和消费者获取数据,而且生产者生产数据和消费数据是同步关系的,消费者必须要等生产者生产数据之后再来消费,如果我们只是这样看生产者消费者模型的话效率是低下的!
但是实际上生产者消费者模型还是前置和后置的工作!
我们来看下面这个图进行加深理解:
所以虽然这个生产和消费在访问仓库的过程之中,本身就是串行的,生产的时候不能进行消费,消费的时候不能进行生产,但是由于生产和消费都是需要花时间的,那么如果在生产者进行生产的时候消费者进行加工处理数据,而消费者进行消费的时候生产者进行获取数据,这样的话,生产和消费不就是并发进行的吗?一个在访问临界区的代码,一个在访问非临界区的代码,这样两个线程就高效并发的调度运行起来了!是不是生产消费的效率就提高了!!
那么我们还有一个问题,在我们上面基于任务的生产者消费者模型当中,我们创建了多个生产者线程和多个消费者线程,这样可以提高效率吗?
答案是可以的!从运行结果我们可以看到每次生产和消费的线程id都是不一样的。这样当其中一个生产者进行申请锁的时候,其它线程也可以并发的进行获取数据;其中一个消费者进行申请锁的时候,其它消费者线程可以并发的进行处理数据。这样效率就大大提高了!
伪唤醒问题是指线程,但此时条件判断还不满足,但是线程却因为伪唤醒而运行后续的代码,这可能导致程序运行异常或错误。下面我们来具体说明伪唤醒问题:?
- 假设阻塞队列已经被写满了,消费者正在运行,消费之后唤醒一个生产者,然后消费者进行解锁,这个时候阻塞队列只有一个位置能进行生产。由于刚开始阻塞队列是满的,所以许多生产者线程要进行生产却不能进行,这个时候如果消费者在唤醒的时候多次使用了pthread_cond_signal或者使用了pthread_cond_broadcast。那么在一个条件变量等待的多个生产者线程都被唤醒了。这个时候这些生产者线程就不再在条件变量下进行等待了。
- 这样第一个拿到锁的生产者从pthread_cond_wait()开始往下执行程序进行生产数据,生产完数据之后阻塞队列又满了,生产者又进行唤醒消费者进行消费数据,然后释放锁。但这个时候不一定是消费者拿到锁,刚刚被唤醒的多个生产者也可能拿到锁。而这个时候,如果是生产者拿到了锁,函数并不是从头开始执行的,而是继续从pthread_cond_wait()开始往下执行程序进行,当由于阻塞队列已经满了,这个时候生产者再向阻塞队列进行push就会产生错误,这种情况的被唤醒的生产者线程就被称为伪唤醒状态。
那么如何解决伪唤醒问题呢?
判断是否满足生产消费条件时不能用if,而应该用while:
为了避免伪唤醒问题,我们在循环中检查线程等待条件,也就是说当线程被唤醒时,我们不能直接往后执行,而是要让它重新判断一次是否满足条件,确保程序在满足结束条件的情况下退出。
void push(const T& num)
{
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_)// 这里用while作为判断条件,做到防止线程被伪唤醒的情况
{
// 伪唤醒情况?
pthread_cond_wait(&p_cond_,&mutex_);//1. 调用的时候,自动释放锁 2.?
}
//1.队列没有满 2.被唤醒
q_.push(num);// 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足
// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
这样增加了循环检查线程等待条件,当条件不满足时,就会重新进入休眠状态等待唤醒。