C++ 并发编程 | 条件变量

发布时间:2024年01月18日


前言

可以使用互斥锁来解决生产者-消费者问题,但是,在队列没有数据的这一段时间,消费者线程会不断的判断队列是否为空,浪费了很多不必要的CPU资源。这时候我们设想,能否设计这样的一种机制,如果在队列没有数据的时候,消费者线程能一直阻塞在那里,等待着别人给它唤醒,在生产者往队列中放入数据的时候通知一下这个等待线程,唤醒它,告诉它可以来取数据了。于是多线程中的条件变量就横空出世!条件变量是多线程数据同步的一种操作,不管是用哪种框架,哪种语言实现多线程的功能,条件变量都是不得不考虑的一种情况。

一、条件变量

1、介绍

在C++11中,可以使用条件变量(condition_variable)实现多个线程间的同步操作;当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:

  • 一个线程因等待"条件变量的条件成立"而挂起
  • 另外一个线程使"条件成立",给出信号,从而唤醒被等待的线程

为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起;通常情况下这个锁是std::mutex,并且管理这个锁只能是 std::unique_lock <std::mutex>模板类

注意:条件变量中只能使用std::unique_lock <std::mutex>

2、wait成员函数

线程的阻塞是通过成员函数wait、wait_for、wait_until函数实现的。wait导致当前线程被阻塞,直至条件变量被通知或虚假唤醒发生时,线程被唤醒解除阻塞,下面主要介绍wait函数

2.1、函数声明与功能

wait函数有两个重载的版本,函数的声明及功能,如下:

// 重载版本一
void wait(std::unique_lock<std::mutex>& lock);
// 函数功能
1、调用wait函数,将解锁互斥量,并堵塞到wait所在的行,直到其他某个线程调用notify_one/notify_all成员函数才会解除阻塞

// 重载版本二
// Predicate 谓词函数,可以普通函数或者lambda表达式
template<class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
// 函数功能
1、如果第二个参数lambda表达式返回值是true,那wait()直接返回不堵塞
2、如果第二个参数lambda表达式返回值是false,那么wait()将解锁互斥量,并堵塞到所在的行

2.2、wait被唤醒后的工作流程

当wait的状态唤醒后,wait就开始恢复干活了,工作流程如下:

步骤一:wait不断的尝试重新获取互斥量锁,如果获取不到,那么流程就卡在wait这里等着获取,如果获取到了锁,那么wait就继续执行步骤二;
步骤二:有以下三种情况
1)如果wait有第二个参数(lambda),就判断这个lambda表达式,如果lambda表达式为false,那么wait又对互斥量解锁,然后又休眠在这里等待再次被notify_one唤醒
2)如果lambda表达式为true,则wait返回,流程走下来(此时互斥锁被锁着)
3)如果wait没有第二个参数,则wait返回,流程走下来

注意wait类型的函数在会阻塞时,自动释放锁,即调用unique_lock的成员函数unlock(),以便其他线程能有机会获得锁。这就是条件变量只能和unique_lock一起使用的原因,否则当前线程一直占有锁,线程被阻塞。

2.3、虚假唤醒

在正常情况下,wait类型函数返回时要么是因为被唤醒,要么是因为超时才返回,但是在实际中发现,因此操作系统的原因,wait类型在不满足条件时,它也会返回,这就导致了虚假唤醒。因此,我们一般都是使用带有谓词参数的wait函数,带谓词的wait函数等价于:

while (!(xxx条件) )
{
    //虚假唤醒发生,由于while循环,再次检查条件是否满足,
    //否则继续等待,解决虚假唤醒
    wait();  
}
//其他代码

3、notify_one与notify_all成员函数

notify_one()notify_all()常用来唤醒阻塞的线程,两者的区别如下:

  • notify_one():因为只唤醒等待队列中的第一个线程;不存在锁争用,所以能够立即获得锁。其余的线程不会被唤醒,需要等待再次调用notify_one()或者notify_all()。
  • notify_all():会唤醒所有等待队列中阻塞的线程,存在锁争用,只有一个线程能够获得锁。其他线程会继续尝试获得锁(类似于轮询),而不会再次阻塞。当持有锁的线程释放锁时,这些线程中的一个会获得锁。而其余的会接着尝试获得锁。

4、生产者消费者问题

生产者消费者问题,也称有限缓冲问题,是一个多进程/线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个进程/线程——即所谓的生产者消费者,在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者,代码实现如下:

#include <mutex>
#include <deque>
#include <iostream>
#include <thread>
#include <condition_variable>

// 缓存区最大数目
const size_t MAX_NUM = 30;

class PCModle {
public:
    void producer_thread() {
        while (true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(500));

            // 加锁
            std::unique_lock<std::mutex> lk(m_cvMutex);

            // 当队列未满时,继续添加数据
            m_cv.wait(lk, [this]() {
                return this->m_dataDeque.size() <= MAX_NUM;
            });

            // 流程只要能走到这里来,这个互斥锁一定是锁着的

            m_dateIndex++;
            m_dataDeque.push_back(m_dateIndex);
            std::cout << "producer " << m_dateIndex << ", queue size: " << m_dataDeque.size() << std::endl;
            // 唤醒其他线程
            m_cv.notify_all();
            // 自动释放锁
        }
    }

    void consumer_thread() {
        while (true) {
            // 加锁
            std::unique_lock<std::mutex> lk(m_cvMutex);

            // 当队列中没有数据时,继续等待数据
            m_cv.wait(lk, [this] {
                return !this->m_dataDeque.empty();
            });

            // 流程只要能走到这里来,这个互斥锁一定是锁着的

            // 消费队列中的数据
            int data = m_dataDeque.front();
            m_dataDeque.pop_front();
            std::cout << "consumer " << data << ", deque size: " << m_dataDeque.size() << std::endl;
            // 唤醒其他线程
            m_cv.notify_all();
            // 自动释放锁
        }
    }

private:
    // 互斥锁
    std::mutex m_cvMutex;
    // 条件变量
    std::condition_variable m_cv;
    // 缓存区
    std::deque<int> m_dataDeque;
    // 数据
    long m_dateIndex = 0;
};

int main() {
    PCModle obj;

    std::thread producerThread  = std::thread(&PCModle::producer_thread, &obj);
    std::thread consumerThread1 = std::thread(&PCModle::consumer_thread, &obj);
    std::thread consumerThread2 = std::thread(&PCModle::consumer_thread, &obj);

    producerThread.join();
    consumerThread1.join();
    consumerThread2.join();

    return 0;
}

文章来源:https://blog.csdn.net/cloud323/article/details/135675909
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。