可以使用互斥锁来解决生产者-消费者问题,但是,在队列没有数据的这一段时间,消费者线程会不断的判断队列是否为空,浪费了很多不必要的CPU资源。这时候我们设想,能否设计这样的一种机制,如果在队列没有数据的时候,消费者线程能一直阻塞在那里,等待着别人给它唤醒,在生产者往队列中放入数据的时候通知一下这个等待线程,唤醒它,告诉它可以来取数据了。于是多线程中的条件变量就横空出世!条件变量是多线程数据同步的一种操作,不管是用哪种框架,哪种语言实现多线程的功能,条件变量都是不得不考虑的一种情况。
在C++11中,可以使用条件变量(condition_variable)实现多个线程间的同步操作;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:
为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起;通常情况下这个锁是std::mutex,并且管理这个锁只能是
std::unique_lock <std::mutex>
模板类
注意:条件变量中只能使用
std::unique_lock <std::mutex>
线程的阻塞是通过成员函数
wait、wait_for、wait_until
函数实现的。wait导致当前线程被阻塞,直至条件变量被通知或虚假唤醒发生时,线程被唤醒解除阻塞,下面主要介绍wait
函数
wait
函数有两个重载的版本,函数的声明及功能,如下:
// 重载版本一
void wait(std::unique_lock<std::mutex>& lock);
// 函数功能
1、调用wait函数,将解锁互斥量,并堵塞到wait所在的行,直到其他某个线程调用notify_one/notify_all成员函数才会解除阻塞
// 重载版本二
// Predicate 谓词函数,可以普通函数或者lambda表达式
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
// 函数功能
1、如果第二个参数lambda表达式返回值是true,那wait()直接返回不堵塞
2、如果第二个参数lambda表达式返回值是false,那么wait()将解锁互斥量,并堵塞到所在的行
当wait的状态唤醒后,wait就开始恢复干活了,工作流程如下:
步骤一:wait不断的尝试重新获取互斥量锁,如果获取不到,那么流程就卡在wait这里等着获取,如果获取到了锁,那么wait就继续执行步骤二;
步骤二:有以下三种情况
1)如果wait有第二个参数(lambda),就判断这个lambda表达式,如果lambda表达式为false,那么wait又对互斥量解锁,然后又休眠在这里等待再次被notify_one唤醒
2)如果lambda表达式为true,则wait返回,流程走下来(此时互斥锁被锁着)
3)如果wait没有第二个参数,则wait返回,流程走下来
注意:
wait
类型的函数在会阻塞时,自动释放锁,即调用unique_lock
的成员函数unlock()
,以便其他线程能有机会获得锁。这就是条件变量只能和unique_lock
一起使用的原因,否则当前线程一直占有锁,线程被阻塞。
在正常情况下,
wait
类型函数返回时要么是因为被唤醒,要么是因为超时才返回,但是在实际中发现,因此操作系统的原因,wait
类型在不满足条件时,它也会返回,这就导致了虚假唤醒。因此,我们一般都是使用带有谓词参数的wait
函数,带谓词的wait
函数等价于:
while (!(xxx条件) )
{
//虚假唤醒发生,由于while循环,再次检查条件是否满足,
//否则继续等待,解决虚假唤醒
wait();
}
//其他代码
notify_one()
与notify_all()
常用来唤醒阻塞的线程,两者的区别如下:
notify_one()
:因为只唤醒等待队列中的第一个线程;不存在锁争用,所以能够立即获得锁。其余的线程不会被唤醒,需要等待再次调用notify_one()或者notify_all()。notify_all()
:会唤醒所有等待队列中阻塞的线程,存在锁争用,只有一个线程能够获得锁。其他线程会继续尝试获得锁(类似于轮询),而不会再次阻塞。当持有锁的线程释放锁时,这些线程中的一个会获得锁。而其余的会接着尝试获得锁。生产者消费者问题,也称有限缓冲问题,是一个多进程/线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程/线程——即所谓的生产者和消费者,在实际运行时会发生的问题。
生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者,代码实现如下:
#include <mutex>
#include <deque>
#include <iostream>
#include <thread>
#include <condition_variable>
// 缓存区最大数目
const size_t MAX_NUM = 30;
class PCModle {
public:
void producer_thread() {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// 加锁
std::unique_lock<std::mutex> lk(m_cvMutex);
// 当队列未满时,继续添加数据
m_cv.wait(lk, [this]() {
return this->m_dataDeque.size() <= MAX_NUM;
});
// 流程只要能走到这里来,这个互斥锁一定是锁着的
m_dateIndex++;
m_dataDeque.push_back(m_dateIndex);
std::cout << "producer " << m_dateIndex << ", queue size: " << m_dataDeque.size() << std::endl;
// 唤醒其他线程
m_cv.notify_all();
// 自动释放锁
}
}
void consumer_thread() {
while (true) {
// 加锁
std::unique_lock<std::mutex> lk(m_cvMutex);
// 当队列中没有数据时,继续等待数据
m_cv.wait(lk, [this] {
return !this->m_dataDeque.empty();
});
// 流程只要能走到这里来,这个互斥锁一定是锁着的
// 消费队列中的数据
int data = m_dataDeque.front();
m_dataDeque.pop_front();
std::cout << "consumer " << data << ", deque size: " << m_dataDeque.size() << std::endl;
// 唤醒其他线程
m_cv.notify_all();
// 自动释放锁
}
}
private:
// 互斥锁
std::mutex m_cvMutex;
// 条件变量
std::condition_variable m_cv;
// 缓存区
std::deque<int> m_dataDeque;
// 数据
long m_dateIndex = 0;
};
int main() {
PCModle obj;
std::thread producerThread = std::thread(&PCModle::producer_thread, &obj);
std::thread consumerThread1 = std::thread(&PCModle::consumer_thread, &obj);
std::thread consumerThread2 = std::thread(&PCModle::consumer_thread, &obj);
producerThread.join();
consumerThread1.join();
consumerThread2.join();
return 0;
}