线程同步--生产者消费者模型

发布时间:2024年01月18日

在这里插入图片描述

一.条件变量

  • 条件变量是线程间共享的全局变量,线程间可以通过条件变量进行同步控制
  • 条件变量的使用必须依赖于互斥锁以确保线程安全,线程申请了互斥锁后,可以调用特定函数进入条件变量等待队列(同时释放互斥锁),其他线程则可以通过条件变量在特定的条件下唤醒该线程(唤醒后线程重新获得互斥锁),实现线程同步.
    • 例如一个线程访问队列时,发现队列为空,则它只能等待其它线程将数据添加到队列中,这种情况就需要用到条件变量.
    • 线程同步的概念:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问共享资源,从而有效避免线程饥饿问题(饥饿问题指线程长时间等待资源而无法被调度).
      在这里插入图片描述

pthread线程库提供的条件变量操作

//声明全局互斥锁并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//声明全局条件变量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  • 线程等待条件:
任务线程代码{
	pthread_mutex_lock(&mutex);
	if(条件为假)
		pthread_cond_wait(&cond, &mutex);//等待时会释放互斥锁,等待完后自动加锁
	//访问共享资源....
	pthread_mutex_unlock(&mutex);
}

线程调用pthread_cond_wait等待时,该接口会释放互斥锁,等待结束后自动加锁

  • 控制线程给条件变量发送唤醒信号
控制线程代码{
	if(满足唤醒条件){
		pthread_mutex_lock(&mutex);
		pthread_cond_signal(cond);
		pthread_mutex_unlock(&mutex);
	}
}

唤醒操作加锁是为了避免信号丢失

  • 示例:
#include <iostream>
#include <unistd.h>
#include <pthread.h>

int cnt = 0;
//声明全局互斥锁并初始化
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//声明全局条件变量并初始化
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void *Count(void * args)
{
    //线程分离,无需主线程等待
    pthread_detach(pthread_self());
    uint64_t number = (uint64_t)args;
    std::cout << "pthread: " << number << " create success" << std::endl;

    while(true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);               
        std::cout << "pthread: " << number << " , cnt: " << cnt++ << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    for(uint64_t i = 0; i < 4; i++)
    {
        pthread_t tid;
        pthread_create(&tid, nullptr, Count, (void*)i);
        usleep(1000);
    }
    sleep(3);
    std::cout << "main thread ctrl begin: " << std::endl;

    while(true) 
    {
        sleep(1);
        //唤醒在cond的等待队列中等待的一个线程,默认都是第一个
        pthread_mutex_lock(&mutex);
        pthread_cond_signal(&cond); 
        pthread_mutex_unlock(&mutex);
        //按顺序唤醒在cond的等待队列中的所有线程
        //pthread_cond_broadcast(&cond);
        std::cout << "signal one thread..." << std::endl;
    }

    return 0;
}
  • 线程同步过程图解:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 条件变量和锁的销毁:
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);

二.生产者消费者模型

  • 生产者消费者模型是一种多线程并发协作的设计框架,生产者负责生成并发送数据,消费者负责接收并处理数据.
  • 生产者和消费者之间存在一个数据容器作为缓冲区,生产者生产的数据存入容器中,消费者需要的数据从容器中获取,实现了生产者和消费者之间的数据传输解耦
  • 数据容器由互斥锁保护,同一个时刻只能有一个线程访问数据容器,生产者和消费者之间通过条件变量(或信号量)实现同步
  • 对于数据容器的访问,生产者和消费者遵循三个原则:
    • 生产者和生产者之间互斥
    • 消费者和消费者之间互斥
    • 生产者和消费者之间互斥并同步
      在这里插入图片描述

生产者消费者模型的高效性

  • 由于生产者和消费者之间的数据传输解耦,生产者生产完数据之后不用等待消费者处理数据,而是直接将数据存入容器,消费者不需要向生产者请求数据,而是直接从容器里获取数据,因此即便在生产者和消费者的效率不对等且多变的情况下,多个生产者依然可以高效专一地并发生产数据,多个消费者依然可以高效专一地并发处理数据,使得系统整体的并发量得到提高
    在这里插入图片描述

基于环形队列实现生产者消费者模型中的数据容器

  • 环形队列中,消费者访问队列的头指针进行数据出队操作,生产者访问队列的尾指针进行数据入队操作
  • 两把互斥锁分别保证消费者和消费者之间的互斥以及生产者和生产者之间的互斥,两个信号量实现消费者和生产者之间的互斥与同步
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

  • 当环形队列既不为空也不为满时,支持一个生产者和一个消费者并发地进行数据的存取
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>

//环形队列默认容量
const static int defaultcap = 5;

template<class T>
class RingQueue{
private:
    std::vector<T> ringqueue_;
    int cap_;          //容器的容量

    int c_step_;       // 消费者环形队列指针
    int p_step_;       // 生产者环形队列指针

    sem_t cdata_sem_;  // 消费者的数据资源
    sem_t pspace_sem_; // 生产者的空间资源

    pthread_mutex_t c_mutex_;   //消费者与消费者之间的互斥锁
    pthread_mutex_t p_mutex_;   //生产者与生产者之间的互斥锁
public:
    RingQueue(int cap = defaultcap)
        :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        //初始化生产者和消费者的信号量-->消费者一开始没有信号量资源,生产者一开始具有最多的空间资源
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }

    //信号量的资源状态可以区分队列的空和满

    void Push(const T &in) 
    {
        //生产者等待空间资源
        sem_wait(&pspace_sem_);
        pthread_mutex_lock(&p_mutex_);
        ringqueue_[p_step_] = in;
        p_step_++;
        p_step_ %= cap_;
        pthread_mutex_unlock(&p_mutex_);
        //生产完数据后增加消费者的信号量资源
        sem_post(&cdata_sem_);
    }
    void Pop(T *out)      
    {
        //消费者等待数据资源
        sem_wait(&cdata_sem_);
        pthread_mutex_lock(&c_mutex_);
        *out = ringqueue_[c_step_];
        c_step_++;
        c_step_ %= cap_;
        pthread_mutex_unlock(&c_mutex_);
        //消费完数据后增加生产者的信号量资源
        sem_post(&pspace_sem_);
    }
};

在这里插入图片描述

文章来源:https://blog.csdn.net/weixin_73470348/article/details/135585080
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。