前置知识:进程的概念和如下的页表概念
在32位平台下有232个地址,也就是一张页表就要有232个映射关系。
每张表的内容出了映射关系外,还包含了一些权限相关信息。比如页表分为内核级页表和用户级页表,通过权限信息来区分。
每个表项中存储了一个物理地址和虚拟地址,这里一共要占用8个字节,再考虑权限相关的信息,这里粗略地认为每个表项总共占用了10个字节。
那么总共就需要232 * 10 字节,也就是40GB的大小。
在32位平台下最大的内存也就仅有4GB,说明这种页表映射的方式是不合理的。
在Linux中的处理方式是建立一个二级页表。
实现方法:
物理地址是以“块”为单位的,这个块的大小就是4KB也就是212,对应了偏移量最大值。
这就是二级页表的结构,页目录就是一个一级页表,而表项就是一个二级页表。
计算页表的总大小:
首先只用了20个比特位来建立映射关系,那么最大也就是220个字节,也就是1MB。在页表中,左边占了10个比特位,而右边占了20个比特位,共30个比特位,这里假设加起来一共占了32个比特位(方便计算),也就是4个字节。那么总大小就是220 * 4 byte = 4MB。
映射过程是由MMU这个硬件完成的,页表是一种软件映射,MMU是一种硬件映射。
MMU是Memory Management Unit的缩写,中文名是内存管理单元,有时称作分页内存管理单元(英语:paged memorymanagementunit,缩写为PMMU)。它是一种负责处理中央处理器(CPU)的内存访问请求的计算机硬件。它的功能包括虚拟地址到物理地址的转换(即虚拟内存管理)、内存保护、中央处理器高速缓存的控制,在较为简单的计算机体系结构中,负责总线的仲裁以及存储体切换
优点:
缺点:
进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:
进程和线程的关系如下图:
在Linux系统的的视角下,Linux下没有真正意义的线程,而是用进程模拟的线程(LWP,轻量级进程),所以Linux不会提供直接创建线程的系统调用,最多提供创建轻量级进程的接口。
但是对于用户来说,用户需要的是线程接口。
所以Linux提供了用户线程库,对下将Linux接口封装,对上给用户提供进行线程控制的接口,也就是pthread库(原生线程库)
PROSIX线程库:
函数原型:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
函数功能:创建一个新的线程
参数:
返回值:成功返回0;失败返回错误码
错误检查:
(1)代码示例:创建一个新线程
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
while(1)
{
cout << "new thread running : " << getpid() << endl;
sleep(1);
}
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, threadRun, nullptr);
while(1)
{
cout << "main thread running : " << getpid() << endl;
sleep(1);
}
return 0;
}
(2)运行结果:
由打印结果可以看到,主线程和新线程都打印了相应的字符串。
使用
ps -aL | head -1 && ps -aL | grep test1
命令查看执行的线程,test是C++可执行文件
pthread_t pthread_self(void);
(1)代码示例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
while(1)
{
cout << "new thread running : " << getpid() << "线程id : " << pthread_self() << endl;
sleep(1);
}
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, threadRun, nullptr);
while(1)
{
cout << "main thread running : " << getpid() << endl;
sleep(1);
}
return 0;
}
(2)运行结果:
pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的本地线程库实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。
下图中的mmap区域是共享区:
只终止某个线程而不是终止整个进程,可以有三种方法:
注意:在线程中使用return代表该线程退出,而在main函数(主线程)中使用return代表整个进程退出。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
while(1)
{
cout << "new thread running : " << getpid() << "线程id : " << pthread_self() << endl;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, threadRun, nullptr);
return 0;
}
该代码并不会打印新线程中的字符串,因为主线程退出整个进程都终止了。
void pthread_exit(void *retval);
函数功能:线程终止
参数:
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
(1)pthread_exit 或者 return 返回的指针所指向的内存单元必须是全局的或者是 malloc 分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回指针时,线程函数已经退出了。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
int cnt = 5;
while(cnt--)
{
cout << "new thread running : " << getpid() << "线程id : " << pthread_self() << endl;
sleep(1);
}
pthread_exit((void*)1);
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, threadRun, nullptr);
void *ret = nullptr;
// pthread_join表示线程等待,主线程执行完后还需等待其他线程
pthread_join(pid, &ret);
// 这里会把线程退出码信息通过该函数给ret
cout << "new thread exit code is : " << (int64_t)ret << endl;
// 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。
return 0;
}
(2)运行结果:
在线程等待的情况下,新线程在5秒后结束了并返回了线程退出码。
int pthread_cancel(pthread_t thread);
功能:取消一个执行中的线程。
参数:
返回值:成功返回0;失败返回错误码。
(1)线程是可以取消自己的,甚至新线程也可以取消主线程,取消成功的线程的退出码一般是 -1。
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* threadRun(void* args)
{
int cnt = 5;
while(cnt--)
{
cout << "new thread running : " << getpid() << "线程id : " << pthread_self() << endl;
sleep(1);
}
pthread_exit((void*)1);
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, threadRun, nullptr);
sleep(3);
//取消新线程
pthread_cancel(pid);
void *ret = nullptr;
// pthread_join表示线程等待,主线程执行完后还需等待其他线程
pthread_join(pid, &ret);
// 这里会把线程退出码信息通过该函数给ret
cout << "new thread exit code is : " << (int64_t)ret << endl;
// 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。
return 0;
}
(2)运行结果:
(1)为什么需要线程等待?
(2)线程等待函数:
int pthread_join(pthread_t thread, void **retval);
功能:等待线程结束。
参数:
返回值:线程等待成功返回0,失败返回错误码。
调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:
(3)代码演示让新线程创建5s后退出,随后再过几秒后被thread_join等待,当主进程开始打印消息时,说明新线程join等待完成:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* thread_rum(void* args)
{
const char* name = static_cast<const char*>(args);
int cnt = 5;
while (true)
{
printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
sleep(1);
if (!(cnt--))
{
break;
}
}
cout << "线程退出啦...." << endl;
return nullptr;
}
int main()
{
pthread_t pit;
int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");
sleep(3);
pthread_join(pit, nullptr);
cout << "main thread join success" << endl;
sleep(3);
while(1)
{
cout << "main thread " << pthread_self() << endl;
sleep(1);
}
return 0;
}
运行结果:
可以使用如下脚本来监控线程运行状况:
[xiaomaker@VM-28-13-centos test_12_07]$ while :; do ps -aL | head -1 && ps -aL | grep mythread; sleep 1; done
通过如上代码可以发现当创建线程后,线程1正在运行,5s后新线程退出了,我们的监控脚本观察到线程由两个变成了一个,但是正常情况下预期应该是两个线程,随后线程等待成功,这里还是只能看到一个线程。不是说好退出后应该看到的是两个线程吗,事实上一个线程退出后我们并没有看到预期结果。原因是ps命令在查的时候退出的线程是不给你显示的,所以你只能看到一个线程。但是现在不能证明当前的新线程在退出没有被join的时候就没有内存泄漏。
所以线程退出的时候,一般必须要进行join,如果不进行join,就会造成类似于进程那样的内存泄漏问题。
(4)线程异常的问题:
野指针代码演示:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* thread_rum(void* args)
{
const char* name = static_cast<const char*>(args);
int cnt = 5;
while (true)
{
printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
sleep(1);
if (!(cnt--))
{
int* p = nullptr;
*p = 100;
break;
}
}
cout << "线程退出啦...." << endl;
return (void*)10;
}
int main()
{
pthread_t pit;
int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");
void *ret = nullptr;
pthread_join(pit, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
while(1)
{
cout << "main thread " << pthread_self() << endl;
sleep(1);
}
return 0;
}
运行结果:
此时会发现:待线程出现野指针问题时,左边会显示段错误,而右边监控脚本中的线程直接就没了。此时就说明当线程异常了,那么整个进程整体异常退出,线程异常 == 进程异常。所以线程会影响其它线程的运行 —— 线程的健壮性(鲁棒性)较低。
(5)如何理解第二个参数retval?
参数retval是线程退出时的退出码,这是一个二级指针,一个输出型参数。刚刚我们的代码中,以及涉及到了线程退出的方式(从线程函数return)。退出的类型是void*,这里我们把先前退出返回的nullptr改为(void*)10。
此线程退出后,我们是通过pthread_join函数获得此线程的退出结果,退出结果是void*类型,可retval是void**类型,我们需要传入一个二级指针。下面演示获得此线程的退出结果的过程,并打印此退出码,代码如下:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;
void* thread_rum(void* args)
{
const char* name = static_cast<const char*>(args);
int cnt = 5;
while (true)
{
printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
sleep(1);
if (!(cnt--))
{
break;
}
}
cout << "线程退出啦...." << endl;
return (void*)10;
}
int main()
{
pthread_t pit;
int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");
void *ret = nullptr;
pthread_join(pit, &ret);
cout << "main thread join success, *ret: " << (long long)ret << endl;
while(1)
{
cout << "main thread " << pthread_self() << endl;
sleep(1);
}
return 0;
}
运行结果:
这里我们就得到了新线程退出时的退出码 10。综上ptherad_join的第二个参数retval的作用就是一个输出型参数,获取新线程退出时的退出码。我们先前讲过进程退出时,分为三种情况:
在线程退出时,代码跑完,结果不正确和结果正确都可以得到退出码,但是线程异常时并不会出现退出码。那么为什么异常时主线程没有获取新线程退出时的信号呢?
其实线程终止有3种方法,见上文。
int pthread_detach(pthread_t thread);
可以线程组内其他线程对目标线程进行分离,也可以是线程自己分离。
当一个线程分离后,是不能够被join的。
(1)错误使用示例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;
void* thread_run(void* args)
{
char* name = static_cast<char*>(args);
int cnt = 5;
while(cnt)
{
cout << name << " : " << cnt -- << endl;
}
return nullptr;
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
pthread_detach(pid);
//error,线程分离后不能join
int n = pthread_join(pid, nullptr);
if(n != 0)
{
cerr << "error : " << n << " : " << strerror(n) << endl;
}
return 0;
}
(2)这里可能会出现两种情况,一种是线程1先执行完,再提示出main函数里的错误打印。一种是直接错误打印。
原因是,线程之间谁先执行是不确定的。假设线程1先执行,因为代码没有sleep函数,执行完也就一瞬间的事情。线程1执行完后,接下来是主线程执行,主线程这时候才去判断有没有join,所以会导致最后才打印错误信息。
假设主线程先执行,发现线程分离了以后还join了,所以程序之间报错,结束程序。
(3)正确使用示例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;
void* thread_run(void* args)
{
char* name = static_cast<char*>(args);
int cnt = 5;
while(cnt)
{
cout << name << " : " << cnt -- << endl;
}
return nullptr;
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
pthread_detach(pid);
usleep(1000);
return 0;
}
这里反复运行,发现会有两种情况,一种是主线程先执行,主线程执行结束直接return,进程直接结束。另外一种是线程1先执行,再到主线程。
这里主要是想说明,线程分离并不影响“主线程结束,导致其他线程被迫退出”的情况。线程分离后,主线程执行结束,各线程会主动释放空间,避免僵尸进程的情况。
(4)让主线程的休眠时间大于新线程即可让新线程先打印
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;
void* thread_run(void* args)
{
char* name = static_cast<char*>(args);
int cnt = 5;
while(cnt)
{
cout << name << " : " << cnt -- << endl;
}
return nullptr;
}
int main()
{
pthread_t pid;
pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
pthread_detach(pid);
sleep(5);
return 0;
}
(5)运行结果:
(1)写个代码体验线程不安全:
#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <vector>
using namespace std;
int ticket = 3000;
class threadData
{
public:
threadData(int number)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
};
void* getTicket(void* args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while (1)
{
if(ticket > 0)
{
usleep(1000);
printf("who=%s, get a ticket: %d\n", name, ticket);
ticket--;
}
else
{
break;
}
usleep(13);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
vector<pthread_t> tids;
vector<threadData*> thread_datas;
for(int i = 1; i <= 4; i++)
{
pthread_t tid;
threadData* td = new threadData(i);
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
tids.push_back(tid);
}
for (auto& thread : tids)
{
pthread_join(thread, nullptr);
}
for (auto& td : thread_datas)
{
delete td;
}
return 0;
}
(2)运行结果:
运行以后,发现出现了负数票,这不合理,票抢完就应该停止了,包括我们的代码逻辑都是这样写的,但是此时就出现了这种情况。
其实上面现象的原因是发生了线程不安全问题。
(3)如何产生线程不安全现象:
上面现象是故意弄出来的,涉及到了线程调度,利用了线程调度的特性造出了一个这样的现象。要想出现上面的现象,就需要:
虽然看起来是多个线程在同时运行,但这是由于CPU运行速度太快导致的,实际上,CPU是一个线程一个线程执行的。现在就是要让CPU频繁调度,不停的切换线程,一个线程还没有执行完就再执行下一个,每个线程都执行一点,这样交叉执行。
当一个线程进行延时的时候,CPU并不会等它,而是会将它放在等待队列里,然后去执行另一个线程,等延时线程醒来以后才会接着执行。
线程检测是否切换是以内核态的身份去检测的,执行的是3~4G内核空间中的代码,本质上是操作系统在检测。
(4)产生线程不安全现象的原因:
①假设tickets已经只剩一张了,即全局变量tickets = 1。
主线程创建好4个新线程以后,4个新线程便开始执行了,在执行到延时的时候,新线程就会被放在等待队列里。
②CPU及内核if判断的本质逻辑:
③当线程user1符合条件继续向下执行延时代码时,CPU将线程user1切走了,换上了user2。
user2被调度时仍然重复user1的过程,执行延时被切走,再换上user3,以此类推,直到user4被切走。
④user1唤醒以后接着被切走的位置继续执行:
执行tickets - - 的本质:
虽然C/C++代码只有一条语句,但是汇编后至少有3条语句。
user1执行tickets- - 以后,抢票成功了,并且将抢票后的tickets=0写回到了内存中。
⑤此时user2醒来了,同样接着它被切走的位置继续执行,此时user2回来后认为tickets=1,所以就向下执行了:
当执行tickets减减时,仍然需要三步:
当user2执行完后,user3和user4醒来同样继续向下执行,重复上面的过程,仍然对tickets减一,所以导致结果不合理。
(1)只存在两个线程,对全局变量tickets仅作减减操作:
线程A先被CPU调度,进行减减操作。
(2)线程A切走的同时,它的上下文,也就是tickets=999也被切走了。
线程B此时被调度,线程A在等待队列。
(3)线程B被切走以后,线程A又接着被调度。
线程A接着被切走的位置开始执行,也就是执行减减的第三步操作—写回。
线程B辛辛苦苦将tickets从1000减到了200,线程A重新被调度后,直接将tickets又从200写回到了999。
上面这种现象被叫做数据不一致问题。
导致数据不一致问题的原因:共享资源没有被保护,多线程对该资源进行了交叉访问。
而解决数据不一致问题的办法就是对共享资源加锁。
初始化互斥量有两种方法:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);
参数:
返回值:初始化成功返回0,失败返回错误码。
函数功能:将创建的锁初始化。
销毁互斥量需要注意:
销毁互斥量接口:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
参数:
返回值:销毁成功返回0,失败返回错误码。
函数功能:当锁使用完后,必须进行销毁。
int pthread_mutex_lock(pthread_mutex_t *mutex);
参数:
返回值:加锁成功返回0,失败返回错误码。
函数功能:给临界区加锁,让多线程串行访问临界资源。
调用 pthread_mutex_lock 时,可能会遇到以下情况:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数:
返回值:解锁成功返回0,失败返回错误码。
函数功能:解锁,让多线程恢复并发执行。
锁其实起一个区间划分的作用,在加锁和解锁之间的代码就是临界区,多个执行流只能串行执行临界区代码,从而保护公共资源,使之成为临界资源。
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(lock);
//临界区
//...
pthread_mutex_unlock(lock);
加锁和解锁两句代码圈定了临界区的范围。
(1)现在将上面的代码加上锁,看看是否还会出现多线程数据不一致问题:
在主线程中创建一个互斥锁,并且初始化,在所有新线程等待成功后将锁释放。
(2)但是此时的锁是存在于主线程的栈结构中,需要让所有新线程看到这把锁。
(3)在线程数据结构体中再增加一个锁指针,此时所有线程就都能看到这把锁了。
在新线程中对临界区加锁和解锁,让所有线程串行执行临界区中代码。
最后运行结果:
此时抢票的结果是正常了,最终抢到1结束,符合我们的预期。
(4)抢票的速度比以前慢了好多。
(5)我们可以发现只有3号进程在抢票,其他线程没有抢。
只有3号进程在执行,说明3号进程的竞争能力强,别的线程抢不过它。因为现在的抢票逻辑是抢到票以后立马释放然后就又立马申请锁了,所以之前持有锁的线程更加容易再次申请到锁。
(6)实际上,抢票成功后不可能立刻再去抢,还需要做一些工作,比如给用户打印订单等等。
在抢票成功后延时13微秒,代表线程做的后续工作。
运行结果:
此时就成了多个线程在一起抢票。
加锁后的代码结构上如上图所示。
我们应该如何看待锁?
既然是共享资源,锁也必须是安全的,那么是谁来保证锁的安全性呢?
(1)一个线程,如果申请成功锁,那么它就会继续向下执行,如果暂时申请不成功呢?如下代码:
运行结果:
查看后台线程和进程:
此时代码就被阻塞住了,线程和进程都是存在的。
当一个线程申请锁暂时失败以后,就会阻塞不动。
当多个线程在执行上面这部分代码。
操作系统内部并不存在锁的概念,所以调度器在调度轻量级进程的时候并不会考虑是否有锁。
所以站在其他线程的角度,锁只有两种状态:
站在其他线程的角度,看到当前持有锁的过程就是原子的。
(2)加锁解锁的原理:
经过上面的例子,我们认识到一个事实,c/c++中加加和减减的操作并不是原子的,所以会导致多线程数据不一致的问题。
而为了能让加锁过程是原子的,在大多数体系结构了,都提供了swap或者xchange汇编指令,通过一条汇编指令来保证加锁的原子性。
加锁解锁的代码:
lock:
movb %al, $0
xchange %al, mutex
if(al寄存器的内容 > 0)
{
return 0;
}
else
{
挂起等待;
}
goto lock;
unlock:
movb mutex, $1
唤醒等待mutex的线程;
return 0;
加锁过程中,xchange是原子的,可以保证锁的安全。
(3)如上代码解析图:
①假设现在有两个线程,ThreadA和ThreadB:
线程A在执行,线程B在等待,线程A加锁时的第一步就是将0写入到al寄存器中。
② 线程A在执行下步的时候,直接将内存中mutex中的数据交换到了al寄存器中。
在执行完交换的时候,线程A同样可以被切走,而且是带着上下文走的,也就是会将al中的mutex带走。
交换的本质就是将锁交换到线程A的上下文中。
③现在线程A被切走了,而且带走了它的上下文mutex:
线程B在执行的时候,先第一步给寄存器al写0,然后执行第二步交换锁和al中的值。
④此时操作系统就会又将线程A调回来继续执行:
线程A做的第一件事情就是恢复上下文,将锁恢复到al寄存器中。
经过上面过程的描述,我们可以发现,锁只能被一个线程持有,而且由于xchange汇编只有一条指令,即使申请锁的过程被切走也不怕。
一旦一个线程通过xchage拿到了锁,即使它被切走,也是抱着锁走的,其他线程是无法拿到锁的,只有等它将锁释放。
只有持有锁的线程才能执行下去,锁相当于一张入场卷。
这样来看,释放锁的过程其实对原子性的要求并没有那么高,因为释放锁的线程必定是持有锁的线程,不持有锁的线程都不会执行到这里,都在阻塞等待。
线程A在解锁时,仅是将内存中存放锁的变量写为1,此时其他线程在xchange以后就可以通过if条件判断,申请锁了。
虽然解锁对原子性的要求不是很必要,但是在设计上还是要设计成原子的,可以看到,解锁也是只通过一条汇编就搞定了。
注意:上图中锁中的变量1仅仅是表示锁存在,并不是真正意义上的数字1。
为了更好的使用C++,像封装线程那样,将加锁也封装成一个小组件,方便我们后面使用。
mutex.hpp:
#include <iostream>
#include <pthread.h>
using namespace std;
//pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
template<class T>
class Mutex
{
public:
Mutex(pthread_mutex_t* lock = nullptr)
:_lock(lock)
{}
void lock()
{
pthread_mutex_lock(_lock);
}
void unlock()
{
pthread_mutex_unlock(_lock);
}
private:
pthread_mutex_t* _lock;
};
template<class T>
class LockGuard
{
public:
LockGuard(pthread_mutex_t* mutex)
:_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex<T> _mutex;
};
只需要创建一个LockGurd对象就可以进行加锁,需要传入锁的地址,在LockGuard对象生命周期结束的时候,会自动释放锁。
创建一个全局的锁,就不用使用pthread_mutex_init取初始化,也不用使用pthread_mutex_destroy来销毁锁了,直接使用就行。
在临界区加锁,执行完临界区代码后解锁。
运行结果:
使用封装的加锁小组将,抢票的结果和我们之前直接用系统调用加锁是一样的。
上面这种加锁的风格被称为RAII加锁。
test.cpp代码:
#include "mutex.hpp"
int ticket = 1000;
class threadData
{
public:
threadData(int number, pthread_mutex_t* lock)
:_lock(lock)
{
threadname = "thread-" + to_string(number);
}
public:
string threadname;
pthread_mutex_t* _lock;
};
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void* getTicket(void* args)
{
threadData *td = static_cast<threadData *>(args);
const char *name = td->threadname.c_str();
while (1)
{
LockGuard<int> getlock(&lock);
//pthread_mutex_lock(&lock);
if(ticket > 0)
{
usleep(1000);
printf("who=%s, get a ticket: %d\n", name, ticket);
ticket--;
//pthread_mutex_unlock(&lock);
}
else
{
//pthread_mutex_unlock(&lock);
break;
}
usleep(13);
}
printf("%s ... quit\n", name);
return nullptr;
}
int main()
{
pthread_mutex_t lock; //创建锁
pthread_mutex_init(&lock, nullptr); //初始化锁
vector<pthread_t> tids;
vector<threadData*> thread_datas;
for(int i = 1; i <= 4; i++)
{
pthread_t tid;
threadData* td = new threadData(i, &lock); //将锁传递给线程
thread_datas.push_back(td);
pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
tids.push_back(tid);
}
for (auto& thread : tids)
{
pthread_join(thread, nullptr);
}
for (auto& td : thread_datas)
{
delete td;
}
//pthread_mutex_destroy(&lock); //销毁锁
return 0;
}
(1)重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。
之前在信号部分就提到过重入,进程在执行一个函数,收到某个信号在处理信号时又调用了这个函数。今天在多线程这里,理解重入更加容易,我们上面写的多线程代码都是重入的。
(2)可重入和不可重入的区别:一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
(3)常见可重入情况:
(4)常见不可重入情况:
总的来说,一个函数中如果使用了全局数据,或者静态数据,以及堆区上的数据,就是不可重入的,反之就是可重入的。
多个线程并发同一段代码时,不会出现不同的结果(数据不一致)。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。
互斥锁就是让不安全的线程变安全,也就是前面我们所学习的内容。
(1)常见线程安全情况:
多线程共同执行的代码段中,如果有全局变量或者静态变量并且没有保护,那么就是线程不安全的。
(2)常见线程不安全情况:
(3)可重入与线程安全的联系:
多线程是通过调用函数来实现的,所以线程安全和重入就存在一些联系:
(4)可重入与线程安全的区别:
可重入和线程安全是不同的两个东西,但是又存在一定的交集。
可重入函数是线程安全函数的一种,因为不存在全局或者静态变量。
线程安全不一定是可重入的,而可重入函数则一定是线程安全的。因为线程安全的情况可能是对全局变量进行了保护(加了锁)。
由于线程可以加锁,所以说线程安全的情况比可重入要多。
我们前面例子中写的都是只有一把锁的情况,在实际使用中有可能会存在多把锁,此时就可能造成死锁。
通俗来说就是一个线程自己持有锁,并且不会释放,但是还要申请其他线程的锁,此时就容易造成死锁。
一把锁也是会死锁的,连续申请俩次就是死锁。
在上面演示一个线程暂时申请锁失败而阻塞时,就是死锁。
死锁的逻辑链条:
可以看到,往往解决一个问题就会引出新的问题,然后再区解决新的问题。
(1)死锁的四个必要条件:
这一点不用说,只要用到锁就会互斥。
请求就是指一个执行流申请其他锁,保持是指不释放自己已经持有的锁。
一个执行流已经持有锁,在不主动释放前不能强行剥夺。
线程A,B,C都持有一把锁,并且不释放。
此时就构成了环路阻塞等待。
只有符合上面四个条件就会造成死锁。而要破坏死锁只要破坏其中一个条件即可。
(2)避免死锁:
四个必要条件中的第一个无法破坏,因为我们使用的就是锁,锁就具有互斥的性质。只能破坏其他三个条件。
这是为了破坏请求与保持条件。当一个执行流在申请另一个锁的时候,要先释放已经持有的锁再申请。
这是为了避免形参环路等待,只要不构成环路即可。
临界资源尽量一次性分配好,不要分布在太多的地方加锁,这样的话导致死锁的概率就会增加。
(3)避免死锁的算法:
上面两种算法了解即可。
采用算法来避免死锁时,就会有一个执行流专门用来监测其他执行流的状态,一旦发现某个执行流长时间没有执行,就释放它所持有的锁。
从解锁的伪代码只能可以看出,解锁是可以由其他线程来完成的,只需要将锁重新赋值到锁的共享资源变量中即可。
总结:互斥锁在实际中能不用就不用,实在没有办法的时候也要尽量少用。
首先抛出一个问题:线程互斥,它是对的,但是它在任何场景合理都吗??
答:不一定合理。
举个例子:
同步概念:
我们已经直到同步是什么了,那么如何实现同步与互斥呢?
答:条件变量。
条件变量概念:
下面代码后面详细讲解:
pthread_mutex_lock()
if (YES/NO)
{
pthread_cond_wait()
}
// ....做其他事情
pthread_cond_signal() // 或者唤醒其他线程, 也可以在主线程判断唤醒
pthread_mutex_unlock(); // 解锁
(1)初始化条件变量有二种方法:
第一种方法:静态分配
#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
代码解析:
第二个方法:动态分配
#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrictattr);
参数:
返回值:初始化成功返回0,失败返回一个错误码errno。
注意:动态分配需要释放条件变量。
(2)销毁条件变量:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond)
参数:
返回值:初始化成功返回0,失败返回一个错误码errno。
销毁条件变量需要注意:
#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
参数:
返回值:成功完成后,返回零值;否则,返回错误编号(errno)以指示错误。
#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
参数:
返回值:如果成功,pthread_cond_broadcast()和pthread-cond_signal()函数返回零;否则应返回一个错误编号(errno)以指示错误。
使用同步与互斥实现多线程间轮询运行,在主线程唤醒被条件变量阻塞的线程。
条件与条件变量:
结论:
按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{
pthread_mutex_unlock(&mutex);
// 解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过 -- 发生线程切换
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
结论:
条件变量使用规范:
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
//修改条件
pthread_mutex_unlock(&mutex);
pthread_mutex_lock(&mutex);
//设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
(1)BlockQueue.hpp:
#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
using namespace std;
#define NUM 8
class blockqueue
{
public:
blockqueue(int size = NUM) //构造函数
:_size(size)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
void pushdata(const int& data) //生产
{
lockqueue(); //加锁
while(IsFull())
{
//数据已满
NotifyConsumer(); //唤醒消费者
cout << "queue full, notify consume data, product stop." << endl;
producterwait(); //生产者等待
}
_q.push(data);
NotifyConsumer();
unlockqueue(); //解锁
}
void popdata(int& data) //消费
{
lockqueue(); //加锁
while(IsEmpty())
{
//空列表
NotifyProducter();
cout << "queue empty, notify product data, consume stop." << endl;
consumerWait();
}
data = _q.front();
_q.pop();
NotifyProducter();
unlockqueue(); //解锁
}
~blockqueue() //析构函数
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
void lockqueue() //加锁
{
pthread_mutex_lock(&_lock);
}
void unlockqueue() //解锁
{
pthread_mutex_unlock(&_lock);
}
void producterwait() //生产者线程等待
{
pthread_cond_wait(&_full, &_lock);
}
void consumerWait() //消费者线程等待
{
pthread_cond_wait(&_empty, &_lock);
}
void NotifyProducter() //唤醒阻塞等待的生产者
{
pthread_cond_signal(&_full);
}
void NotifyConsumer() //唤醒阻塞等待的消费者
{
pthread_cond_signal(&_empty);
}
bool IsEmpty() //判断队列是否是空
{
return (_q.size() == 0 ? true : false);
}
bool IsFull() //判断队列是否满
{
return (_q.size() == _size ? true : false);
}
queue<int> _q;
int _size;
pthread_mutex_t _lock;
pthread_cond_t _full;
pthread_cond_t _empty;
};
(2)test.cpp:
#include "BlockQueue.hpp"
void* consumer(void* args)
{
blockqueue* bqc = static_cast<blockqueue*>(args);
int data;
while(1)
{
bqc->popdata(data);
cout << "Consume data done : " << data << endl;
//usleep(10000);
sleep(1);
}
}
void* producter(void* args)
{
blockqueue* bqp = static_cast<blockqueue*>(args);
srand((unsigned long)time(NULL));
while (1)
{
int data = rand() % 1024;
bqp->pushdata(data);
cout << "Prodoct data done: " << data << endl;
//usleep(10000);
sleep(1);
}
}
int main()
{
blockqueue bq;
pthread_t con;
pthread_t pro;
pthread_create(&con, nullptr, consumer, (void*)&bq);
pthread_create(&pro, nullptr, producter, (void*)&bq);
pthread_join(con, NULL); //线程等待消费者
pthread_join(pro, NULL); //线程等待生产者
return 0;
}
(3)运行结果:
POSIX和System V都是可移植的操作系统接口标准,它们都定义了操作系统为应用程序提供的接口标准。
信号量(信号灯)本质是一个是用来对临界资源进行更细粒度地描述和管理的计数器。
POSIX信号量主要用于实现线程间的同步。
(1)信号量的结构如下:
(2)结构体成员:
(3)信号量的PV操作:
申请不到信号量的线程被阻塞挂起
当count为0时,表示不允许其它线程再访问临界资源,这时其它申请信号量的线程会被阻塞到该信号量的等待队列中,直到其它线程释放信号量。
(1)初始化信号量:
#include <semaphore.h> //头文件
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
(2)销毁信号量:
#include <semaphore.h> //头文件
int sem_destroy(sem_t *sem);
参数:
(3)等待信号量:
#include <semaphore.h> //头文件
int sem_wait(sem_t *sem); //P()
功能:等待信号量,会将信号量的值减1
参数:
#include <semaphore.h> //头文件
int sem_post(sem_t *sem); //V()
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
如上生产者-消费者的例子是基于queue的,其空间可以动态分配,如下基于固定大小的环形队列重写这个程序(POSIX信号量)
环形队列的模型图如下,生产者和消费者共用一个容器,一开始两者从同一个地方出发,生产者先放数据,消费者跟在后面拿数据。为了保证这个行为可以持续,我们需要遵守以下几个规则。
(1)消费者不能超过生产者
消费者取数据的前提就是,所在位置中有数据。如果生产者只生产了三个数据,但是消费者却已经走了四个位置,这很显然是不合理的,当所在位置无数据时,消费者应该停下来。
所以我们采取的措施是设置空白位置的信号量blank_sem、数据个数的信号量data_sem。
信号量的本质就是一个计数器,生产者要添加数据,就需要空位置,所以需要一个记录有多少空白位置的计数器;消费者要减少数据,就需要有数据,所以需要一个有多少数据可供自己消费的计数器。可消费的数据个数为0时,data_sem就会阻止消费者继续消费数据,同时将消费者线程挂起!
(2)生产者 - 消费者通过下标添加或者消费数据
容器的大小是有限的,假设容量大小capacity = 10,因为这里是一个环形队列,采用push_back尾插的方式会造成越界!
所以这里引入下标 p_index、c_index,分别表示生产者生产到哪个位置了、消费者消费到哪个位置了。
每当生产者添加一个数据,p_index++,然后 p_index %= capacity。
每当消费者消费一个数据,c_index++,然后 c_index %= capacity。
ps:取模的目的是将下标控制在 0~capacity-1之间
(3)如下基本规则
(1)成员变量说明:
(1)RingQueue.hpp:
#pragma once
#include <iostream>
#include <semaphore.h>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
using namespace std;
#define N 11
#define CUS 3
#define PRO 2
template<class T>
class RingQueue
{
public:
RingQueue(int capacity = N) //构造函数
:_v(capacity)
,_capacity(capacity)
,p_index(0)
,c_index(0)
{
sem_init(&_proSem, 0, N);
sem_init(&_cusSem, 0, 0);
}
void push(const T& data) // 生产者生产数据
{
P(_proSem);
lock(p_lock);
_v[p_index] = data;
p_index++;
p_index %= _capacity;
unlock(p_lock);
V(_cusSem);
}
T pop() // 消费者消费数据
{
P(_cusSem);
lock(c_lock);
T tmp = _v[c_index];
c_index++;
c_index %= _capacity;
unlock(c_lock);
V(_proSem);
return tmp;
}
~RingQueue() //析构函数
{
sem_destroy(&_cusSem);
sem_destroy(&_proSem);
}
private:
void lock(pthread_mutex_t& mutex) //加锁
{
pthread_mutex_lock(&mutex);
}
void unlock(pthread_mutex_t& mutex) //解锁
{
pthread_mutex_unlock(&mutex);
}
void P(sem_t& s) // 申请信号量
{
sem_wait(&s);
}
void V(sem_t& s) // 释放信号量
{
sem_post(&s);
}
vector<T> _v; // 循环队列
sem_t _proSem; // 记录队列中空格数量的信号量
sem_t _cusSem; // 记录队列中数据数量的信号量
int p_index; // 记录当前空格所在下标
int c_index; // 记录当前数据所在下标
int _capacity; // 记录环形队列容量
pthread_mutex_t c_lock; // 消费者加锁
pthread_mutex_t p_lock; // 生产者加锁
};
成员函数说明:
(2)单生产单消费:
#include "RingQueue.hpp"
void* Customer(void* argc)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(argc);
while(1)
{
sleep(1);
int data = 0;
data = rq->pop();
cout << "消费者消费了一个数据: " << data << endl;
}
}
void* Producer(void* argc)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(argc);
while(1)
{
int data = rand() % N + 1;
rq->push(data);
cout << "生者生产了一个数据: " << data << endl;
}
}
int main()
{
srand(time(nullptr)); // 1、制造随机数种子,作为生产者push到环形队列当中的数据
RingQueue<int>* rq = new RingQueue<int>(); // 2、new一个环形队列
// 3、分别创建、等待一个生产者和一个消费者
pthread_t tid1;
pthread_t tid2;
pthread_create(&tid1, nullptr, Customer, (void*)rq);
pthread_create(&tid2, nullptr, Producer, (void*)rq);
pthread_join(tid1, nullptr);
pthread_join(tid2, nullptr);
// 4、最后delete环形队列
delete rq;
return 0;
}
(3)运行结果:
(4)多生产多消费:
因为循环队列的生产者和消费者的p_index和c_index只有一个,这样在多生产多消费的情况下就会产生互斥,所以需要加锁进行保护。
#include "RingQueue.hpp"
#include "Task.hpp"
struct ThreadData
{
RingQueue<Task> *rq;
std::string threadname;
};
void* Customer(void* argc)
{
ThreadData* td = static_cast<ThreadData*>(argc);
RingQueue<Task>* rq = td->rq;
string name = td->threadname;
while(1)
{
sleep(1);
Task t = rq->pop();
t();
cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
}
}
void* Producer(void* argc)
{
ThreadData* td = static_cast<ThreadData*>(argc);
RingQueue<Task>* rq = td->rq;
string name = td->threadname;
int len = opers.size();
while(1)
{
int data1 = rand() % 10 + 1;
usleep(10);
int data2 = rand() % 10;
char op = opers[rand() % len];
Task t(data1, data2, op);
rq->push(t);
cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
}
}
int main()
{
srand(time(nullptr)); // 1、制造随机数种子,作为生产者push到环形队列当中的数据
RingQueue<Task>* rq = new RingQueue<Task>; // 2、new一个环形队列
// 创建、等待多个生产者和多个消费者
pthread_t custid[CUS];
pthread_t protid[PRO];
for(int i = 0; i < CUS; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Consumer-" + std::to_string(i);
pthread_create(custid + i, nullptr, Customer, (void*)td);
}
for(int i = 0; i < PRO; i++)
{
ThreadData* td = new ThreadData();
td->rq = rq;
td->threadname = "Producer-" + std::to_string(i);
pthread_create(protid + i, nullptr, Producer, (void*)td);
}
for(int i = 0; i < CUS; i++)
{
pthread_join(custid[i], nullptr);
}
for(int i = 0; i < PRO; i++)
{
pthread_join(protid[i], nullptr);
}
// 4、最后delete环形队列
delete rq;
return 0;
}
(5)Task.hpp:
#pragma once
#include <iostream>
#include <string>
std::string opers="+-*/%";
enum{
DivZero=1,
ModZero,
Unknown
};
class Task
{
public:
Task()
{}
Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
{
}
void run()
{
switch (oper_)
{
case '+':
result_ = data1_ + data2_;
break;
case '-':
result_ = data1_ - data2_;
break;
case '*':
result_ = data1_ * data2_;
break;
case '/':
{
if(data2_ == 0) exitcode_ = DivZero;
else result_ = data1_ / data2_;
}
break;
case '%':
{
if(data2_ == 0) exitcode_ = ModZero;
else result_ = data1_ % data2_;
} break;
default:
exitcode_ = Unknown;
break;
}
}
void operator ()()
{
run();
}
std::string GetResult()
{
std::string r = std::to_string(data1_);
r += " ";
r += oper_;
r += " ";
r += std::to_string(data2_);
r += " = ";
r += std::to_string(result_);
r += "[code: ";
r += std::to_string(exitcode_);
r += "]";
return r;
}
std::string GetTask()
{
std::string r = std::to_string(data1_);
r += " ";
r += oper_;
r += " ";
r += std::to_string(data2_);
r += " = ?";
return r;
}
~Task()
{
}
private:
int data1_;
int data2_;
char oper_;
int result_;
int exitcode_;
};
(6)运行结果:
线程池(thread pool):一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。
下面情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空。
基于10.2.1情况,所有的工作线程已处于等待状态,主线程开始添加三个任务,添加后通知唤醒线程池中的线程开始取任务执行。此时的任务缓冲队列还是空。
基于10.2.2 情况,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行。
此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第N个任务,添加后发现线程池中线程已经用完了,任务缓冲队列已满,于是进入等待状态,等待任务缓冲队列中任务腾空通知。但是这种情形会阻塞主线程,本文不限制任务队列的大小,必要时再优化。
线程池的主要组成由三个部分构成:
(1)ThreadPool.hpp:
等待通知机制通过条件变量来实现。
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;
#define N 5
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
template<class T>
class ThreadPool
{
public:
void lock() //加锁
{
pthread_mutex_lock(&_mutex);
}
void unlock() //解锁
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup() //唤醒线程
{
pthread_cond_signal(&_cond);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsQueueEmpty()
{
return _qt.empty();
}
string GetThreadName(pthread_t tid)
{
for(const auto& ti : _v)
{
if(ti.tid == tid)
{
return ti.name;
}
}
return "None";
}
ThreadPool(int num = N) //构造函数
:_v(N)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
static void* HandlerTask(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
string name = tp->GetThreadName(pthread_self());
while(1)
{
tp->lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->pop();
tp->unlock();
t();
cout << name << " run, "<< "result: " << t.GetResult() << endl;
}
}
void Start() //创建线程池
{
int num = _v.size();
for(int i = 0; i < N; i++)
{
_v[i].name = "thread-" + to_string(i + 1);
pthread_create(&(_v[i].tid), nullptr, HandlerTask, this);
}
}
T pop() //从队列当中取出数据
{
T tmp = _qt.front();
_qt.pop();
return tmp;
}
void Push(const T& t) //往队列当中插入数据
{
lock();
_qt.push(t);
Wakeup();
unlock();
}
~ThreadPool() //析构函数
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
vector<ThreadInfo> _v; //线程池
queue<T> _qt; //任务队列
pthread_mutex_t _mutex; //锁
pthread_cond_t _cond; //条件变量
};
(2)test.cpp:
线程池最重要的方法是负责向队列添加任务,由主函数向队列添加任务。
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
ThreadPool<Task>* tp = new ThreadPool<Task>(5);
tp->Start();
srand(time(nullptr));
while(1)
{
//1. 构建任务
int x = rand() % 11 + 1;
usleep(10);
int y = rand() % 5 + 1;
char op = opers[rand() % opers.size()];
Task t(x, y, op);
tp->Push(t);
//2. 交给线程池处理
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
return 0;
}
(3)运行结果:
#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <unistd.h>
using namespace std;
typedef void (*callback_t)();
static int num = 1;
class Thread
{
public:
Thread(callback_t cb)
:_cb(cb)
,_start_timestamp(0)
,_isrunning(false)
,_name("")
{}
static void* routine(void* args)
{
Thread* t = static_cast<Thread*>(args);
t->Entery();
return nullptr;
}
void run()
{
_isrunning = true;
_name = "thread-" + to_string(num++);
_start_timestamp = time(nullptr);
pthread_create(&_tid, nullptr, routine, this);
}
void join()
{
pthread_join(_tid, nullptr);
_isrunning = false;
}
string name()
{
return _name;
}
uint64_t StartTimestamp()
{
return _start_timestamp;
}
bool IsRunning()
{
return _isrunning;
}
void Entery()
{
_cb();
}
private:
pthread_t _tid; //线程的tid
string _name; //线程名
uint64_t _start_timestamp; //线程创建的时间
bool _isrunning; //线程是否在运行
callback_t _cb; //线程所要执行的函数
};
callback_t 是函数指针,是线程需要执行的函数
某些类, 只应该具有一个对象(实例化),就称之为单例。
例如一个男人只能有一个媳妇。
在很多服务器开发场景中,经常需要让服务器加载很多的数据 (上百G) 到内存中,此时往往要用一个单例的类来管理这些数据。
(1)洗碗的例子:
懒汉方式最核心的思想是 “延时加载”,从而能够优化服务器的启动速度。
(2)饿汉方式实现单例模式:
template <typename T>
class Singleton
{
static T data;
public:
static T* GetInstance()
{
return &data;
}
};
只要通过 Singleton 这个包装类来使用 T 对象,则一个进程中只有一个T对象的实例。
(3)懒汉方式实现单例模式:
template <typename T>
class Singleton
{
static T* inst;
public:
static T* GetInstance()
{
if (inst == NULL)
{
inst = new T();
}
return inst;
}
};
存在一个严重的问题,线程不安全。
第一次调用 GetInstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例。
但是后续再次调用,就没有问题了。
(1)ThreadPool.hpp:
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;
#define N 5
struct ThreadInfo
{
pthread_t tid;
std::string name;
};
template<class T>
class ThreadPool
{
public:
void lock() //加锁
{
pthread_mutex_lock(&_mutex);
}
void unlock() //解锁
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup() //唤醒线程
{
pthread_cond_signal(&_cond);
}
void ThreadSleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsQueueEmpty()
{
return _qt.empty();
}
string GetThreadName(pthread_t tid)
{
for(const auto& ti : _v)
{
if(ti.tid == tid)
{
return ti.name;
}
}
return "None";
}
static void* HandlerTask(void* args)
{
ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
string name = tp->GetThreadName(pthread_self());
while(1)
{
tp->lock();
while(tp->IsQueueEmpty())
{
tp->ThreadSleep();
}
T t = tp->pop();
tp->unlock();
t();
cout << name << " run, "<< "result: " << t.GetResult() << endl;
}
}
void Start() //创建线程池
{
int num = _v.size();
for(int i = 0; i < N; i++)
{
_v[i].name = "thread-" + to_string(i + 1);
pthread_create(&(_v[i].tid), nullptr, HandlerTask, this);
}
}
T pop() //从队列当中取出数据
{
T tmp = _qt.front();
_qt.pop();
return tmp;
}
void Push(const T& t) //往队列当中插入数据
{
lock();
_qt.push(t);
Wakeup();
unlock();
}
static ThreadPool<T>* GetInstance()
{
if(_tp == nullptr) //因为if只会进去一次,所以后面在加锁会影响效率,所以加锁前在判断一次即可避免
{
pthread_mutex_lock(&_lock);
if (_tp == nullptr)
{
_tp = new ThreadPool<T>();
}
pthread_mutex_unlock(&_lock);
}
return _tp;
}
private:
ThreadPool(int num = N) //构造函数
:_v(N)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
~ThreadPool() //析构函数
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
ThreadPool(const ThreadPool<T>& tp) = delete;
const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete; // a=b=c
vector<ThreadInfo> _v; //线程池
queue<T> _qt; //任务队列
pthread_mutex_t _mutex; //锁
pthread_cond_t _cond; //条件变量
static ThreadPool<T>* _tp;
static pthread_mutex_t _lock;
};
template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;
(2)test.cpp:
#include "Task.hpp"
#include "ThreadPool.hpp"
int main()
{
ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance();
//ThreadPool<Task>* tp = new ThreadPool<Task>(5);
tp->Start();
srand(time(nullptr));
while(1)
{
//1. 构建任务
int x = rand() % 11 + 1;
usleep(10);
int y = rand() % 5 + 1;
char op = opers[rand() % opers.size()];
Task t(x, y, op);
tp->Push(t);
//2. 交给线程池处理
std::cout << "main thread make task: " << t.GetTask() << std::endl;
sleep(1);
}
return 0;
}
将四个函数私有化后,利用静态函数GetInstance来获取唯一对象指针。
(1)STL中的容器是否是线程安全的?
不是:原因是STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)。
因此 STL 默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
(2)智能指针是否是线程安全的?
对于 unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题。
对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题,但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证 shared_ptr 能够高效,原子的操作引用计数。
在编写多线程的时候,有些公共数据读的概率远远大于修改的几率。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。我们引入了读写锁即自旋锁处理这种多读少写的情况。
什么是自旋锁
它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。
这种锁相对于自旋锁而言,能提高并发性。
在多处理器系统中:
(1)对于读者:它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。
(2) 对于写者:写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。
自旋锁相关的API函数
使用自旋锁,必须包含头文件,并链接库-lpthread
#include <pthread.h>
(1)初始化函数:
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
功能:初始化自旋锁, 当线程使用该函数初始化一个未初始化或者被destroy过的自旋锁。该函数会为自旋锁申请资源并且初始化自旋锁为unlocked状态。
参数 :
pthread_spinlock_t :要初始化自旋锁
pshared取值:
返回值:若成功,返回0;否则,返回错误编号
(2)销毁函数:
int pthread_spin_destroy(pthread_spinlock_t *lock);
功能:用来销毁指定的自旋锁并释放所有相关联的资源(所谓的资源指的是由pthread_spin_init自动申请的资源),如果调用该函数时自旋锁正在被使用或者自旋锁未被初始化则结果是未定义的。
参数 :
返回值:若成功,返回0;否则,返回错误编号
(3)加锁函数:
int pthread_spin_lock(pthread_spinlock_t *lock);
功能:用来获取(锁定)指定的自旋锁. 如果该自旋锁当前没有被其它线程所持有,则调用该函数的线程获得该自旋锁,否则该函数在获得自旋锁之前不会返回。
参数 :
返回值:若成功,返回0;否则,返回错误编号
(4)解锁函数:
int pthread_spin_unlock(pthread_spinlock_t *lock);
功能:用来解锁指定的自旋锁.。
参数 :
返回值:若成功,返回0;否则,返回错误编号
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。如果给这种代码段加锁,会极大地降低我们程序的效率。
那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。
为了解决我们的问题我们先来分析多线程中两个角色的关系:读者,写者。
使用自旋锁,必须包含头文件,并链接库-lpthread
#include <pthread.h>
(1)初始化/销毁函数:
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
功能:初始化/销毁一个读写锁。
参数:
返回值:若成功,返回0;否则,返回错误编号
(2)读者加锁函数:
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
(3)写者加锁函数:
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
(4)解锁函数:
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
功能:无论是读者解锁还是写者解锁都要使用此函数。
返回值:若成功,返回0;否则,返回错误编号
想要使用正确的使用读写锁就还要简单理解一下读写锁的原理,而读写锁就是要维护好上面的读者与写者的关系。
如下代码,来理解读写锁的原理。
(1)读者的逻辑:
(2)写者的逻辑:
(3)简单分析:
注意:写独占,读共享,读锁优先级高。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <pthread.h>
#include <unistd.h>
using namespace std;
// 线程的属性
struct ThreadAttr
{
pthread_t _tid;
string _name;
};
// 票数
volatile int ticket = 100;
// 读写锁
pthread_rwlock_t rwlock;
void rwattr_init(pthread_rwlockattr_t* pattr, int flag) // 读写锁属性初始化
{
pthread_rwlockattr_init(pattr);
if (flag == 0) // flag为0表示读者优先,其他表示写着优先
{
pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_READER_NP);
}
else
{
pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
}
}
void rwattr_destroy(pthread_rwlockattr_t* prwlock) // 读写锁属性的销毁
{
pthread_rwlockattr_destroy(prwlock);
}
void rwlock_init(pthread_rwlock_t* prwlock, int flag = 0) // 锁的初始化
{
pthread_rwlockattr_t rwattr;
rwattr_init(&rwattr, flag);
pthread_rwlock_init(prwlock, &rwattr);
rwattr_destroy(&rwattr);
}
const string create_writer_name(size_t i) // 创建name
{
stringstream ssm("thread writer : ", ios::in | ios::out | ios::ate);
ssm << i;
return ssm.str();
}
const string create_reader_name(size_t i)
{
stringstream ssm("thread reader : ", ios::in | ios::out | ios::ate);
ssm << i;
return ssm.str();
}
void* readerRoutine(void* args) // 读者历程
{
string* ps = static_cast<string*>(args);
// 进行查票
while (true)
{
pthread_rwlock_rdlock(&rwlock);
if (ticket != 0)
{
cout << *ps << " ticket number : " << ticket << endl;
}
else
{
cout << *ps << " done!!!!!" << endl;
// 防止死锁
pthread_rwlock_unlock(&rwlock);
break;
}
pthread_rwlock_unlock(&rwlock);
// 休眠0.1ms
usleep(100);
}
}
void* writerRoutine(void* args) // 写者历程
{
string* ps = static_cast<string*>(args);
// 进行改票
while (true)
{
pthread_rwlock_wrlock(&rwlock);
if (ticket != 0)
{
cout << *ps << " ticket number : " << --ticket << endl;
}
else
{
cout << *ps << " done!!!!!" << endl;
// 防止死锁
pthread_rwlock_unlock(&rwlock);
break;
}
pthread_rwlock_unlock(&rwlock);
// 休眠0.1ms
usleep(100);
}
}
void reader_init(vector<ThreadAttr>& readers)
{
int i = 1;
for (auto& e : readers)
{
e._name = create_reader_name(i++);
pthread_create(&e._tid, nullptr, readerRoutine, &e._name);
}
}
void writer_init(vector<ThreadAttr>& writers)
{
int i = 1;
for (auto& e : writers)
{
e._name = create_writer_name(i++);
pthread_create(&e._tid, nullptr, writerRoutine, &e._name);
}
}
void reader_join(const vector<ThreadAttr>& readers)
{
for (auto& e : readers)
{
pthread_join(e._tid, nullptr);
}
}
void writer_join(const vector<ThreadAttr>& writers)
{
for (auto& e : writers)
{
pthread_join(e._tid, nullptr);
}
}
int main()
{
// 初始化锁,并设置读写者优先属性
rwlock_init(&rwlock, 0);
const int reader_count = 30;
const int writer_count = 2;
vector<ThreadAttr> readers(reader_count);
vector<ThreadAttr> writers(writer_count);
reader_init(readers);
writer_init(writers);
reader_join(readers);
writer_join(writers);
return 0;
}