目录
详见
5.1 进程、线程基础知识 | 小林codinghttps://xiaolincoding.com/os/4_process/process_base.html#%E7%BA%BF%E7%A8%8B
#include <pthread.h>
int pthread_create(pthread_t* thread, const pthread_attr_t* attr,
void* (*start_routine)(void*), void* arg);
// 成功时返回0,失败时返回错误码
// thread 输出型参数,获取新线程ID typedef unsigned long int pthread_t;
// attr 指向新线程的属性,一般使用默认值,通常为NULL
// start_routine 函数指针,新线程将运行的函数
// arg 传给start_routine函数的参数
因为pthread并非Linux系统的默认库,而是POSIX线程库。在Linux中将其作为一个库来使用,因此加上-pthread以显式链接该库。函数在执行错误时的错误信息将作为返回值返回,并不修改系统全局变量errno,当然也无法使用perror打印错误信息。
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
void* callback(void* arg)
{
printf("I am child thread.\n");
printf("arg value: %d\n", *(int*)arg);
}
int main()
{
// main函数所在的线程为主线程,其余创建的线程为子线程
pthread_t tid;
int num = 10;
// 创建一个子线程
int ret = pthread_create(&tid, NULL, callback, (void*)&num);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error: %s\n", errstr);
}
printf("I am main thread.\n");
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
sleep(1);
return 0;
}
#include <pthread.h>
pthread_t pthread_self(void);
// 获取当前线程ID
// 这个函数总是成功的
int pthread_equal(pthread_t t1, pthread_t t2);
// 比较两个线程ID是否相等
// 这个函数总是成功的,相等返回非0值,不相等返回0
#include <pthread.h>
void pthread_exit(void* retval);
// retval 可以指向任何类型的数据,它指向的数据将作为线程退出时的返回值,不关心则设置为NULL
// 不能指向局部变量
// 可以被主线程接收
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
void* callback(void* arg)
{
printf("child thread id: %ld\n", pthread_self());
return NULL; // 子线程终止,等价于pthread_exit(NULL);
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error: %s\n", errstr);
}
// 主线程
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
pthread_exit(NULL); // 主线程终止,不会影响其他正常运行的线程
return 0; // 进程终止,等价于exit(0);
}
#include <pthread.h>
int pthread_cancel(pthread_t thread);
// 给目标线程发送取消请求,目标线程收到取消请求不会立刻终止,而是执行到一个取消点才会终止
// 取消点:系统规定好的一些系统调用
// 成功时返回0,失败时返回错误码
#include <pthread.h>
int pthread_join(pthread_t thread, void** retval);
// 以阻塞的方式等待目标子线程终止,回收子线程资源
// 成功时返回0,失败时返回错误码
// thread 目标子线程ID
// retval 接收子线程退出时的返回值,不关心则设置为NULL
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
int value = 10;
void* callback(void* arg)
{
printf("child thread id: %ld\n", pthread_self());
pthread_exit((void*)&value); // return (void*)&value;
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error1: %s\n", errstr);
}
// 主线程
for (int i = 0; i < 5; i++)
{
printf("%d\n", i);
}
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
// 回收子线程资源
int* thread_retval;
ret = pthread_join(tid, (void**)&thread_retval);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error2: %s\n", errstr);
}
printf("exit data: %d\n", *thread_retval);
printf("回收子线程资源成功!\n");
pthread_exit(NULL);
return 0;
}
#include <pthread.h>
int pthread_detach(pthread_t thread);
// 将目标线程标识为已分离的(detached)线程
// 默认情况下,新创建的线程是可连接的(joinable)的,需要主线程调用pthread_join回收子线程资源
// 已分离的(detached)线程终止时资源被自动释放回系统,当不关心子线程退出时的返回值时,我们可以分离它
// joinable和detached是冲突的
// 成功时返回0,失败时返回错误码
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <string.h>
void* callback(void* arg)
{
printf("chid thread id: %ld\n", pthread_self());
return NULL;
}
int main()
{
// 子线程
pthread_t tid;
int ret = pthread_create(&tid, NULL, callback, NULL);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error1: %s\n", errstr);
}
// 主线程
printf("tid: %ld, main thread id: %ld\n", tid, pthread_self());
// 分离子线程
ret = pthread_detach(tid);
if (ret != 0)
{
char* errstr = strerror(ret);
printf("error2: %s\n", errstr);
}
pthread_exit(NULL);
return 0;
}
为了避免多线程并发地访问共享资源时出现问题,可以使用互斥量(mutual exclusion,mutex)来确保对任意共享资源的原子访问。
原子性:指事务的不可分割性,一个事务的所有操作要么不间断地全部被执行,要么一个也没有执行。
互斥量本质是锁,在访问共享资源前对互斥量加锁,在访问完成后解锁。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 用宏来静态初始化互斥量
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t* restrict mutex,
const pthread_mutexattr_t* restrict attr);
// 成功时返回0,失败时返回错误码
// mutex 指向要初始化的互斥量
// attr 指向互斥量的属性,通常设置为NULL,表示以系统默认的属性完成初始化
// restrict,C语言中的一种类型限定符,用于告诉编译器,对象已经被指针所引用,
// 不能通过除该指针外所有其他直接或间接的方式修改该对象的内容。
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t* mutex);
// 静态初始化的互斥量不需要销毁
// 不要销毁一个已经加锁的互斥量
// 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
// 成功时返回0,失败时返回错误码
// mutex 指向要销毁的互斥量
#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t* mutex);
// 加锁,阻塞的,如果有一个线程加锁了,那么其他的线程只能阻塞等待
int pthread_mutex_trylock(pthread_mutex_t* mutex);
// 尝试加锁,如果加锁失败,不会阻塞,调用失败返回错误码
int pthread_mutex_unlock(pthread_mutex_t* mutex);
// 解锁
// 成功时返回0,失败时返回错误码
多线程模拟卖票:
3个窗口一共卖20张票:
// mythread.c
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
// 共享变量
int tickets = 20;
// 互斥量
pthread_mutex_t mutex;
// 卖票函数
void* sellTickets(void* arg)
{
char* id = (char*)arg;
while (1)
{
usleep(1000); // 单位是微秒
// 加锁
pthread_mutex_lock(&mutex);
if (tickets > 0)
{
printf("%s正在卖第%d张门票\n", id, 20 - tickets + 1);
tickets--;
}
else
{
// 解锁
pthread_mutex_unlock(&mutex);
break;
}
// 解锁
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main()
{
// 初始化互斥量
pthread_mutex_init(&mutex, NULL);
// 创建3个子线程
pthread_t tid1, tid2, tid3;
pthread_create(&tid1, NULL, sellTickets, "thread 1");
pthread_create(&tid2, NULL, sellTickets, "thread 2");
pthread_create(&tid3, NULL, sellTickets, "thread 3");
// 回收子线程资源
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_join(tid3, NULL);
// 销毁互斥量
pthread_mutex_destroy(&mutex);
// 退出主线程
pthread_exit(NULL);
return 0;
}
概念:
常见的线程不安全的情况:
常见的线程安全的情况:
常见的不可重入的情况:
常见的可重入的情况:
可重入与线程安全的联系:
可重入与线程安全的区别:
例如一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待“条件变量的条件成立”而挂起;另一个线程使“条件成立”(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 用宏来静态初始化条件变量
#include <pthread.h>
int pthread_cond_init(pthread_cond_t* restrict cond,
const pthread_condattr_t* restrict attr);
// 成功时返回0,失败时返回错误码
// cond 指向要初始化的条件变量
// attr 指向条件变量的属性,通常设置为NULL,表示以系统默认的属性完成初始化
// restrict,C语言中的一种类型限定符,用于告诉编译器,对象已经被指针所引用,
// 不能通过除该指针外所有其他直接或间接的方式修改该对象的内容。
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t* cond);
// 静态初始化的条件变量不需要销毁
// 成功时返回0,失败时返回错误码
// cond 指向要销毁的条件变量
当条件不成立时,条件变量可以阻塞当前线程,所有被阻塞的线程会构成一个等待队列。
#include <pthread.h>
int pthread_cond_timedwait(pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex,
const struct timespec* restrict abstime);
// 在abstime指定的时间内阻塞线程,直到条件成立
// 超时返回ETIMEDOUT
int pthread_cond_wait(pthread_cond_t* restrict cond,
pthread_mutex_t* restrict mutex);
// 永久阻塞线程,直到条件成立
// 成功时返回0,失败时返回错误码
// cond 指向已经初始化的条件变量
// mutex 指向已经加锁的互斥量
// abstime 指向表示绝对时间的结构体,系统当前时间+等待时间
线程对互斥量加锁后准备执行任务,发现条件不成立,这时就需要阻塞线程,等待条件成立。如果什么都不做,直接等待,这个线程占着锁,其他线程就申请不到锁了。所以要把加锁的互斥量传给函数,函数被调用后会阻塞线程并自动释放锁,等待条件成立或超时,重新加锁并解除阻塞,然后函数返回。
#include<pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
// 唤醒一个等待cond条件发生的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 唤醒所有等待cond条件发生的线程
// 成功时返回0,失败时返回错误码
把缓冲区设计成一个阻塞队列,有先进先出的特性,并且,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出。
321原则:
生产者-消费者模型的优点:
// BlockingQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
template<typename T>
class BlockingQueue
{
public:
BlockingQueue(int capacity = 5)
: _capacity(capacity)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_producerCond, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
}
bool isFull() { return _q.size() == _capacity; }
bool isEmpty() { return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while (isFull()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_producerCond, &_mutex);
}
// 缓冲区不满就放入
_q.push(in);
// 生产完,缓冲区一定不是空的,唤醒消费者线程
// 可以加策略,比如缓冲区内数据的个数>容量的一半时,再唤醒
// if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);
// 我们这里就不加策略了,直接唤醒
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond); // 唤醒放在解锁前面和后面均可
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
// 缓冲区不空就取出
*out = _q.front();
_q.pop();
// 消费完,缓冲区一定不是满的,唤醒生产者线程
// 这里也可以加策略,但是我们就不加了,直接唤醒
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_producerCond); // 唤醒放在解锁前面和后面均可
}
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _producerCond; // 生产者对应的条件变量,缓冲区满就等待
pthread_cond_t _consumerCond; // 消费者对应的条件变量,缓冲区空就等待
};
// main.cc
#include "BlockingQueue.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*>(arg);
while (true)
{
sleep(1);
// 1. 通过某种渠道获取数据
int data = rand() % 10 + 1; // 生产1-10的数字
// 2. 将数据放入阻塞队列中
bq->push(data);
std::cout << "producer data: " << data << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
BlockingQueue<int>* bq = static_cast<BlockingQueue<int>*>(arg);
while (true)
{
int data = 0;
// 1. 从阻塞队列中取出数据
bq->pop(&data);
// 2. 结合某种业务逻辑处理数据
// 我们这里写一行打印代码当作处理数据了
std::cout << "consumer data: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
BlockingQueue<int>* bq = new BlockingQueue<int>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, bq);
pthread_create(&p[1], nullptr, producer, bq);
pthread_create(&p[2], nullptr, producer, bq);
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete bq;
return 0;
}
BlockingQueue.hpp内容不变,再增加一个Task.hpp用来定义Task类,main.cc内容稍微改变。
// BlockingQueue.hpp
#pragma once
#include <queue>
#include <pthread.h>
template<typename T>
class BlockingQueue
{
public:
BlockingQueue(int capacity = 5)
: _capacity(capacity)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_producerCond, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
}
bool isFull() { return _q.size() == _capacity; }
bool isEmpty() { return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex);
while (isFull()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_producerCond, &_mutex);
}
// 缓冲区不满就放入
_q.push(in);
// 生产完,缓冲区一定不是空的,唤醒消费者线程
// 可以加策略,比如缓冲区内数据的个数>容量的一半时,再唤醒
// if(_q.size() >= _cap/2) pthread_cond_signal(&_consumerCond);
// 我们这里就不加策略了,直接唤醒
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_consumerCond); // 唤醒放在解锁前面和后面均可
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while (isEmpty()) // 用while不用if,防止虚假唤醒
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
// 缓冲区不空就取出
*out = _q.front();
_q.pop();
// 消费完,缓冲区一定不是满的,唤醒生产者线程
// 这里也可以加策略,但是我们就不加了,直接唤醒
pthread_cond_signal(&_producerCond);
pthread_mutex_unlock(&_mutex);
// pthread_cond_signal(&_producerCond); // 唤醒放在解锁前面和后面均可
}
~BlockingQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_producerCond);
}
private:
std::queue<T> _q;
int _capacity;
pthread_mutex_t _mutex;
pthread_cond_t _producerCond; // 生产者对应的条件变量,缓冲区满就等待
pthread_cond_t _consumerCond; // 消费者对应的条件变量,缓冲区空就等待
};
// Task.hpp
#pragma once
#include <string>
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
, _result(0)
, _exitCode(0)
{}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task() {}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
// main.cc
#include "BlockingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
BlockingQueue<Task>* bq = static_cast<BlockingQueue<Task>*>(arg);
std::string ops = "+-*/%";
while (true)
{
// 1. 通过某种渠道获取数据
int x = rand() % 20 + 1; // x的范围是1~20
int y = rand() % 10 + 1; // y的范围是1~10
char op = ops[rand() % ops.size()]; // 随机取一个运算符
// 2. 将数据放入阻塞队列中
Task t(x, y, op);
bq->push(t);
std::cout << "producer Task: " << t.formatArg() << "?" << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
BlockingQueue<Task>* bq = static_cast<BlockingQueue<Task>*>(arg);
while (true)
{
sleep(1);
Task t;
// 1. 从阻塞队列中取出数据
bq->pop(&t);
// 2. 结合某种业务逻辑处理数据
t();
std::cout << "consumer Task: " << t.formatArg() << t.formatRes() << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
BlockingQueue<Task>* bq = new BlockingQueue<Task>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, bq);
pthread_create(&p[1], nullptr, producer, bq);
pthread_create(&p[2], nullptr, producer, bq);
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete bq;
return 0;
}
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
条件变量和信号量的区别:
#include <semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
// 成功时返回0,失败时返回-1并设置errno
// sem 指向要初始化的信号量
// pshared 0表示线程间共享,非0表示进程间共享
// value 信号量初始值,信号量的值表示资源的数量
#include <semaphore.h>
int sem_destroy(sem_t* sem);
// 成功时返回0,失败时返回-1并设置errno
// sem 指向要销毁的信号量
#include <semaphore.h>
int sem_wait(sem_t* sem);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,永久阻塞线程,直到信号量的值>0,信号量的值-1
int sem_trywait(sem_t* sem);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,不阻塞线程,调用失败
int sem_timedwait(sem_t* sem, const struct timespec* abs_timeout);
// 如果信号量的值>0,信号量的值-1
// 如果信号量的值=0,在abs_timeout指定的时间内阻塞线程,直到信号量的值>0,信号量的值-1
// 超时则调用失败
// 成功时返回0,失败时返回-1并设置errno
#include <semaphore.h>
int sem_post(sem_t* sem);
// 信号量的值+1
// 成功时返回0,失败时返回-1并设置errno
用数组模拟环形队列:
生产者-消费者模型分析:
321原则:
两种生产者-消费者模型的区别:
// RingQueue.hpp
#pragma once
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template<typename T>
class RingQueue
{
public:
RingQueue(int capacity = 5)
: _ring(capacity)
, _capacity(capacity)
, _head(0)
, _tail(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, capacity);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
// 先加锁,再P操作会怎么样?线程申请到锁之后,如果申请信号量失败,会带着锁阻塞,其他同类型的线程还会竞争锁但是竞争不到锁
// 先P操作,再加锁会怎么样?线程如果申请信号量失败,就不会去申请锁,这样会减少申请锁的次数,申请锁也是有代价的
// 先P操作,再加锁,效率更高
P(_space_sem); // 空闲的空间-1
Lock(_p_mutex);
_ring[_tail] = in;
_tail = (_tail + 1) % _capacity;
Unlock(_p_mutex);
V(_data_sem); // 存放的数据+1
}
void pop(T* out)
{
// 先P操作,再加锁,效率更高
P(_data_sem); // 存放的数据-1
Lock(_c_mutex);
*out = _ring[_head];
_head = (_head + 1) % _capacity;
Unlock(_c_mutex);
V(_space_sem); // 空闲的空间+1
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
std::vector<T> _ring; // 数组模拟环形队列
int _capacity; // 环形队列的容量
int _tail; // 生产者的位置
int _head; // 消费者的位置
sem_t _space_sem; // 生产者关心环形队列中空闲的空间,只要还有空间就能生产
sem_t _data_sem; // 消费者关心环形队列中存放的数据,只要还有数据就能消费
// 如果是单生产单消费不需要锁,多生产多消费需要两把锁,因为生产者和生产者之间是互斥的,消费者和消费者之间是互斥的
pthread_mutex_t _c_mutex; // 生产者锁
pthread_mutex_t _p_mutex; // 消费者锁
};
// main.cc
#include "RingQueue.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
while (true)
{
sleep(1);
// 1. 通过某种渠道获取数据
int data = rand() % 10 + 1; // 生产1-10的数字
// 2. 将数据放入环形队列中
rq->push(data);
std::cout << "producer data: " << data << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(arg);
while (true)
{
int data = 0;
// 1. 从环形队列中取出数据
rq->pop(&data);
// 2. 结合某种业务逻辑处理数据
// 我们这里写一行打印代码当作处理数据了
std::cout << "consumer data: " << data << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
RingQueue<int>* rq = new RingQueue<int>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, rq);
pthread_create(&p[1], nullptr, producer, rq);
pthread_create(&p[2], nullptr, producer, rq);
pthread_create(&c[0], nullptr, consumer, rq);
pthread_create(&c[1], nullptr, consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete rq;
return 0;
}
RingQueue.hpp内容不变,再增加一个Task.hpp用来定义Task类,main.cc内容稍微改变。
// RingQueue.hpp
#pragma once
#include <vector>
#include <pthread.h>
#include <semaphore.h>
template<typename T>
class RingQueue
{
public:
RingQueue(int capacity = 5)
: _ring(capacity)
, _capacity(capacity)
, _head(0)
, _tail(0)
{
sem_init(&_data_sem, 0, 0);
sem_init(&_space_sem, 0, capacity);
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
void push(const T& in)
{
// 先加锁,再P操作会怎么样?线程申请到锁之后,如果申请信号量失败,会带着锁阻塞,其他同类型的线程还会竞争锁但是竞争不到锁
// 先P操作,再加锁会怎么样?线程如果申请信号量失败,就不会去申请锁,这样会减少申请锁的次数,申请锁也是有代价的
// 先P操作,再加锁,效率更高
P(_space_sem); // 空闲的空间-1
Lock(_p_mutex);
_ring[_tail] = in;
_tail = (_tail + 1) % _capacity;
Unlock(_p_mutex);
V(_data_sem); // 存放的数据+1
}
void pop(T* out)
{
// 先P操作,再加锁,效率更高
P(_data_sem); // 存放的数据-1
Lock(_c_mutex);
*out = _ring[_head];
_head = (_head + 1) % _capacity;
Unlock(_c_mutex);
V(_space_sem); // 空闲的空间+1
}
~RingQueue()
{
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t &m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t &m)
{
pthread_mutex_unlock(&m);
}
std::vector<T> _ring; // 数组模拟环形队列
int _capacity; // 环形队列的容量
int _tail; // 生产者的位置
int _head; // 消费者的位置
sem_t _space_sem; // 生产者关心环形队列中空闲的空间,只要还有空间就能生产
sem_t _data_sem; // 消费者关心环形队列中存放的数据,只要还有数据就能消费
// 如果是单生产单消费不需要锁,多生产多消费需要两把锁,因为生产者和生产者之间是互斥的,消费者和消费者之间是互斥的
pthread_mutex_t _c_mutex; // 生产者锁
pthread_mutex_t _p_mutex; // 消费者锁
};
// Task.hpp
#pragma once
#include <string>
class Task
{
public:
Task() {}
Task(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
, _result(0)
, _exitCode(0)
{}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "=";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task() {}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
// main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <unistd.h>
// 生产者线程函数
void* producer(void* arg)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(arg);
std::string ops = "+-*/%";
while (true)
{
// 1. 通过某种渠道获取数据
int x = rand() % 20 + 1; // x的范围是1~20
int y = rand() % 10 + 1; // y的范围是1~10
char op = ops[rand() % ops.size()]; // 随机取一个运算符
// 2. 将数据放入环形队列中
Task t(x, y, op);
rq->push(t);
std::cout << "producer Task: " << t.formatArg() << "?" << std::endl;
}
}
// 消费者线程函数
void* consumer(void* arg)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(arg);
while (true)
{
sleep(1);
Task t;
// 1. 从环形队列中取出数据
rq->pop(&t);
// 2. 结合某种业务逻辑处理数据
t();
std::cout << "consumer Task: " << t.formatArg() << t.formatRes() << std::endl;
}
}
int main()
{
srand((unsigned int)time(nullptr) ^ getpid()); // 设置随机数种子
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t p[3], c[2];
pthread_create(&p[0], nullptr, producer, rq);
pthread_create(&p[1], nullptr, producer, rq);
pthread_create(&p[2], nullptr, producer, rq);
pthread_create(&c[0], nullptr, consumer, rq);
pthread_create(&c[1], nullptr, consumer, rq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete rq;
return 0;
}