线程池(Thread Pool)是一种并发编程的设计模式,它用于管理和重复使用线程,以提高程序的性能和资源利用率。线程池通过维护一组预先创建的线程,这些线程可以在需要时被重复使用,而不是为每个任务都创建一个新的线程。
线程池通常包含以下主要组件:
线程池的优势包括:
这里我没有按照上述的三部分来说明, 我觉得分为以下四个部分更容易理解线程池的架构
struct Task
class TaskQueue
线程池类
工作线程
管理者线程
这里主要就是利用c++的封装, 把任务队列封装为一个类, 因为任务队列自身要完成添加任务, 取出任务等行为, 所以我们把任务队列封装为一个类TaskQueue
, 这个类也将作为线程池类的类成员
因为任务队列里面放的肯定是任务, 这个任务是什么类型呢? 我们把这个任务也封装一下, 封装为任务结构体Task
, 那这个结构体里有哪些成员呢? 熟悉c语言的人应该知道回调函数的基本形式, 大多是由一个**函数指针和一个函数参数也就是通用指针组成**, 也就是:
void(*func)(void* arg);
void* arg;
这声明了一个函数指针 func
,该指针指向一个接受 void*
类型的参数(通常是一个指向某种数据的指针)并返回 void
的函数。
所以我们的任务结构体可以确定, 在定义一个任务队列 class TaskQueue
:
成员:
方法:
具体的实现代码如下:
// TaskQueue.h
#ifndef TEST_TASKQUEUE_H
#define TEST_TASKQUEUE_H
#include "queue"
using namespace std;
#include "pthread.h"
using callback = void(*) (void* arg);
struct Task{
callback func;
// void(*function)(void* arg);
void* arg;
// 构造函数
Task(){
func = nullptr;
arg = nullptr;
}
// 有参构造
Task(callback f, void* a): func(f), arg(a) {};
};
class TaskQueue {
public:
// 构造和析构
TaskQueue();
~TaskQueue();
// 添加任务
void addTask(Task task);
void addTask(callback f, void* arg);
// 取出任务
Task takeTask();
// 返回任务队列中的个数, 使用内联函数, 提高效率
inline int getTaskNum(){
return (int)m_Queue.size();
}
private:
pthread_mutex_t m_Mutex;
queue<Task> m_Queue;
};
#endif //TEST_TASKQUEUE_H
// TaskQueue.cpp
#include "TaskQueue.h"
#include "pthread.h"
TaskQueue::TaskQueue() {
pthread_mutex_init(&this->m_Mutex, nullptr);
}
TaskQueue::~TaskQueue() {
pthread_mutex_destroy(&this->m_Mutex);
}
void TaskQueue::addTask(Task task) {
pthread_mutex_lock(&this->m_Mutex);
this->m_Queue.push(task);
pthread_mutex_unlock(&this->m_Mutex);
}
void TaskQueue::addTask(callback f, void *arg) {
pthread_mutex_lock(&this->m_Mutex);
this->m_Queue.push(Task(f, arg));
pthread_mutex_unlock(&this->m_Mutex);
}
Task TaskQueue::takeTask() {
Task task;
pthread_mutex_lock(&this->m_Mutex);
task = this->m_Queue.front();
this->m_Queue.pop();
pthread_mutex_unlock(&this->m_Mutex);
return task;
}
这个类就是线程池类, 实现线程池的各种操作, 其中线程池的创建和析构较为麻烦一些, 剩下的行数都比较简单
类成员:
TaskQueue
类函数:
先看声明文件:
// ThreadPool.h
#ifndef TEST_THREADPOOL_H
#define TEST_THREADPOOL_H
#include "TaskQueue.h"
class ThreadPool {
public:
// 构造函数
ThreadPool(int min, int max);
~ThreadPool();
// 添加任务
void addTask(Task task);
// 获取忙线程个数
int getBusyNum();
// 获取活着的线程个数
int getLiveNum();
private:
// 为什么要设置为静态函数呢
// 这里也可以不将其设置为静态的, 可以把worker函数和manager函数变为全局函数
// 具体原因可以看构造函数中创建线程的部分
static void* worker(void* arg);
static void* manager(void* arg);
void threadExit();
private:
TaskQueue* taskQueue; // 任务队列
pthread_t managerId; // 管理者线程ID
pthread_t* workIDs; // 工作的线程ID 多个
int m_Min; // 最少线程数量
int m_Max; // 最多线程数量
int m_BusyNum; // 忙着的线程数量
int m_LiveNum; // 存活的线程数量
int m_ExitNum; // 要销毁的线程数量 (线程较多, 任务少的时候)
pthread_mutex_t m_MutexPool; // 锁整个线程池
pthread_cond_t m_NotEmpty; // 任务队列是不是空了
bool m_ShutDown = false; // 是否要销毁线程池, 销毁 - 1, 不销毁 - 0
};
#endif //TEST_THREADPOOL_H
然后我们一个一个的看ThreadPool
里面的函数
首先是构造函数, 也就是我们创建线程池的时候要完成哪些操作:
ThreadPool(int min, int max);
函数的参数时线程池最少线程数量和最多线程数量, 意思就是就算没有任务, 也要有min个线程就绪, 任务再多, 也只能有max个线程
步骤:
来看一下构造函数的具体实现:
// 构造函数
ThreadPool::ThreadPool(int min, int max) {
do {
this->taskQueue = new TaskQueue;
if(taskQueue == nullptr){
cout << "new taskQueue failed..." << endl;
break;
}
workIDs = new pthread_t[max];
if (workIDs == nullptr) {
printf("new threadIDs failed...\n");
break;
}
// 将工作线程的id都初始化为0
memset(workIDs, 0, sizeof(pthread_t) * max);
m_Min = min;
m_Max = max;
m_BusyNum = 0;
m_LiveNum = min;
m_ExitNum = 0;
// 判断锁和条件变量是否初始化成功
if (pthread_mutex_init(&m_MutexPool, nullptr) != 0 ||
pthread_cond_init(&m_NotEmpty, nullptr) != 0) {
printf("mutex or condition init failed...\n");
break;
}
m_ShutDown = false;
// 创建线程
// 管理者线程
pthread_create(&managerId, nullptr, manager, this);
// 工作线程
for (int i = 0; i < min; ++i) {
pthread_create(&workIDs[i], nullptr, worker, this);
}
return;
} while (0);
// do while 外面这些代码都是出了问题才会走到的
delete []workIDs;
delete taskQueue;
return;
}
这里的do…while(0) 结构主要是在好几次初始化的过程中有可能失败, 失败的话每个条件判断语句中都要释放一遍堆内存, 干脆就放在do…while循环中, 如果程序都正常运行, 在do中的最后就return了, 走不到do…while的外面
在创建管理者线程和工作线程的时候, 我们把this
指针传给了manager
函数和worker
函数, 所以我们再来看这两个函数
虽然这一部分我在大纲中归为第三大部分, 但其实worker
函数和manager
函数也是ThreadPool
类的一部分, 只不过是静态成员函数(静态的原因后面再说)
看一下工作线程的处理流程
具体实现:
void *ThreadPool::worker(void *arg) {
// 类型转换
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 要一直检查队列里的内容
while(true){
pthread_mutex_lock(&pool->m_MutexPool);
// 判断任务队列是否为空
while(pool->taskQueue->getTaskNum() == 0 && !pool->m_ShutDown){
// 阻塞工作线程 条件变量
pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);
// 判断是否有要销毁的线程
if(pool->m_ExitNum > 0){
--pool->m_ExitNum;
if(pool->m_LiveNum > pool->m_Min){
--pool->m_LiveNum;
pthread_mutex_unlock(&pool->m_MutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否要关闭
if(pool->m_ShutDown){
// 先解锁后退出
pthread_mutex_unlock(&pool->m_MutexPool);
pool->threadExit();
}
// 取任务
Task task = pool->taskQueue->takeTask();
// busy线程+1
++pool->m_BusyNum;
// 解锁
pthread_mutex_unlock(&pool->m_MutexPool);
// 开始工作
cout << "thread "<< pthread_self() << " start working...\n";
// 任务处理
task.func(task.arg);
delete task.arg;
task.arg = nullptr;
// 处理结束
cout << "thread "<< pthread_self() << " end working...\n";
pthread_mutex_lock(&pool->m_MutexPool);
--pool->m_BusyNum;
pthread_mutex_unlock(&pool->m_MutexPool);
}
return nullptr;
}
类型转换说明: 因为传进来的是一个void*
类型的指针, 我们要把它转换为ThreadPool*
类型离开操作
while(true)说明: 工作线程要一直检测任务队列中是否有任务, 只要有任务就要处理, 没有的话就阻塞
管理者的任务主要是按照线程池工作线程的数量和任务数量相应的创建和销毁线程
按照频率检测线程数量
具体实现代码:
void *ThreadPool::manager(void *arg) {
// 类型转换
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(!pool->m_ShutDown){
// 按频率3s检测一次
sleep(3);
// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数
pthread_mutex_lock(&pool->m_MutexPool);
int queueSize = pool->taskQueue->getTaskNum();
int liveNum = pool->m_LiveNum;
int busyNum = pool->m_BusyNum;
pthread_mutex_unlock(&pool->m_MutexPool);
// 按相应规则创建和销毁线程
// 创建线程
// 规则: 任务数量 > 线程数量 && 线程数量 < max
const int NUMBER = 2;
if(queueSize > liveNum && liveNum < pool->m_Max){
// 创建线程
// 在threadIDs数组中找一个可用的空间存放新创建的id
// 遍历整个threadID数组, 看哪些可用
pthread_mutex_lock(&pool->m_MutexPool);
int count = 0;
for(int i = 0; i < pool->m_Max && pool->m_LiveNum < pool->m_Max && count < NUMBER; ++i){
if(pool->workIDs[i] == 0){ // 空间可用
pthread_create(&pool->workIDs[i], nullptr, worker, pool);
++count;
++pool->m_LiveNum;
}
}
pthread_mutex_unlock(&pool->m_MutexPool);
}
// 销毁线程
// 规则: 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if(busyNum * 2 < liveNum && liveNum > pool->m_Min){
pthread_mutex_lock(&pool->m_MutexPool);
pool->m_ExitNum = NUMBER;
// 让线程自杀
for (int i = 0; i < NUMBER && pool->m_LiveNum > pool->m_Min; ++i) {
pthread_cond_signal(&pool->m_NotEmpty);
}
pthread_mutex_unlock(&pool->m_MutexPool);
}
}
return nullptr;
}
具体的规则已经在代码中指出, 这个规则是可以自己规定的
解释一下让线程自杀的逻辑:
让线程自杀之后, 会调用pthread_cond_signal
唤醒阻塞的worker
线程, 就是上一部分中第10行代码, 这时exitNum不为0, 就会执行第12行的逻辑, 也就是worker线程的这一部分:
// 阻塞工作线程 条件变量
pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);
// 判断是否有要销毁的线程
if(pool->m_ExitNum > 0){
--pool->m_ExitNum;
if(pool->m_LiveNum > pool->m_Min){
--pool->m_LiveNum;
pthread_mutex_unlock(&pool->m_MutexPool);
pool->threadExit();
}
}
其余各种锁的逻辑应该都能看懂, 不做过多解释
这部分的逻辑相对简单一些
直接看代码:
void ThreadPool::addTask(Task task) {
if(m_ShutDown) return;
// 添加任务,不需要加锁,任务队列中有锁
this->taskQueue->addTask(task);
// 唤醒工作线程
pthread_cond_signal(&this->m_NotEmpty);
}
int ThreadPool::getBusyNum() {
int busyNum = 0;
pthread_mutex_lock(&m_MutexPool);
busyNum = m_BusyNum;
pthread_mutex_unlock(&m_MutexPool);
return busyNum;
}
int ThreadPool::getLiveNum() {
int liveNum = 0;
pthread_mutex_lock(&m_MutexPool);
liveNum = m_LiveNum;
pthread_mutex_unlock(&m_MutexPool);
return liveNum;
}
细心的可以看出其他部分代码, 线程退出是使用的pool->threadExit();
, 而不是直接pthread_exit(NULL)
原因主要是, 在线程退出后, 我们还需要把这个线程在workerID中的线程id重置为0, 所以不是单纯的调用pthread_exit(NULL)
这么简单, 看代码:
void ThreadPool::threadExit() {
pthread_t pid = pthread_self();
for(int i = 0; i < this->m_Max; ++i){
if(this->workIDs[i] == pid){
cout << "threadExit() function: thread "
<< to_string(pthread_self()) << " exiting..." << endl;
this->workIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
所有代码已经准备完毕, 给大家看一下整体的结构
一共5个文件, 2个.h 对应2个 .cpp, 还有一个main文件
Thread Pool.cpp文件的结构:
把完整的ThreadPool.cpp
代码放在下面:
ThreadPool.cpp
// ThreadPool.cpp
#include "ThreadPool.h"
#include "pthread.h"
#include <cstring>
#include <unistd.h>
ThreadPool::ThreadPool(int min, int max) {
do {
this->taskQueue = new TaskQueue;
if(taskQueue == nullptr){
cout << "new taskQueue failed..." << endl;
break;
}
workIDs = new pthread_t[max];
if (workIDs == nullptr) {
printf("new threadIDs failed...\n");
break;
}
// 将工作线程的id都初始化为0
memset(workIDs, 0, sizeof(pthread_t) * max);
m_Min = min;
m_Max = max;
m_BusyNum = 0;
m_LiveNum = min;
m_ExitNum = 0;
// 判断锁和条件变量是否初始化成功
if (pthread_mutex_init(&m_MutexPool, nullptr) != 0 ||
pthread_cond_init(&m_NotEmpty, nullptr) != 0) {
printf("mutex or condition init failed...\n");
break;
}
m_ShutDown = false;
// 创建线程
// 管理者线程
// 将manager和worker函数设置为静态函数的进一步解释
// 在使用pthread_create函数创建线程时,该函数的第三个参数是一个函数指针,指向线程所要执行的函数。
// 在C++中,非静态成员函数需要一个对象实例才能被调用,而pthread_create函数只接受普通的函数指针,
// 因此在这种情况下,通常会将非静态成员函数转换为静态成员函数或全局函数。
//
// 静态成员函数和全局函数没有与特定对象实例相关联,因此可以直接使用函数指针传递给pthread_create,
// 而不需要担心对象的实例。这是因为静态成员函数和全局函数不依赖于特定对象的状态,它们只能访问静态成员或全局变量。
pthread_create(&managerId, nullptr, manager, this);
// 把this传给manager函数:
// 因为manager函数是静态函数, 只能访问类的静态成员变量
// 要想访问类的非静态成员变量(函数), 必须把类的实例对象传进去
// 工作线程
for (int i = 0; i < min; ++i) {
pthread_create(&workIDs[i], nullptr, worker, this);
}
return;
} while (0);
// do while 外面这些代码都是出了问题才会走到的
delete []workIDs;
delete taskQueue;
return;
}
ThreadPool::~ThreadPool() {
m_ShutDown = true;
// 阻塞回收管理者线程
pthread_join(managerId, NULL);
// 唤醒消费者线程
for(int i = 0; i < m_LiveNum; ++i){
pthread_cond_signal(&m_NotEmpty);
}
// 删除堆内存
if(taskQueue) delete taskQueue;
if(workIDs) delete []workIDs;
pthread_mutex_destroy(&m_MutexPool);
pthread_cond_destroy(&m_NotEmpty);
}
void ThreadPool::addTask(Task task) {
if(m_ShutDown) return;
// 添加任务,不需要加锁,任务队列中有锁
this->taskQueue->addTask(task);
// 唤醒工作线程
pthread_cond_signal(&this->m_NotEmpty);
}
int ThreadPool::getBusyNum() {
int busyNum = 0;
pthread_mutex_lock(&m_MutexPool);
busyNum = m_BusyNum;
pthread_mutex_unlock(&m_MutexPool);
return busyNum;
}
int ThreadPool::getLiveNum() {
int liveNum = 0;
pthread_mutex_lock(&m_MutexPool);
liveNum = m_LiveNum;
pthread_mutex_unlock(&m_MutexPool);
return liveNum;
}
void *ThreadPool::worker(void *arg) {
// 类型转换
ThreadPool* pool = static_cast<ThreadPool*>(arg);
// 要一直检查队列里的内容
while(true){
pthread_mutex_lock(&pool->m_MutexPool);
// 判断任务队列是否为空
while(pool->taskQueue->getTaskNum() == 0 && !pool->m_ShutDown){
// 阻塞工作线程 条件变量
pthread_cond_wait(&pool->m_NotEmpty, &pool->m_MutexPool);
// 判断是否有要销毁的线程
if(pool->m_ExitNum > 0){
--pool->m_ExitNum;
if(pool->m_LiveNum > pool->m_Min){
--pool->m_LiveNum;
pthread_mutex_unlock(&pool->m_MutexPool);
pool->threadExit();
}
}
}
// 判断线程池是否要关闭
if(pool->m_ShutDown){
// 先解锁后退出
pthread_mutex_unlock(&pool->m_MutexPool);
pool->threadExit();
}
// 取任务
Task task = pool->taskQueue->takeTask();
// busy线程+1
++pool->m_BusyNum;
// 解锁
pthread_mutex_unlock(&pool->m_MutexPool);
// 开始工作
cout << "thread "<< pthread_self() << " start working...\n";
// 任务处理
task.func(task.arg);
delete task.arg;
task.arg = nullptr;
// 处理结束
cout << "thread "<< pthread_self() << " end working...\n";
pthread_mutex_lock(&pool->m_MutexPool);
--pool->m_BusyNum;
pthread_mutex_unlock(&pool->m_MutexPool);
}
return nullptr;
}
void *ThreadPool::manager(void *arg) {
// 类型转换
ThreadPool* pool = static_cast<ThreadPool*>(arg);
while(!pool->m_ShutDown){
// 按频率3s检测一次
sleep(3);
// 取出当前任务队列的任务个数 和 当前线程数 和 繁忙的线程数
pthread_mutex_lock(&pool->m_MutexPool);
int queueSize = pool->taskQueue->getTaskNum();
int liveNum = pool->m_LiveNum;
int busyNum = pool->m_BusyNum;
pthread_mutex_unlock(&pool->m_MutexPool);
// 按相应规则创建和销毁线程
// 创建线程
// 规则: 任务数量 > 线程数量 && 线程数量 < max
const int NUMBER = 2;
if(queueSize > liveNum && liveNum < pool->m_Max){
// 创建线程
// 在threadIDs数组中找一个可用的空间存放新创建的id
// 遍历整个threadID数组, 看哪些可用
pthread_mutex_lock(&pool->m_MutexPool);
int count = 0;
for(int i = 0; i < pool->m_Max && pool->m_LiveNum < pool->m_Max && count < NUMBER; ++i){
if(pool->workIDs[i] == 0){ // 空间可用
pthread_create(&pool->workIDs[i], nullptr, worker, pool);
++count;
++pool->m_LiveNum;
}
}
pthread_mutex_unlock(&pool->m_MutexPool);
}
// 销毁线程
// 规则: 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
if(busyNum * 2 < liveNum && liveNum > pool->m_Min){
pthread_mutex_lock(&pool->m_MutexPool);
pool->m_ExitNum = NUMBER;
// 让线程自杀
for (int i = 0; i < NUMBER && pool->m_LiveNum > pool->m_Min; ++i) {
pthread_cond_signal(&pool->m_NotEmpty);
}
pthread_mutex_unlock(&pool->m_MutexPool);
}
}
return nullptr;
}
void ThreadPool::threadExit() {
pthread_t pid = pthread_self();
for(int i = 0; i < this->m_Max; ++i){
if(this->workIDs[i] == pid){
cout << "threadExit() function: thread "
<< to_string(pthread_self()) << " exiting..." << endl;
this->workIDs[i] = 0;
break;
}
}
pthread_exit(NULL);
}
代码里也即是了为什么要把worker和manager函数设置为静态成员函数, 以及为什么要把this指针传给回调函数, 如果不传的话, 静态成员函数是无法访问类中的其他非静态成员变量的
最后看一下运行文件main.cpp
// main.cpp
#include "TaskQueue.h"
#include "iostream"
#include "TaskQueue.cpp"
#include "ThreadPool.h"
#include "ThreadPool.cpp"
using namespace std;
void func(void* arg){
int num = *(int*)arg;
cout << "thread " << pthread_self() << " is working, num = " << num << endl;
sleep(1);
}
int main(){
ThreadPool pool(3, 10);
for (int i = 0; i < 100; ++i) {
int* num = new int(i + 100);
pool.addTask(Task(func, num));
}
sleep(20);
return 0;
}
这里主要是写了一个数数的任务函数
运行效果如下图:
但是不知道为什么有的地方会出现bug, 大家有想法的也可以告诉我, 我改正一下