目录
嘿!收到一张超美的风景图,希望你每天都能顺心!?
互斥条件:我们可以通过其他方法解决请求与保持条件: 如果我们申请锁多次失败,就将自身拥有的锁释放,让其他线程竞争。
加锁顺序一致避免锁未释放的场景资源一次性分配
思考:我们有了互斥量(锁)之后,线程可以通过锁串行的访问临界资源,中间是否存在什么问题??
问题1:线程访问临界资源需要申请锁,申请锁后需要先检测临界资源是否就绪,无非就两种情况:情况一,就绪,访问;情况二,不就绪,释放锁,再次竞争锁。这会导致线程一直忙着竞争锁,释放锁,进而导致性能下降。(访问临界资源,先申请锁,后检测资源是否就绪,而检测资源本身也是访问临界资源,所以:检测资源一定在加锁与解锁之间)
问题2:引发出线程竞争锁的不平衡问题。?互斥量(锁)的方式解决了共享资源安全性的问题,但由于多线程竞争锁的随机性导致锁的分配不均,进而产生某一线程迟迟未竞争到锁——饥饿问题
例如:一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免 饥饿问题 ,叫做同步。同步让线程竞争锁有了先来后到的顺序, 配合锁一同控制线程访问临界资源。
竞态条件:多个线程在访问共享资源时,由于执行顺序的不确定性(执行操作的原子性以及CPU调度情况)导致出现的问题。竞态条件可能会导致数据不一致或者程序出现错误的情况。
这个可以类比互斥量,两种初始化方法。
?pthread_cond_t? ? ?pc? ?=? PTHREAD_COND_INITIALIZER;? ? //??定义全局条件变量
静态全局锁我们不需要销毁。
int pthread_cond_init(pthread_cond_t *restrict cond,? const pthread_condattr_t *restrict attr);
通过条件变量函数初始化的,需要我们在生命周期结束时,进行手动销毁。
int pthread_cond_destroy(pthread_cond_t *cond)
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
int pthread_cond_signal(pthread_cond_t *? cond);
int pthread_cond_broadcast(pthread_cond_t *? ?cond);
功能: 一次性唤醒,在条件变量队列中的所有线程。
实践:
#define NUM 3
volatile bool quit = false;
typedef void (*func_t)(const string&, pthread_mutex_t* mtx, pthread_cond_t* cond);
void func1(const string& str, pthread_mutex_t* mtx, pthread_cond_t* cond)
{
while (!quit)
{
pthread_mutex_lock(mtx);
if (!quit)
pthread_cond_wait(cond, mtx);
cout << str << " running --a" << endl;
pthread_mutex_unlock(mtx);
}
}
void func2(const string& str, pthread_mutex_t* mtx, pthread_cond_t* cond)
{
while (!quit)
{
pthread_mutex_lock(mtx);
if (!quit)
pthread_cond_wait(cond, mtx);
cout << str << " running --b" << endl;
pthread_mutex_unlock(mtx);
}
}
void func3(const string& str, pthread_mutex_t* mtx, pthread_cond_t* cond)
{
while (!quit)
{
pthread_mutex_lock(mtx);
if (!quit)
pthread_cond_wait(cond, mtx);
cout << str << " running --c" << endl;
pthread_mutex_unlock(mtx);
}
}
class thread_Data
{
public:
thread_Data(const string& pt_name, func_t fc, pthread_mutex_t* mtx, pthread_cond_t* cond)
: _pth_name(pt_name), _fc(fc), _mtx(mtx), _cond(cond)
{}
string _pth_name;
func_t _fc;
pthread_mutex_t* _mtx;
pthread_cond_t* _cond;
};
void* func(void* argc)
{
thread_Data* tmp = (thread_Data*)argc;
tmp->_fc(tmp->_pth_name, tmp->_mtx, tmp->_cond);
delete tmp;
}
int main()
{
pthread_mutex_t mtx;
pthread_cond_t cond;
pthread_mutex_init(&mtx,nullptr);
pthread_cond_init(&cond,nullptr);
pthread_t pth[NUM];
func_t fun[3] = {func1, func2, func3};
for (int i = 0; i < NUM; i++)
{
string name("thread ");
name += to_string(i + 1);
thread_Data* tmp = new thread_Data(name, fun[i], &mtx, &cond);
pthread_create(pth + i, nullptr, func, (void*)tmp);
}
int n = 10;
while (n--)
{
cout << "the main signal start running " << endl;
// pthread_cond_signal(&cond);
pthread_cond_broadcast(&cond);
sleep(1);
}
quit = true;
pthread_cond_broadcast(&cond); // 主线程重新唤醒一次,在quit状态改变前进入阻塞的线程,让新线程判断quit一次
for (int i = 0; i < NUM; i++)
{
pthread_join(pth[i], nullptr);
}
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
return 0;
}
pthread_mutex_lock(&mutex);
while (条件为假)
{
pthread_cond_wait(cond, mutex);
// 修改条件
}
pthread_mutex_unlock(&mutex);
pthread_cond_wait本质上还是函数调用,函数调用总会有失败的时候。如果按照if 只判断一次,万一wait没有阻塞住线程,线程就会进行异常操作,所以对条件判断得用while语句,100%能阻塞住线程。
回答条件变量小节刚开始的问题:?
这里留下2个疑问:
1. 条件满足后,将唤醒线程——如何检测到条件是否满足?
2. 在条件变量加入后,mutex(锁)的意义又发生了什么变化?
?我们用超市的例子,来理解生产消费者模型:
代码实践:自己象征性的实现一个阻塞队列,一个生产者,一个消费者。
// BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#define defigDefualtCap 10
template <class T>
class BlockQueue
{
private:
// 阻塞队列是否已满
bool isfull()
{
return !(_que.size() < _capacity);
}
// 阻塞队列是否为空
bool isempty()
{
return _que.size() == 0;
}
public:
BlockQueue(int capacity = defigDefualtCap)
:_capacity(capacity)
{
pthread_mutex_init(&_mtx,nullptr);
pthread_cond_init(&_empty,nullptr);
pthread_cond_init(&_full, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}
int push(const T& in)
{
pthread_mutex_lock(&_mtx);
if (isfull()) // 检测阻塞队列是否未满,满了就先阻塞到,条件变量为队列满的队列中
pthread_cond_wait(&_full, &_mtx);
// 这里锁的意义:队列满,push操作陷入阻塞,同时自动释放锁;等到下次,被唤醒时,自动申请到锁,继续push
_que.push(in); // 输入后唤醒客户读取数据
if (_que.size() >= (_capacity / 2))
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mtx);
return 0;
}
int pop(T& a)
{
pthread_mutex_lock(&_mtx);
if (isempty())
{
pthread_cond_signal(&_full);
pthread_cond_wait(&_empty, &_mtx);
}
a = _que.front();
_que.pop();
pthread_mutex_unlock(&_mtx);
// pthread_cond_signal(&_full);
return 0;
}
private:
std::queue<T> _que; // 阻塞队列
int _capacity; // 队列值咯
pthread_mutex_t _mtx;
pthread_cond_t _empty; // 为什么要使用2个条件变量,这不代表要2个
pthread_cond_t _full;
};
// ConPor.cxx
#include "BlockQueue.hpp"
void* costomer(void* arg)
{
BlockQueue<int>* clint = (BlockQueue<int>*)arg;
while (1)
{
int a;
clint->pop(a);
std::cout << "消费一个:" << a << std::endl;
}
}
void* producter(void* arg)
{
BlockQueue<int>* product = (BlockQueue<int>*)arg;
int a = 1;
while (1)
{
product->push(a);
std::cout << "生产一个: " << a << std:: endl;
a++;
sleep(1);
}
}
int main()
{
// 先完成单对单的 生产者,消费者
pthread_t a, b;
BlockQueue<int> BQ;
pthread_create(&a, nullptr, costomer, (void*)&BQ);
pthread_create(&b, nullptr, producter, (void*)&BQ);
pthread_join(a, nullptr);
pthread_join(b, nullptr);
return 0;
}
思考生产消费者模型的效率提升,不仅仅是生产者将数据拷贝到缓冲区;消费者从缓冲区获取数据,以及并发效率提升,我们知道在消费者获取到商品(数据)后,需要时间将数据进行处理,而这时生产端在允许的情况下,可以一直生产,这样并发的效率优势就显现出来了。
如果生产与消费时间长,我们可以通过多生产多消费提高效率。如果生产时间短,消费时间短,就不一定需要多生产,因为调度时间占比就比较大,效率下降?。
这里就利用超市——生产消费者模型的例子,回答上面的例子:
1. 条件满足后,将唤醒线程——如何检测到条件是否满足?
答:假设是,消费者进入缓冲区发现数据还未加载,随后阻塞住,并唤醒生产者开始生产数据;当生产者生产完成后,条件满足,就可以唤醒消费者。总之,检测条件的必然不是角色自己本身,是其他资源供应角色的唤醒。
2. 在条件变量加入后,mutex(锁)的意义又发生了什么变化?
答:具体来说,当使用条件变量时,线程在等待条件变量时会释放mutex,这样其他线程就可以获得mutex并访问共享资源。而当条件满足时,唤醒线程会重新获取mutex,然后再次检查条件,这样就能确保在修改条件和唤醒等待线程之间的操作是原子的,从而避免了竞争条件的发生。
因此,引入条件变量后,mutex不仅用于保护共享资源,还用于协调线程的等待和唤醒过程,确保线程在等待和唤醒时能够正确地访问共享资源。
首先,我们曾经的场景是多个执行流,访问一个整体的共享资源。我们试想一下是否有这样的场景,这几个执行流,每个访问的是不同位置的资源,我们是否可以将整体的共享资源切成多块资源,使多执行流之间形成并发。
理解信号量,举一个简单的例子:电影院例子
看电影需要买票,而买票的本质:资源(座位)的预定。而信号量就是里面的票数,申请票数就票数--(信号量--,管他叫:P操作);反之++(叫: V操作),在票数为0时,无法在申请电影票。
问:如何知道资源的个数,以及剩余多少??(信号量剩余多少的准确性,由信号量的P,V原子性保证)并且能保证这个资源是我该拥有的呢?(信号量预定)
#include <semaphore.h>int sem_init(sem_t *sem, int pshared, unsigned int value);参数:pshared:0 表示线程间共享,非零表示进程间共享。value :信号量初始值
?int sem_destroy(sem_t *sem);
功能:等待信号量,会将信号量的值减 1?int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加 1 。
1. 计数器法。2. 预留一个单位空间作为满的状态。
#ifndef __RING_QUEUE_
#define __RING_QUEUE_
#include <iostream>
#include <vector>
#include "sem_num.hpp"
#define _default_queue_size 10
template <class T>
class RingQueue
{
public:
RingQueue(const int Num = _default_queue_size)
:ringqueue(Num)
,_num(Num)
,c_step(0)
,p_step(0)
,space_sem(_default_queue_size)
,data_sem(0)
{}
~RingQueue()
{}
void push(const T& data)
{
//生产者循环输入资源
space_sem.P(); // 空间资源--
ringqueue[p_step++] = data;
p_step %= _num;
data_sem.V(); // 信号量资源++
}
void pop(T& data)
{
//消费者循环获取数据
data_sem.P();
data = ringqueue[c_step++];
c_step %= _num;
space_sem.V();
}
private:
std::vector<T> ringqueue;
int _num; // 记录圆环长度
int c_step; //消费者所在下标值
int p_step; //生产者所在下标值
SemNum space_sem; // 空间资源信号量,给生产者
SemNum data_sem; // 数据资源信号量, 给消费者
};
#endif
#ifndef __SAM_NUM_
#define __SAM_NUM_
#include <iostream>
#include <semaphore.h>
class SemNum
{
public:
SemNum(const int value)
{
sem_init(&sem, 0, value);
}
~SemNum()
{
sem_destroy(&sem);
}
void P()
{
sem_wait(&sem);
}
void V()
{
sem_post(&sem);
}
private:
sem_t sem;
};
// main函数
#include "RingQueue.hpp"
#include <time.h>
#include <unistd.h>
void* costomer(void* args)
{
RingQueue<int>* RQ = (RingQueue<int>*)args;
while (1)
{
int data = -1;
RQ->pop(data);
std::cout << "已销售: " << data << std::endl; // " 线程ID: " << pthread_self() << std::endl;
}
}
void* porducer(void* args)
{
RingQueue<int>* RQ = (RingQueue<int>*)args;
while (1)
{
int data = rand() % 1000 + 1;
std::cout << "已生产: " << data << std::endl; // " 线程ID: "<< pthread_self() << std::endl;
RQ->push(data);
}
}
int main()
{
pthread_t p, c;
srand(time(0) * 23 ^ 0Xeeee);
RingQueue<int> RQ;
pthread_create(&p,nullptr, porducer, (void*)&RQ);
pthread_create(&c,nullptr, costomer, (void*)&RQ);
pthread_join(c,nullptr);
pthread_join(p,nullptr);
return 0;
}
简单的生产消费模型已经差不多了,现在我们尝试实现多生产,多消费。
之前是单生产单消费,我们转变为多生产多消费,就是多了生产与生产之间及消费与消费之间两个关系。生产与生产,消费与消费的共享资源就是对下标的修改,也是我们需要保护的地方。精华修改如下:
void push(const T& data)
{
//生产者循环输入资源
space_sem.P(); // 空间资源--
pthread_mutex_lock(&p_lock);
ringqueue[p_step++] = data;
p_step %= _num;
pthread_mutex_unlock(&p_lock);
data_sem.V(); // 信号量资源++
}
void pop(T& data)
{
//消费者循环获取数据
data_sem.P();
pthread_mutex_lock(&c_lock);
data = ringqueue[c_step++];
c_step %= _num;
pthread_mutex_unlock(&c_lock);
space_sem.V();
}
这里我们有一个需要讨论的问题:
(1). 为什么先申请信号量,后申请锁?
答:? 1. 锁覆盖的串行区域,应尽量的短小; 2. 申请信号量,本身是原子操作,与条件变量不同,不用担心信号量的数据安全。
(2). 这里,多生产多消费的意义?
答:生产者拿到数据以及消费者处理数据本身是最废时间的,这样体现出多生产多消费在特定情况下的并发优势。
生产者的本质:私人任务? ?——>? ?公共空间中
消费者的本质:公共空间中的任务——>? 私人任务
(3). 信号量本身是一个计数器,那计数器的意义?
答:曾经生产消费者模型:加锁——> 检测(条件变量),访问 ——> 解锁 ;在没有访问临界资源前,我们无法得知资源就绪情况。条件变量减少了不必要锁的申请,但仍需要在临界资源中的检测。
而信号量,是提前预知资源情况,而且在PV操作中,提前在外部得知临界资源的变化情况!
下期:网络编程!!
? ?本小节就到这里了,感谢小伙伴的浏览,如果有什么建议,欢迎在评论区评论,如果给小伙伴带来一些收获请留下你的小赞,你的点赞和关注将会成为博主创作的动力。