int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrictattr);
参数:
cond:要初始化的条件变量
attr:NULL
int pthread_cond_destroy(pthread_cond_t *cond)
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后面详细解释
//唤醒全部
int pthread_cond_broadcast(pthread_cond_t *cond);
//唤醒特定等待的一个
int pthread_cond_signal(pthread_cond_t *cond);
为了方便大家对生产者消费者模型的理解,我采用“321”原则为大家分析。3即3种关系,2即两种角色,1即一个交易场所。
3种关系:
1、生产者VS生产者:竞争 互斥
2、消费者VS消费者:竞争 户次
3、生产者VS消费者:互斥、同步
2种角色:生产者和消费者
1种交易场所:一段缓冲区(内存空间,STL容器等)
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
具体实现:
BlockQueue模块
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
namespace ns_blockqueue
{
const int default_cap = 5;
template <class T>
class BlockQueue
{
private:
std::queue<T> bq_; //我们的阻塞队列
int cap_; //队列的元素上限
pthread_mutex_t mtx_; //保护临界资源的锁
// 1. 当生产满了的时候,就应该不要生产了(不要竞争锁了),而应该让消费者来消费
// 2. 当消费空了,就不应该消费(不要竞争锁了),应该让生产者来进行生产
pthread_cond_t is_full_; // bq_满的, 消费者在改条件变量下等待
pthread_cond_t is_empty_; // bq_空的,生产者在改条件变量下等待
private:
bool IsFull()
{
return bq_.size() == cap_;
}
bool IsEmpty()
{
return bq_.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&mtx_);
}
void UnlockQueue()
{
pthread_mutex_unlock(&mtx_);
}
void ProducterWait()
{
// pthread_cond_wait
// 1. 调用的时候,会首先自动释放mtx_!,然后再挂起自己
// 2. 返回的时候,会首先自动竞争锁,获取到锁之后,才能返回!
// 3.
pthread_cond_wait(&is_empty_, &mtx_);
}
void ConsumerWait()
{
pthread_cond_wait(&is_full_, &mtx_);
}
void WakeupComsumer()
{
pthread_cond_signal(&is_full_);
}
void WakeupProducter()
{
pthread_cond_signal(&is_empty_);
}
public:
BlockQueue(int cap = default_cap) : cap_(cap)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&is_empty_, nullptr);
pthread_cond_init(&is_full_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&is_empty_);
pthread_cond_destroy(&is_full_);
}
public:
// const &:输入
//*: 输出
//&: 输入输出
void Push(const T &in)
{
LockQueue();
//临界区
// if(IsFull()){ //bug?
//我们需要进行条件检测的时候,这里需要使用循环方式
//来保证退出循环一定是因为条件不满足导致的!
while (IsFull())
{
//等待的,把线程挂起,我们当前是持有锁的!!!
ProducterWait();
}
//向队列中放数据,生产函数
bq_.push(in);
// if(bq_.size() > cap_/2 ) WakeupComsumer();
UnlockQueue();
WakeupComsumer();
}
void Pop(T *out)
{
LockQueue();
//从队列中拿数据,消费函数函数
//我们需要进行条件检测的时候,这里需要使用循环方式
//来保证退出循环一定是因为条件不满足导致的!
while (IsEmpty())
{ //
//无法消费
ConsumerWait();
}
*out = bq_.front();
bq_.pop();
// if(bq_.size() < cap_/2 ) WakeupProducter();
UnlockQueue();
WakeupProducter();
}
};
}
Task任务模块
#pragma once
#include <iostream>
#include <pthread.h>
namespace ns_task
{
class Task
{
private:
int x_;
int y_;
char op_; //+/*/%
public:
// void (*callback)();
Task() {}
Task(int x, int y, char op) : x_(x), y_(y), op_(op)
{
}
int Run()
{
int res = 0;
switch (op_)
{
case '+':
res = x_ + y_;
break;
case '-':
res = x_ - y_;
break;
case '*':
res = x_ * y_;
break;
case '/':
res = x_ / y_;
break;
case '%':
res = x_ % y_;
break;
default:
std::cout << "bug??" << std::endl;
break;
}
std::cout << "当前任务正在被: " << pthread_self() << " 处理: " \
<< x_ << op_ << y_ << "=" << res << std::endl;
return res;
}
int operator()()
{
return Run();
}
~Task() {}
};
}
mian函数模块
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <time.h>
#include <cstdlib>
#include <unistd.h>
using namespace ns_blockqueue;
using namespace ns_task;
void *consumer(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
while(true){
Task t;
bq->Pop(&t); //这里完成了任务消费的第1步
t(); //这里完成了任务消费的第2步
// sleep(2);
// int data = 0;
// bq->Pop(&data);
// std::cout << "消费者消费了一个数据: " << data << std::endl;
}
}
void *producter(void *args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while(true){
//1. 制造数据,生产者的数据(task)从哪里来??
int x = rand()%20+1; //[1,20]
int y = rand()%10+1; //[1,10]
char op = ops[rand()%5];
Task t(x, y, op);
std::cout << "生产者派发了一个任务: " << x << op << y << "=?" << std::endl;
//2. 将数据推送到任务队列中
bq->Push(t);
sleep(1);
// sleep(2);
// int data = rand()%20 + 1;
// std::cout << "生产者生产数据: " << data << std::endl;
// bq->Push(data);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c, nullptr, consumer, (void*)bq);
pthread_create(&c1, nullptr, consumer, (void*)bq);
pthread_create(&c2, nullptr, consumer, (void*)bq);
pthread_create(&c3, nullptr, consumer, (void*)bq);
pthread_create(&c4, nullptr, consumer, (void*)bq);
pthread_create(&p, nullptr, producter, (void*)bq);
pthread_join(c, nullptr);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p, nullptr);
return 0;
}