什么是线程池?
一次预先申请一批线程,让这批线程有任务,就处理任务;没任务,就处于等待状态。
为什么要有线程池?
以空间换时间,预先申请一批线程,当有任务到来,可以直接指派给线程执行。
// task.hpp
#pragma once
#include <functional>
using namespace std;
typedef function<int(int, int)> calc_func_t;
class Task
{
public:
Task() {}
Task(int x, int y, calc_func_t func)
: _x(x), _y(y), _calc_func(func)
{}
// 加法计算的任务
int operator()() { return _calc_func(_x, _y); }
int get_x() { return _x; }
int get_y() { return _y; }
private:
int _x;
int _y;
calc_func_t _calc_func;
};
// log.hpp
#pragma once
#include <string>
#include <stdarg.h>
#include <unordered_map>
using namespace std;
#define LOG_FILE "./threadpool.log"
// 日志是有日志级别的
enum LogLevel
{
DEBUG,
NORMAL,
WARNING,
ERROR,
FATAL
};
// 针对枚举类型的哈希函数
template <typename T>
class EnumHash
{
public:
size_t operator()(const T& t) const { return static_cast<size_t>(t); }
};
unordered_map<LogLevel, string, EnumHash<LogLevel>> logLevelMap = {
{DEBUG, "DEBUG"},
{NORMAL, "NORMAL"},
{WARNING, "WARNING"},
{ERROR , "ERROR"},
{FATAL, "FATAL"}
};
// 完整的日志功能,至少有:日志等级 时间 支持用户自定义
void logMessage(LogLevel log_level, const char* format, ...)
{
#ifndef DEBUG_SHOW
if(log_level == DEBUG) return; // DEBUG_SHOW没有定义,不展示DEBUG信息
#endif
char stdBuffer[1024]; // 标准部分
char logBuffer[1024]; // 自定义部分
time_t timestamp = time(nullptr);
struct tm* ploct = localtime(×tamp);
snprintf(stdBuffer, sizeof stdBuffer, "[%s] [%04d-%02d-%02d %02d:%02d:%02d]", logLevelMap[log_level].c_str(),\
1900 + ploct->tm_year, 1 + ploct->tm_mon, ploct->tm_mday, ploct->tm_hour, ploct->tm_min, ploct->tm_sec);
va_list args;
va_start(args, format);
vsnprintf(logBuffer, sizeof logBuffer, format, args);
va_end(args);
FILE* log_file = fopen(LOG_FILE, "a");
fprintf(log_file, "%s %s\n", stdBuffer, logBuffer);
fclose(log_file);
}
va_*
系列函数与vprintf
系列函数配合使用可以格式化打印传入的可变参数的内容。
// thread.hpp
#pragma once
#include <string>
#include <cstdio>
#include <pthread.h>
using namespace std;
// 对应创建线程时的routine函数的类型
typedef void*(*func_t)(void*);
class ThreadData
{
public:
void* _ptpool; // 指向线程池对象
string _name;
};
class Thread
{
public:
Thread(int num, func_t callBack, void* _ptpool)
: _func(callBack)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof(nameBuffer), "Thread_%d", num);
_tdata._name = nameBuffer;
_tdata._ptpool = _ptpool;
}
void start() { pthread_create(&_tid, nullptr, _func, (void*)&_tdata); }
void join() { pthread_join(_tid, nullptr); }
const string& name() { return _tdata._name; }
private:
pthread_t _tid; // 线程ID
func_t _func; // 线程routine
ThreadData _tdata; // 线程数据
};
// threadPool.hpp
#pragma once
#include <vector>
#include <queue>
#include "thread.hpp"
#include "lockGuard.hpp"
#include "log.hpp"
const int g_thread_num = 3;
// 线程池:本质是生产消费模型
template<class T>
class threadPool
{
private:
threadPool(int thread_num = g_thread_num)
: _thread_num(thread_num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
for(int i = 0; i < _thread_num; ++i)
{
_threads.push_back(new Thread(i + 1/*线程编号*/, routine, this/*可以传this指针*/));
}
}
threadPool(const threadPool<T>&) = delete;
const threadPool<T>& operator=(const threadPool<T>&) = delete;
public:
// 考虑多个线程使用单例的情况
static threadPool<T>* getThreadPool(int thread_num = g_thread_num)
{
if(nullptr == _pthread_pool)
{
lockGuard lock_guard(&_pool_lock);
// 在单例创建好后,锁也就没有意义了
// 将来任何一个线程要获取单例,仍必须调用getThreadPool接口
// 这样一定会存在大量的申请和释放锁的行为
// 所以外层if判断,用于在单例创建的情况下,拦截大量的线程因请求单例而访问锁的行为
if(nullptr == _pthread_pool)
{
_pthread_pool = new threadPool<T>(thread_num);
}
}
return _pthread_pool;
}
void run()
{
for(auto& pthread : _threads)
{
pthread->start();
logMessage(NORMAL, "%s %s", (pthread->name()).c_str(), "启动成功");
}
}
void pushTask(const T& task)
{
lockGuard lock_guard(&_lock);
_task_queue.push(task);
pthread_cond_signal(&_cond);
}
~threadPool()
{
for(auto& pthread : _threads)
{
pthread->join();
delete pthread;
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
public:
pthread_mutex_t* getMutex()
{
return &_lock;
}
bool isEmpty()
{
return _task_queue.empty();
}
void waitCond()
{
pthread_cond_wait(&_cond, &_lock);
}
T& getTask()
{
T& task = _task_queue.front();
_task_queue.pop();
return task;
}
private:
// 消费过程
static void* routine(void* args)
{
ThreadData* tdata = (ThreadData*)args;
threadPool<T>* tpool = (threadPool<T>*)tdata->_ptpool;
while(true)
{
T task;
{
lockGuard lock_guard(tpool->getMutex());
while (tpool->isEmpty()) tpool->waitCond();
task = tpool->getTask();
}
logMessage(WARNING, "%s 处理完成: %d + %d = %d", (tdata->_name).c_str(), task.get_x(), task.get_y(), task());
}
}
private:
vector<Thread*> _threads; // 数组存放创建的线程的地址
int _thread_num; // 创建的线程个数
queue<T> _task_queue; // 阻塞式任务队列
pthread_mutex_t _lock; // 针对任务队列的锁
pthread_cond_t _cond; // 队列空满情况的条件变量
static threadPool<T>* _pthread_pool; // 饿汉式线程池
static pthread_mutex_t _pool_lock; // 针对线程池的锁
};
template<class T>
threadPool<T>* threadPool<T>::_pthread_pool = nullptr;
template<class T>
pthread_mutex_t threadPool<T>::_pool_lock = PTHREAD_MUTEX_INITIALIZER;
// test.cc
#include "task.hpp"
#include "threadPool.hpp"
#include <unistd.h>
#include <ctime>
void test1()
{
srand((unsigned int)time(nullptr) ^ getpid());
threadPool<Task>::getThreadPool()->run();
while(true)
{
// 生产的过程 - 制作任务的时候要花时间的
int x = rand() % 100 + 1;
usleep(2023);
int y = rand() % 50 + 1;
Task task(x, y, [](int x, int y){ return x + y; });
logMessage(DEBUG, "制作任务完成: %d + %d = ?", x, y);
// 推送任务到线程池
threadPool<Task>::getThreadPool()->pushTask(task);
sleep(1);
}
}
# Makefile
test:test.cc
g++ -o $@ $^ -std=c++11 -lpthread -DDEBUG_SHOW
.PHONY:clean
clean:
rm -f test
运行结果:
自旋锁:本质是通过不断检测锁的状态,来确定资源是否就绪的方案。
什么时候使用自旋锁?这个由临界资源就绪的时间长短决定。
自旋锁的初始化 & 销毁:
自旋锁的加锁:
自旋锁的解锁:
写者与写者:互斥关系
读者与写者:互斥 & 同步关系
读者与读者:共享关系
读者写者问题和生产消费模型的本质区别在于,消费者会拿走数据(做修改),而读者不会。
读写锁的初始化 & 销毁:
读写锁之读者加锁:
读写锁之写者加锁:
读写锁的解锁:
关于是读者还是写者优先的问题,抛开应用场景去谈技术细节就是耍流氓。
而pthread库中的读写锁默认采用读者优先,这类的应用场景主要是:数据被读取的频率非常高,被修改的频率非常低。