多线程

发布时间:2024年01月20日

Linux系统的多线程

1. Linux线程概念

1.1 什么是线程

  • 一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序列”
  • 一切进程至少都有一个执行线程。
  • 线程在进程内部运行,本质是在进程地址空间内运行。
  • 在Linux系统中,在CPU眼中,看到的PCB都要比传统的进程更加轻量化。
  • 透过进程虚拟地址空间,可以看到进程的大部分资源,将进程资源合理分配给每个执行流,就形成了线程执行流。


前置知识:进程的概念和如下的页表概念

1.2 页表的概念

1.2.1 一级页表的缺点


在32位平台下有232个地址,也就是一张页表就要有232个映射关系。
每张表的内容出了映射关系外,还包含了一些权限相关信息。比如页表分为内核级页表和用户级页表,通过权限信息来区分。

每个表项中存储了一个物理地址和虚拟地址,这里一共要占用8个字节,再考虑权限相关的信息,这里粗略地认为每个表项总共占用了10个字节。

那么总共就需要232 * 10 字节,也就是40GB的大小。
在32位平台下最大的内存也就仅有4GB,说明这种页表映射的方式是不合理的。

1.2.2 二级页表

在Linux中的处理方式是建立一个二级页表。
实现方法:

  1. 虚拟地址前10个比特位在页目录中进行查找,找到相应的页表
  2. 拿10个比特位在对应页表中进行查询,找到物理内存中对应页框的起始地址。
  3. 最后12个比特位作为偏移量从页框对应地址处向后偏移,找到物理内存中的某一个对应字节数据。对应页框的起始地址(20个比特位)+ 虚拟地址的最后12个比特位就能够定位任意一个内存字节地址。


物理地址是以“块”为单位的,这个块的大小就是4KB也就是212,对应了偏移量最大值。

这就是二级页表的结构,页目录就是一个一级页表,而表项就是一个二级页表。

计算页表的总大小:
首先只用了20个比特位来建立映射关系,那么最大也就是220个字节,也就是1MB。在页表中,左边占了10个比特位,而右边占了20个比特位,共30个比特位,这里假设加起来一共占了32个比特位(方便计算),也就是4个字节。那么总大小就是220 * 4 byte = 4MB。

映射过程是由MMU这个硬件完成的,页表是一种软件映射,MMU是一种硬件映射。

MMU是Memory Management Unit的缩写,中文名是内存管理单元,有时称作分页内存管理单元(英语:paged memorymanagementunit,缩写为PMMU)。它是一种负责处理中央处理器(CPU)的内存访问请求的计算机硬件。它的功能包括虚拟地址到物理地址的转换(即虚拟内存管理)、内存保护、中央处理器高速缓存的控制,在较为简单的计算机体系结构中,负责总线的仲裁以及存储体切换

1.3 线程的优缺点

优点:

  • 创建一个新线程的代价要比创建一个新进程小得多
  • 与进程之间的切换相比,线程之间的切换需要操作系统做的工作要少很多
  • 线程占用的资源要比进程少很多
  • 能充分利用多处理器的可并行数量
  • 在等待慢速I/O操作结束的同时,程序可执行其他的计算任务
  • 计算密集型应用,为了能在多处理器系统上运行,将计算分解到多个线程中实现
  • I/O密集型应用,为了提高性能,将I/O操作重叠。线程可以同时等待不同的I/O操作。

缺点:

  1. 性能损失
    • 一个很少被外部事件阻塞的计算密集型线程往往无法与共它线程共享同一个处理器。如果计算密集型线程的数量比可用的处理器多,那么可能会有较大的性能损失,这里的性能损失指的是增加了额外的同步和调度开销,而可用的资源不变。
  2. 健壮性降低
    • 编写多线程需要更全面更深入的考虑,在一个多线程程序里,因时间分配上的细微偏差或者因共享了不该共享的变量而造成不良影响的可能性是很大的,换句话说线程之间是缺乏保护的。
  3. 缺乏访问控制
    • 进程是访问控制的基本粒度,在一个线程中调用某些OS函数会对整个进程造成影响。
  4. 编程难度提高
    • 编写与调试一个多线程程序比单线程程序困难得多

1.4 线程异常

  • 单个线程如果出现除零,野指针问题导致线程崩溃,进程也会随着崩溃
  • 线程是进程的执行分支,线程出异常,就类似进程出异常,进而触发信号机制,终止进程,进程终止,该
    进程内的所有线程也就随即退出

1.5 线程用途

  • 合理的使用多线程,能提高CPU密集型程序的执行效率
  • 合理的使用多线程,能提高IO密集型程序的用户体验(如生活中我们一边写代码一边下载开发工具,就是多线程运行的一种表现)

1.6 Linux进程VS线程

  1. 进程是资源分配的基本单位。
  2. 线程是调度的基本单位。
  3. 线程共享进程数据,但也拥有自己的一部分数据:
    • 线程ID。
    • 一组寄存器。
    • 栈。
    • errno。
    • 信号屏蔽字。
    • 调度优先级

进程的多个线程共享 同一地址空间,因此Text Segment、Data Segment都是共享的,如果定义一个函数,在各线程中都可以调用,如果定义一个全局变量,在各线程中都可以访问到,除此之外,各线程还共享以下进程资源和环境:

  • 文件描述符表。
  • 每种信号的处理方式(SIG_ IGN、SIG_ DFL或者自定义的信号处理函数)。
  • 当前工作目录。
  • 用户id和组id。

进程和线程的关系如下图:

2. Linux线程控制

在Linux系统的的视角下,Linux下没有真正意义的线程,而是用进程模拟的线程(LWP,轻量级进程),所以Linux不会提供直接创建线程的系统调用,最多提供创建轻量级进程的接口。

但是对于用户来说,用户需要的是线程接口。
所以Linux提供了用户线程库,对下将Linux接口封装,对上给用户提供进行线程控制的接口,也就是pthread库(原生线程库)

PROSIX线程库:

  • 与线程有关的函数构成了一个完整的系列,绝大多数函数的名字都是以“pthread_”打头的。
  • 要使用这些函数库,要通过引入头文件<pthread.h>
  • 链接这些线程函数库时要使用编译器命令的“-lpthread”选项

2.1 创建线程

函数原型:

int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

函数功能:创建一个新的线程
参数:

  • thread:线程ID
  • attr:设置线程的属性,attr为NULL表示使用默认属性
  • start_routine:是个函数地址,线程启动后要执行的函数
  • arg:传给线程启动函数的参数

返回值:成功返回0;失败返回错误码

错误检查:

  • 传统的一些函数是,成功返回0,失败返回-1,并且对全局变量errno赋值以指示错误。
  • pthreads函数出错时不会设置全局变量errno(而且大部分其他POSIX函数也会这样做),而是将错误代码通过返回值返回。
  • pthreads同样也提供了线程内的errno变量,以支持其它使用errno的代码。对于pthreads函数的错误,建议通过返回值来判定,因为读取返回值要比读取线程内的errno变量的开销更小。

(1)代码示例:创建一个新线程

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* threadRun(void* args)
{
    while(1)
    {
        cout << "new thread running : " << getpid() << endl;
        sleep(1);
    }
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, threadRun, nullptr);

    while(1)
    {
        cout << "main thread running : " << getpid() << endl;
        sleep(1);
    }

    return 0;
}

(2)运行结果:


由打印结果可以看到,主线程和新线程都打印了相应的字符串。
使用

ps -aL | head -1 && ps -aL | grep test1

命令查看执行的线程,test是C++可执行文件

2.2 线程ID及地址空间布局

  • pthread_ create函数会产生一个线程ID,存放在第一个参数指向的地址中。该线程ID和前面说的线程ID不是一回事。
  • 前面讲的线程ID属于进程调度的范畴。因为线程是轻量级进程,是操作系统调度器的最小单位,所以需要一个数值来唯一表示该线程。
  • pthread_ create函数第一个参数指向一个虚拟内存单元,该内存单元的地址即为新创建线程的线程ID,属于本地线程库的范畴。线程库的后续操作,就是根据该线程ID来操作线程的。
  • 本地线程库提供了pthread_ self函数,可以获得线程自身的ID:
pthread_t pthread_self(void);

(1)代码示例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* threadRun(void* args)
{
    while(1)
    {
        cout << "new thread running : " << getpid()  << "线程id : " << pthread_self() << endl;
        sleep(1);
    }
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, threadRun, nullptr);

    while(1)
    {
        cout << "main thread running : " << getpid() << endl;
        sleep(1);
    }

    return 0;
}

(2)运行结果:


pthread_t 到底是什么类型呢?取决于实现。对于Linux目前实现的本地线程库实现而言,pthread_t类型的线程ID,本质就是一个进程地址空间上的一个地址。

下图中的mmap区域是共享区:

2.3 线程终止

只终止某个线程而不是终止整个进程,可以有三种方法:

  1. 线程函数处进行return。
  2. 线程可以自己调用pthread_exit函数终止自己。
  3. 一个线程可以调用pthread_cancel函数终止同一进程中的另一个线程。

2.3.1 线程函数处进行return

注意:在线程中使用return代表该线程退出,而在main函数(主线程)中使用return代表整个进程退出。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* threadRun(void* args)
{
    while(1)
    {
        cout << "new thread running : " << getpid()  << "线程id : " << pthread_self() << endl;
        sleep(1);
    }
	
	return nullptr;
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, threadRun, nullptr);

    return 0;
}

该代码并不会打印新线程中的字符串,因为主线程退出整个进程都终止了。

2.3.2 使用pthread_exit函数:

void pthread_exit(void *retval);

函数功能:线程终止
参数

  • retval:线程退出时的退出码信息

返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)

(1)pthread_exit 或者 return 返回的指针所指向的内存单元必须是全局的或者是 malloc 分配的,不能在线程函数的栈上分配,因为当其他线程得到这个返回指针时,线程函数已经退出了。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* threadRun(void* args)
{
    int cnt = 5;
    while(cnt--)
    {
        cout << "new thread running : " << getpid()  << "线程id : " << pthread_self() << endl;
        sleep(1);
    }

    pthread_exit((void*)1);
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, threadRun, nullptr);

    void *ret = nullptr;

    // pthread_join表示线程等待,主线程执行完后还需等待其他线程
    pthread_join(pid, &ret); 
    
    // 这里会把线程退出码信息通过该函数给ret
    cout << "new thread exit code is : " << (int64_t)ret << endl;
    // 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。

    return 0;
}

(2)运行结果:


在线程等待的情况下,新线程在5秒后结束了并返回了线程退出码。

2.3.3 pthread_cancel函数:

int pthread_cancel(pthread_t thread);

功能:取消一个执行中的线程。
参数

  • thread:线程ID。

返回值:成功返回0;失败返回错误码。

(1)线程是可以取消自己的,甚至新线程也可以取消主线程,取消成功的线程的退出码一般是 -1。

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* threadRun(void* args)
{
    int cnt = 5;
    while(cnt--)
    {
        cout << "new thread running : " << getpid()  << "线程id : " << pthread_self() << endl;
        sleep(1);
    }

    pthread_exit((void*)1);
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, threadRun, nullptr);
    
    sleep(3);

    //取消新线程
    pthread_cancel(pid);

    void *ret = nullptr;

    // pthread_join表示线程等待,主线程执行完后还需等待其他线程
    pthread_join(pid, &ret); 

    // 这里会把线程退出码信息通过该函数给ret
    cout << "new thread exit code is : " << (int64_t)ret << endl;
    // 这里使用int64_t强制转换是因为平台下Linux的指针是8字节的。

    return 0;
}

(2)运行结果:

2.4 线程等待

(1)为什么需要线程等待?

  • 已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。
  • 创建新的线程不会复用刚才退出线程的地址空间。

(2)线程等待函数:

int pthread_join(pthread_t thread, void **retval);

功能:等待线程结束。
参数

  • thread:被等待线程的ID。
  • retval:线程退出时的退出码信息。

返回值:线程等待成功返回0,失败返回错误码。

调用该函数的线程将挂起等待,直到id为thread的线程终止。thread线程以不同的方法终止,通过pthread_join得到的终止状态是不同的,总结如下:

  • 如果thread线程通过return返回,retval 所指向的单元里存放的是thread线程函数的返回值。
  • 如果thread线程被别的线程调用 pthread_cancel 异常终止掉,retval所指向的单元里存放的是常数PTHREAD_CANCELED,就是-1。
  • 如果thread线程是自己调用 pthread_exit 终止的,retval 所指向的单元存放的是传给pthread_exit 的参数。
  • 如果对thread线程的终止状态不感兴趣,可以传 NULL给retval参数。


(3)代码演示让新线程创建5s后退出,随后再过几秒后被thread_join等待,当主进程开始打印消息时,说明新线程join等待完成:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* thread_rum(void* args)
{
    const char* name = static_cast<const char*>(args);
    int cnt = 5;
    while (true)
    {
        printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
        sleep(1);

        if (!(cnt--))
        {
            break;
        }
    }

    cout << "线程退出啦...." << endl;

    return nullptr;
}

int main()
{
    pthread_t pit;
    int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");

    sleep(3);
    pthread_join(pit, nullptr);
    cout << "main thread join success" << endl;
    sleep(3);

    while(1)
    {
        cout << "main thread " << pthread_self() << endl;
        sleep(1);
    }

    return 0;
}

运行结果:

可以使用如下脚本来监控线程运行状况:

[xiaomaker@VM-28-13-centos test_12_07]$ while :; do ps -aL | head -1 && ps -aL | grep mythread; sleep 1; done


通过如上代码可以发现当创建线程后,线程1正在运行,5s后新线程退出了,我们的监控脚本观察到线程由两个变成了一个,但是正常情况下预期应该是两个线程,随后线程等待成功,这里还是只能看到一个线程。不是说好退出后应该看到的是两个线程吗,事实上一个线程退出后我们并没有看到预期结果。原因是ps命令在查的时候退出的线程是不给你显示的,所以你只能看到一个线程。但是现在不能证明当前的新线程在退出没有被join的时候就没有内存泄漏。

所以线程退出的时候,一般必须要进行join,如果不进行join,就会造成类似于进程那样的内存泄漏问题。

(4)线程异常的问题:

野指针代码演示:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* thread_rum(void* args)
{
    const char* name = static_cast<const char*>(args);
    int cnt = 5;
    while (true)
    {
        printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
        sleep(1);

        if (!(cnt--))
        {
            int* p = nullptr;
            *p = 100;
            break;
        }
    }

    cout << "线程退出啦...." << endl;

    return (void*)10;
}

int main()
{
    pthread_t pit;
    int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");

    void *ret = nullptr;
    pthread_join(pit, &ret);
    cout << "main thread join success, *ret: " << (long long)ret << endl;
    
    while(1)
    {
        cout << "main thread " << pthread_self() << endl;
        sleep(1);
    }

    return 0;
}

运行结果:


此时会发现:待线程出现野指针问题时,左边会显示段错误,而右边监控脚本中的线程直接就没了。此时就说明当线程异常了,那么整个进程整体异常退出,线程异常 == 进程异常。所以线程会影响其它线程的运行 —— 线程的健壮性(鲁棒性)较低。

(5)如何理解第二个参数retval?
参数retval是线程退出时的退出码,这是一个二级指针,一个输出型参数。刚刚我们的代码中,以及涉及到了线程退出的方式(从线程函数return)。退出的类型是void*,这里我们把先前退出返回的nullptr改为(void*)10。

此线程退出后,我们是通过pthread_join函数获得此线程的退出结果,退出结果是void*类型,可retval是void**类型,我们需要传入一个二级指针。下面演示获得此线程的退出结果的过程,并打印此退出码,代码如下:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

void* thread_rum(void* args)
{
    const char* name = static_cast<const char*>(args);
    int cnt = 5;
    while (true)
    {
        printf("%s 正在运行, thread id: 0x%x\n", name, pthread_self());
        sleep(1);

        if (!(cnt--))
        {
            break;
        }
    }

    cout << "线程退出啦...." << endl;

    return (void*)10;
}

int main()
{
    pthread_t pit;
    int n = pthread_create(&pit, nullptr, thread_rum, (void*)"new thread");

    void *ret = nullptr;
    pthread_join(pit, &ret);
    cout << "main thread join success, *ret: " << (long long)ret << endl;
    
    while(1)
    {
        cout << "main thread " << pthread_self() << endl;
        sleep(1);
    }

    return 0;
}

运行结果:


这里我们就得到了新线程退出时的退出码 10。综上ptherad_join的第二个参数retval的作用就是一个输出型参数,获取新线程退出时的退出码。我们先前讲过进程退出时,分为三种情况:

  1. 代码跑完,结果正确
  2. 代码跑完,结果不正确
  3. 异常

在线程退出时,代码跑完,结果不正确和结果正确都可以得到退出码,但是线程异常时并不会出现退出码。那么为什么异常时主线程没有获取新线程退出时的信号呢?

  • 这是因为线程出异常就不再是线程的问题,而是进程的问题,应该让父进程获取退出码,知道它什么原因退出的。因此线程终止时,只需考虑正常终止。

其实线程终止有3种方法,见上文。

2.5 线程分离

  • 默认情况下,新创建的线程是joinable的,线程退出后,需要对其进行pthread_join操作,否则无法释放资源,从而造成系统泄漏。
  • 如果不关心线程的返回值,join是一种负担,这个时候,我们可以告诉系统,当线程退出时,自动释放线程资源。
int pthread_detach(pthread_t thread);

可以线程组内其他线程对目标线程进行分离,也可以是线程自己分离。
当一个线程分离后,是不能够被join的。

(1)错误使用示例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;

void* thread_run(void* args)
{
    char* name = static_cast<char*>(args);
    int cnt = 5;
    while(cnt)
    {
        cout << name << " : " << cnt -- << endl;
    }

    return nullptr;
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
    pthread_detach(pid);

    //error,线程分离后不能join
    int n = pthread_join(pid, nullptr);
    if(n != 0)
    {
        cerr << "error : " << n << " : " << strerror(n) << endl; 
    }

    return 0;
}

(2)这里可能会出现两种情况,一种是线程1先执行完,再提示出main函数里的错误打印。一种是直接错误打印。


原因是,线程之间谁先执行是不确定的。假设线程1先执行,因为代码没有sleep函数,执行完也就一瞬间的事情。线程1执行完后,接下来是主线程执行,主线程这时候才去判断有没有join,所以会导致最后才打印错误信息。

假设主线程先执行,发现线程分离了以后还join了,所以程序之间报错,结束程序。

(3)正确使用示例:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;

void* thread_run(void* args)
{
    char* name = static_cast<char*>(args);
    int cnt = 5;
    while(cnt)
    {
        cout << name << " : " << cnt -- << endl;
    }

    return nullptr;
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
    pthread_detach(pid);

    usleep(1000);
    
    return 0;
}

这里反复运行,发现会有两种情况,一种是主线程先执行,主线程执行结束直接return,进程直接结束。另外一种是线程1先执行,再到主线程。

这里主要是想说明,线程分离并不影响“主线程结束,导致其他线程被迫退出”的情况。线程分离后,主线程执行结束,各线程会主动释放空间,避免僵尸进程的情况。

(4)让主线程的休眠时间大于新线程即可让新线程先打印

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <cstring>
using namespace std;

void* thread_run(void* args)
{
    char* name = static_cast<char*>(args);
    int cnt = 5;
    while(cnt)
    {
        cout << name << " : " << cnt -- << endl;
    }

    return nullptr;
}

int main()
{
    pthread_t pid;
    pthread_create(&pid, nullptr, thread_run, (void*)"thread_1");
    pthread_detach(pid);

    sleep(5);

    return 0;
}

(5)运行结果:

3. Linux线程互斥

3.1 进程线程间的互斥相关背景概念

  • 临界资源:多线程执行流共享的资源就叫做临界资源。
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区。
  • 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用。
  • 原子性(后面讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成。

3.2 互斥量mutex

  • 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来一些问题。

3.3 代码演示:

(1)写个代码体验线程不安全:

#include <iostream>
#include <cstdio>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
#include <vector>

using namespace std;

int ticket = 3000;

class threadData
{
public:

    threadData(int number)
    {
        threadname = "thread-" + to_string(number);
    }

public:

    string threadname;
};

void* getTicket(void* args)
{
    threadData *td = static_cast<threadData *>(args);
    const char *name = td->threadname.c_str();

    while (1)
    {
        if(ticket > 0)
        {
            usleep(1000);
            printf("who=%s, get a ticket: %d\n", name, ticket);
            ticket--;
        }
        else
        {
            break;
        }

        usleep(13);
    }

    printf("%s ... quit\n", name);

    return nullptr;
}

int main()
{
    vector<pthread_t> tids;
    vector<threadData*> thread_datas;

    for(int i = 1; i <= 4; i++)
    {
        pthread_t tid;
        threadData* td = new threadData(i);
        thread_datas.push_back(td);
        pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
        tids.push_back(tid);
    }

    for (auto& thread : tids)
    {
        pthread_join(thread, nullptr);
    }

    for (auto& td : thread_datas)
    {
        delete td;
    }

    return 0;
}
  • 全局变量tickets表示票的数量。
  • 主线程中创建了4个新线程,代表4个用户去抢票。
  • 每个新线程在抢到票后将票数减一,并且打印出票的数量。
  • 当票被抢完以后,线程退出,不再进行抢票。

(2)运行结果:


运行以后,发现出现了负数票,这不合理,票抢完就应该停止了,包括我们的代码逻辑都是这样写的,但是此时就出现了这种情况。
其实上面现象的原因是发生了线程不安全问题。

(3)如何产生线程不安全现象:
上面现象是故意弄出来的,涉及到了线程调度,利用了线程调度的特性造出了一个这样的现象。要想出现上面的现象,就需要:

  • 尽可能让多个线程交叉执行。
  • 多个线程交叉执行的本质:就是让调度器尽可能的频繁发生线程调度与切换。

虽然看起来是多个线程在同时运行,但这是由于CPU运行速度太快导致的,实际上,CPU是一个线程一个线程执行的。现在就是要让CPU频繁调度,不停的切换线程,一个线程还没有执行完就再执行下一个,每个线程都执行一点,这样交叉执行。

当一个线程进行延时的时候,CPU并不会等它,而是会将它放在等待队列里,然后去执行另一个线程,等延时线程醒来以后才会接着执行。

  • 线程在时间片到来,更高优先级线程到来,线程等待的时候会发生线程切换。
  • 线程是在从内核态转换成用户态的时候检测是否达到线程切换的条件的。

线程检测是否切换是以内核态的身份去检测的,执行的是3~4G内核空间中的代码,本质上是操作系统在检测。

(4)产生线程不安全现象的原因:

①假设tickets已经只剩一张了,即全局变量tickets = 1。


主线程创建好4个新线程以后,4个新线程便开始执行了,在执行到延时的时候,新线程就会被放在等待队列里。

②CPU及内核if判断的本质逻辑:

  • 从内存中读取数据到CPU寄存器。
  • 进行判断。
  • 在线程user1执行到if判断时,CPU从内存中将tickets变量中的数据1拿到了CPU的寄存器ebx中。
  • CPU进行判断后,发现符合大于0的条件。

③当线程user1符合条件继续向下执行延时代码时,CPU将线程user1切走了,换上了user2。

  • 在线程user1被切走的时候,它的上下文数据也会被切走。
  • 所以ebx寄存器中的1也会跟着user1的PCB被切走。

user2被调度时仍然重复user1的过程,执行延时被切走,再换上user3,以此类推,直到user4被切走。

  • 四个线程都拿到了tickets=1,所以符合条件,都能向下执行。
  • 当user4被挂起后,就会轮到user1进行调度了。

④user1唤醒以后接着被切走的位置继续执行:

执行tickets - - 的本质:

  • 从内存中读取数据到CPU的寄存器
  • 更改数据
  • 写回数据到内存中

虽然C/C++代码只有一条语句,但是汇编后至少有3条语句。

user1执行tickets- - 以后,抢票成功了,并且将抢票后的tickets=0写回到了内存中。

⑤此时user2醒来了,同样接着它被切走的位置继续执行,此时user2回来后认为tickets=1,所以就向下执行了:

当执行tickets减减时,仍然需要三步:

  • 从内存中读取tickets=0到CPU寄存器ebx中。
  • 修改值,从0变成-1。
  • 将-1写回内存中。

当user2执行完后,user3和user4醒来同样继续向下执行,重复上面的过程,仍然对tickets减一,所以导致结果不合理。

3.4 线程不安全的原因

(1)只存在两个线程,对全局变量tickets仅作减减操作:

线程A先被CPU调度,进行减减操作。

  • 从内存中将tickets=1000取到寄存器ebx中。
  • 进行减减操作,tickets变成了999。
  • 在执行第三步写回数据之前,线程A被切走了。

(2)线程A切走的同时,它的上下文,也就是tickets=999也被切走了。

线程B此时被调度,线程A在等待队列。

  • 线程B先从内存中读取tickets = 1000到寄存器ebx中。
  • 进行减减操作。
  • 将减减后的值写回到内存中。
  • 线程B将减减操作完整的执行了很多遍,直到tickets=200时才被切下去。

(3)线程B被切走以后,线程A又接着被调度。

线程A接着被切走的位置开始执行,也就是执行减减的第三步操作—写回。

  • 线程A被调度后,先恢复上下文,将被切走时的tickets=999恢复到了ebx寄存器中。
  • 然后执行第三步,将tickets=999写回到了内存中。

线程B辛辛苦苦将tickets从1000减到了200,线程A重新被调度后,直接将tickets又从200写回到了999。

上面这种现象被叫做数据不一致问题。
导致数据不一致问题的原因:共享资源没有被保护,多线程对该资源进行了交叉访问。

而解决数据不一致问题的办法就是对共享资源加锁。

4. 加锁

4.1 互斥量的接口

4.1.1 初始化互斥量

初始化互斥量有两种方法:

  • 方法1,静态分配:(全局变量)
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
  • 方法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);

参数

  • mutex:创建的互斥锁指针。
  • attr:锁的属性,一般情况下设为nullptr。

返回值:初始化成功返回0,失败返回错误码。
函数功能:将创建的锁初始化。

4.1.2 销毁互斥量

  1. 销毁互斥量需要注意

    • 使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁。
    • 不要销毁一个已经加锁的互斥量。
    • 已经销毁的互斥量,要确保后面不会有线程再尝试加锁。
  2. 销毁互斥量接口:

int pthread_mutex_destroy(pthread_mutex_t *mutex)

参数

  • mutex:创建的互斥锁指针。

返回值:销毁成功返回0,失败返回错误码。
函数功能:当锁使用完后,必须进行销毁。

4.1.3 互斥量加锁和解锁

  1. 加锁接口:
int pthread_mutex_lock(pthread_mutex_t *mutex);

参数

  • mutex:创建的互斥锁指针。

返回值:加锁成功返回0,失败返回错误码。
函数功能:给临界区加锁,让多线程串行访问临界资源。

调用 pthread_mutex_lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_ lock调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
  1. 解锁接口:
int pthread_mutex_unlock(pthread_mutex_t *mutex);

参数

  • mutex:创建的互斥锁指针。

返回值:解锁成功返回0,失败返回错误码。
函数功能:解锁,让多线程恢复并发执行。

锁其实起一个区间划分的作用,在加锁和解锁之间的代码就是临界区,多个执行流只能串行执行临界区代码,从而保护公共资源,使之成为临界资源。

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_lock(lock);
//临界区
//...
pthread_mutex_unlock(lock);

加锁和解锁两句代码圈定了临界区的范围。

4.2 改进代码

(1)现在将上面的代码加上锁,看看是否还会出现多线程数据不一致问题:


在主线程中创建一个互斥锁,并且初始化,在所有新线程等待成功后将锁释放。

(2)但是此时的锁是存在于主线程的栈结构中,需要让所有新线程看到这把锁。

(3)在线程数据结构体中再增加一个锁指针,此时所有线程就都能看到这把锁了。


在新线程中对临界区加锁和解锁,让所有线程串行执行临界区中代码。

  • 解锁不能放在else的代码块后面,防止break出循环,但是没有解锁。
  • 在else的break前也要有解锁,防止if条件不满足直接跳出循环没有解锁。

最后运行结果:


此时抢票的结果是正常了,最终抢到1结束,符合我们的预期。

(4)抢票的速度比以前慢了好多。

  • 加锁和解锁的过程是多个线程串行执行的,并且临界区的代码也是串行执行的,所以速度就变慢了。

(5)我们可以发现只有3号进程在抢票,其他线程没有抢。

  • 锁只规定了互斥访问,并没有规定必须让谁先执行。
  • 锁是让多个执行流进行竞争的结果。

只有3号进程在执行,说明3号进程的竞争能力强,别的线程抢不过它。因为现在的抢票逻辑是抢到票以后立马释放然后就又立马申请锁了,所以之前持有锁的线程更加容易再次申请到锁。

(6)实际上,抢票成功后不可能立刻再去抢,还需要做一些工作,比如给用户打印订单等等。


在抢票成功后延时13微秒,代表线程做的后续工作。

运行结果:


此时就成了多个线程在一起抢票。

  • 当一个线程从临界区中出来并且释放锁后,执行后续任务时,其他线程才有能力去竞争锁。


加锁后的代码结构上如上图所示。

  • 加锁时,一定要保证临界区的粒度非常小。将那些不是必须放在临界区中的代码放在临界区外。
  • 加锁是程序员行为,要加锁就所有线程都加锁,否则就起不到保护共享资源的效果。

4.3 锁的本质

我们应该如何看待锁?

  • 在上面代码中,一个锁必须让所有线程都看到,所以锁本身就是一个共享资源。

既然是共享资源,锁也必须是安全的,那么是谁来保证锁的安全性呢?

  • 锁是通过加锁和解锁是原子的来保证自身的安全的。

(1)一个线程,如果申请成功锁,那么它就会继续向下执行,如果暂时申请不成功呢?如下代码:


运行结果:

查看后台线程和进程:


此时代码就被阻塞住了,线程和进程都是存在的。

  • 一个锁只能被申请一次,只有锁被释放后才能再次申请。

当一个线程申请锁暂时失败以后,就会阻塞不动。


当多个线程在执行上面这部分代码。

  • 当一个线程申请锁成功,进入临界区访问临界资源,其他线程要想进入临界区只能阻塞等待,等锁释放。
  • 当一个线程申请锁成功,进入临界区访问临界资源,同样是能被切走的,而且该线程是抱着锁走的,其他线程仍然无法申请锁成功。

操作系统内部并不存在锁的概念,所以调度器在调度轻量级进程的时候并不会考虑是否有锁。

所以站在其他线程的角度,锁只有两种状态:

  • 申请锁前
  • 申请锁后

站在其他线程的角度,看到当前持有锁的过程就是原子的。

(2)加锁解锁的原理:
经过上面的例子,我们认识到一个事实,c/c++中加加和减减的操作并不是原子的,所以会导致多线程数据不一致的问题。

而为了能让加锁过程是原子的,在大多数体系结构了,都提供了swap或者xchange汇编指令,通过一条汇编指令来保证加锁的原子性。

加锁解锁的代码:

lock:
	movb %al, $0
	xchange %al, mutex
	if(al寄存器的内容 > 0)
	{
		return 0;
	}
	else
	{
		挂起等待;	
	}
	goto lock;

unlock:
	movb mutex, $1
	唤醒等待mutex的线程;
	return 0;

加锁过程中,xchange是原子的,可以保证锁的安全。

(3)如上代码解析图:

①假设现在有两个线程,ThreadA和ThreadB:

线程A在执行,线程B在等待,线程A加锁时的第一步就是将0写入到al寄存器中。

  • 在执行完第一条汇编后,线程A是可以被切走的,而且在切走的时候会将它的上下文,也就是al中的0带走。
  • 这一步的本质就是将0写入到线程A的上下文中。

② 线程A在执行下步的时候,直接将内存中mutex中的数据交换到了al寄存器中。

在执行完交换的时候,线程A同样可以被切走,而且是带着上下文走的,也就是会将al中的mutex带走。

交换的本质就是将锁交换到线程A的上下文中。

③现在线程A被切走了,而且带走了它的上下文mutex:

线程B在执行的时候,先第一步给寄存器al写0,然后执行第二步交换锁和al中的值。

  • 此时al中的值虽然交换了,但是仍然是0,根据上面的伪代码,if条件不成立,所以将线程B挂起等待了。

④此时操作系统就会又将线程A调回来继续执行:

线程A做的第一件事情就是恢复上下文,将锁恢复到al寄存器中。

  • 线程A执行下一步时,if条件成了,所以该线程就申请锁成功了。

经过上面过程的描述,我们可以发现,锁只能被一个线程持有,而且由于xchange汇编只有一条指令,即使申请锁的过程被切走也不怕。

一旦一个线程通过xchage拿到了锁,即使它被切走,也是抱着锁走的,其他线程是无法拿到锁的,只有等它将锁释放。

只有持有锁的线程才能执行下去,锁相当于一张入场卷。

这样来看,释放锁的过程其实对原子性的要求并没有那么高,因为释放锁的线程必定是持有锁的线程,不持有锁的线程都不会执行到这里,都在阻塞等待。

线程A在解锁时,仅是将内存中存放锁的变量写为1,此时其他线程在xchange以后就可以通过if条件判断,申请锁了。

虽然解锁对原子性的要求不是很必要,但是在设计上还是要设计成原子的,可以看到,解锁也是只通过一条汇编就搞定了。

注意:上图中锁中的变量1仅仅是表示锁存在,并不是真正意义上的数字1。

4.4 对锁进行封装

为了更好的使用C++,像封装线程那样,将加锁也封装成一个小组件,方便我们后面使用。

mutex.hpp:

#include <iostream>
#include <pthread.h>

using namespace std;

//pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

template<class T>
class Mutex
{
public:
    Mutex(pthread_mutex_t* lock = nullptr)
        :_lock(lock)
    {}

    void lock()
    {
        pthread_mutex_lock(_lock);
    }

    void unlock()
    {
        pthread_mutex_unlock(_lock);
    }

private:

    pthread_mutex_t* _lock;
};

template<class T>
class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex)
        :_mutex(mutex)
    {
        _mutex.lock();
    }

    ~LockGuard()
    {
        _mutex.unlock();
    }

private:

    Mutex<T> _mutex;
};

只需要创建一个LockGurd对象就可以进行加锁,需要传入锁的地址,在LockGuard对象生命周期结束的时候,会自动释放锁。

创建一个全局的锁,就不用使用pthread_mutex_init取初始化,也不用使用pthread_mutex_destroy来销毁锁了,直接使用就行。
在临界区加锁,执行完临界区代码后解锁。

  • 将临界区放在一个代码块中,此时LockGuard的生命周期就是这个代码块。
  • 创建LockGuard对象时在构造函数中自动加锁,出作用域时析构函数自动解锁。

运行结果:


使用封装的加锁小组将,抢票的结果和我们之前直接用系统调用加锁是一样的。

上面这种加锁的风格被称为RAII加锁。

test.cpp代码:

#include "mutex.hpp"

int ticket = 1000;

class threadData
{
public:

    threadData(int number, pthread_mutex_t* lock)
        :_lock(lock)
    {
        threadname = "thread-" + to_string(number);
    }

public:

    string threadname;
    pthread_mutex_t* _lock;
};

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

void* getTicket(void* args)
{
    threadData *td = static_cast<threadData *>(args);
    const char *name = td->threadname.c_str();

    while (1)
    {
        LockGuard<int> getlock(&lock);
        //pthread_mutex_lock(&lock);
        if(ticket > 0)
        {
            usleep(1000);
            printf("who=%s, get a ticket: %d\n", name, ticket);
            ticket--;
            //pthread_mutex_unlock(&lock);
        }
        else
        {
        	//pthread_mutex_unlock(&lock);
            break;
        }

        usleep(13);
    }

    printf("%s ... quit\n", name);
    return nullptr;
}

int main()
{
    pthread_mutex_t lock;  //创建锁
    pthread_mutex_init(&lock, nullptr);  //初始化锁 
    vector<pthread_t> tids;
    vector<threadData*> thread_datas;

    for(int i = 1; i <= 4; i++)
    {
        pthread_t tid;
        threadData* td = new threadData(i, &lock);  //将锁传递给线程
        thread_datas.push_back(td);
        pthread_create(&tid, nullptr, getTicket, thread_datas[i - 1]);
        tids.push_back(tid);
    }


    for (auto& thread : tids)
    {
        pthread_join(thread, nullptr);
    }

    for (auto& td : thread_datas)
    {
        delete td;
    }

    //pthread_mutex_destroy(&lock);  //销毁锁

    return 0;
}

5. 可重入VS线程安全

5.1 重入概念

(1)重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。

之前在信号部分就提到过重入,进程在执行一个函数,收到某个信号在处理信号时又调用了这个函数。今天在多线程这里,理解重入更加容易,我们上面写的多线程代码都是重入的。

(2)可重入和不可重入的区别:一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。

(3)常见可重入情况:

  • 不使用全局变量或静态变量。
  • 不使用用malloc或者new开辟出的空间。
  • 不返回静态或全局数据,所有数据都有函数的调用者提供。

(4)常见不可重入情况:

  • 调用了malloc/free函数,因为malloc函数是用全局链表来管理堆的函数。
  • 可重入函数体内使用了静态的数据结构。
  • 调用了标准I/O库函数,标准I/O库的很多实现都以不可重入的方式使用全局数据结构。

总的来说,一个函数中如果使用了全局数据,或者静态数据,以及堆区上的数据,就是不可重入的,反之就是可重入的。

5.2 线程安全

多个线程并发同一段代码时,不会出现不同的结果(数据不一致)。常见对全局变量或者静态变量进行操作,并且没有锁保护的情况下,会出现该问题。

互斥锁就是让不安全的线程变安全,也就是前面我们所学习的内容。

(1)常见线程安全情况:

  • 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的。
  • 类或者接口对于线程来说都是原子操作。
  • 多个线程之间的切换不会导致该接口的执行结果存在二义性。

多线程共同执行的代码段中,如果有全局变量或者静态变量并且没有保护,那么就是线程不安全的。

(2)常见线程不安全情况:

  • 不保护共享变量的函数。
  • 函数状态随着被调用,状态发生变化的函数。
  • 返回指向静态变量指针的函数。

(3)可重入与线程安全的联系:

多线程是通过调用函数来实现的,所以线程安全和重入就存在一些联系:

  • 函数是可重入的,那就是线程安全的,因为没有全局或者静态变量,不会产生数据不一致问题。
  • 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题。出发对不可重入函数的全局变量进行保护。
  • 如果一个函数中有全局变量并且没有保护,那么这个函数既不是线程安全也不是可重入的。

(4)可重入与线程安全的区别:

可重入和线程安全是不同的两个东西,但是又存在一定的交集。

  • 可重入说的是函数。
  • 线程安全说的是线程。

可重入函数是线程安全函数的一种,因为不存在全局或者静态变量。

线程安全不一定是可重入的,而可重入函数则一定是线程安全的。因为线程安全的情况可能是对全局变量进行了保护(加了锁)。

由于线程可以加锁,所以说线程安全的情况比可重入要多。

6. 死锁

6.1 死锁的概念

我们前面例子中写的都是只有一把锁的情况,在实际使用中有可能会存在多把锁,此时就可能造成死锁。

  • 死锁:一组执行流中的各个执行流均占有不会释放的锁资源,但因互相申请被其他进程所站用不会释放的锁资源而处于的一种永久等待状态。

通俗来说就是一个线程自己持有锁,并且不会释放,但是还要申请其他线程的锁,此时就容易造成死锁。

一把锁也是会死锁的,连续申请俩次就是死锁。

在上面演示一个线程暂时申请锁失败而阻塞时,就是死锁。

死锁的逻辑链条:

可以看到,往往解决一个问题就会引出新的问题,然后再区解决新的问题。

6.2 死锁的必要条件

(1)死锁的四个必要条件:

  1. 互斥条件:一个资源每次只能被一个执行流使用

这一点不用说,只要用到锁就会互斥。

  1. 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

请求就是指一个执行流申请其他锁,保持是指不释放自己已经持有的锁。

  1. 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

一个执行流已经持有锁,在不主动释放前不能强行剥夺。

  1. 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系请求


线程A,B,C都持有一把锁,并且不释放。

  • 线程A 申请 线程B持有的锁B
  • 线程B 申请 线程C持有的锁C
  • 线程C 申请 线程A持有的锁A

此时就构成了环路阻塞等待。

只有符合上面四个条件就会造成死锁。而要破坏死锁只要破坏其中一个条件即可。

(2)避免死锁:

四个必要条件中的第一个无法破坏,因为我们使用的就是锁,锁就具有互斥的性质。只能破坏其他三个条件。

  • 避免锁未释放的场景

这是为了破坏请求与保持条件。当一个执行流在申请另一个锁的时候,要先释放已经持有的锁再申请。

  • 加锁顺序一致

这是为了避免形参环路等待,只要不构成环路即可。

  • 资源一次性分配

临界资源尽量一次性分配好,不要分布在太多的地方加锁,这样的话导致死锁的概率就会增加。

(3)避免死锁的算法:

  • 死锁检测算法
  • 银行家算法

上面两种算法了解即可。

采用算法来避免死锁时,就会有一个执行流专门用来监测其他执行流的状态,一旦发现某个执行流长时间没有执行,就释放它所持有的锁。


从解锁的伪代码只能可以看出,解锁是可以由其他线程来完成的,只需要将锁重新赋值到锁的共享资源变量中即可。

总结:互斥锁在实际中能不用就不用,实在没有办法的时候也要尽量少用。

7. Linux线程同步

7.1 同步概念与竞态条件

首先抛出一个问题:线程互斥,它是对的,但是它在任何场景合理都吗??

答:不一定合理。

举个例子:

  • 我们去食堂打饭,食堂打饭的规则是竞争式的抢饭(不用排队)。
  • 力气大的人会优先去抢到饭(男生 --优先级高的线程),力气小的就会一直抢不到饭(女生 – 优先级小的线程)。
  • 这种规则没有错,确实食堂阿姨一次只能给一个人打饭,但是不合理,会造成弱小的人的饥饿问题(迟迟没有吃到饭)!!!
  • 在多线程竞争锁来看,优先级高的线程会一直优先申请到锁资源,而优先级低的线程会长时间得不到对应的资源,会造成多执行流下的饥饿问题!!!
  • 互斥下的饥饿问题:多线程下的某个执行流,长时间得不到某种资源。

同步概念:

  • 同步:在保证数据安全的前提下(互斥),让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序(CPU调度)问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解。
  • 同步与互斥是互帮互助的,互斥是解决线程安全问题,而同步是解决合理性问题。

7.2 条件变量

我们已经直到同步是什么了,那么如何实现同步与互斥呢?

答:条件变量。

条件变量概念:

  • 当一个线程互斥地访问临界资源时,需要另一个线程对临界资源的状态做改变,就需要条件变量了。
  • 例如:一个线程访问队列时,发现队列为空,它只能等待,直到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
  • 当条件(由程序员设置)满足时,设置的条件变量就会阻塞等待执行该函数的线程,并且释放锁,随后下一个线程就会申请到锁继续判断。

下面代码后面详细讲解:

pthread_mutex_lock()
if (YES/NO)
{
	pthread_cond_wait()
}
// ....做其他事情
pthread_cond_signal() // 或者唤醒其他线程, 也可以在主线程判断唤醒
pthread_mutex_unlock(); // 解锁

7.3 条件变量相关接口

7.3.1 初始化和销毁条件变量

(1)初始化条件变量有二种方法:

第一种方法:静态分配

#include <pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

代码解析:

  • pthread_cond_t是条件变量,它是一个联合体,里面有一个结构体描述条件变量的属性
  • PTHREAD_COND_INITIALIZER:它是一个宏,用于初始化条件变量
  • 注意:静态分配不用释放条件变量

第二个方法:动态分配

#include <pthread.h>
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrictattr);

参数:

  • cond:要初始化的条件变量(pthread_mutex_t变量的地址)。
  • restrictattr:设置条件变量的属性,一般为NULL/nullptr。

返回值:初始化成功返回0,失败返回一个错误码errno。
注意:动态分配需要释放条件变量。

(2)销毁条件变量:

#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond)

参数:

  • cond:要销毁的条件变量(pthread_cond_t变量的地址)。

返回值:初始化成功返回0,失败返回一个错误码errno。

销毁条件变量需要注意:

  • 使用 PTHREAD_ COND_ INITIALIZER 初始化的条件变量不需要销毁。
  • 使用 pthread_cond_init初始化的条件变量,需要进行销毁。

7.3.2 阻塞等待条件函数

#include <pthread.h>
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

参数:

  • cond:阻塞要在这个条件变量上等待的线程**(pthread_cond_t变量的地址)**。
  • mutex:互斥锁,同步需要与互斥锁绑定使用,因为阻塞等待时,会释放该线程的锁,后面被唤醒时,会重新获取锁,后面代码感受。

返回值:成功完成后,返回零值;否则,返回错误编号(errno)以指示错误。

7.3.3 唤醒阻塞等待的条件变量函数

#include <pthread.h>
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

参数:

  • cond:唤醒在条件变量上等待的线程(pthread_cond_t变量的地址)。
  • pthread_cond_broadcast:唤醒被阻塞等待的全部线程。
  • pthread_cond_signal:唤醒被阻塞等待的一个线程,按顺序唤醒。

返回值:如果成功,pthread_cond_broadcast()和pthread-cond_signal()函数返回零;否则应返回一个错误编号(errno)以指示错误。

使用同步与互斥实现多线程间轮询运行,在主线程唤醒被条件变量阻塞的线程。

7.4 为什么 pthread_cond_wait 需要互斥锁?

条件与条件变量

  • 条件:对应的共享资源的状态,(比如抢票,票数小于0,就不能抢了),通过判断的方式,来判断对应的资源是否符合要求。
  • 条件变量:在条件满足或不满足的前提下,进行wait(等待) 或 signal(唤醒)的一种方式。

结论:

  • 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足。
  • 所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知(唤醒)等待在条件变量上的线程
  • 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。


按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:

// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) 
{
	pthread_mutex_unlock(&mutex);
	// 解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过 -- 发生线程切换
	pthread_cond_wait(&cond);
	pthread_mutex_lock(&mutex);
} 

pthread_mutex_unlock(&mutex);

结论:

  • 由于解锁和等待不是原子操作。调用解锁之后, pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是一个原子操作。
  • int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex); 进入该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样 – 原子操作。

条件变量使用规范:

  • 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
//修改条件
pthread_mutex_unlock(&mutex);
  • 给条件发送信号代码
pthread_mutex_lock(&mutex);
//设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

8. 生产者消费者模型

8.1 为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

这个阻塞队列就是用来给生产者和消费者解耦的。

8.2 生产者消费者模型优点

  • 解耦
  • 支持并发
  • 支持忙闲不均

8.3 基于阻塞队列的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。

8.4 C++模拟阻塞队列的生产消费模型

(1)BlockQueue.hpp:

#include <iostream>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
using namespace std;

#define NUM 8

class blockqueue
{
public:
    blockqueue(int size = NUM)   //构造函数
        :_size(size)
    {
        pthread_mutex_init(&_lock, nullptr);
        pthread_cond_init(&_full, nullptr);
        pthread_cond_init(&_empty, nullptr);
    }

    void pushdata(const int& data)   //生产
    {
        lockqueue();   //加锁
        while(IsFull())
        {
            //数据已满
            NotifyConsumer();   //唤醒消费者
            cout << "queue full, notify consume data, product stop." << endl;
            producterwait();  //生产者等待
        }

        _q.push(data);
        NotifyConsumer();
        unlockqueue();   //解锁
    }

    void popdata(int& data)   //消费
    {
        lockqueue();   //加锁
        while(IsEmpty())
        {
            //空列表
            NotifyProducter();
            cout << "queue empty, notify product data, consume stop." << endl;
            consumerWait();
        }

        data = _q.front();
        _q.pop();
        NotifyProducter();
        unlockqueue();   //解锁
    }

    ~blockqueue()   //析构函数
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_full);
        pthread_cond_destroy(&_empty);
    }

private:
    void lockqueue()  //加锁
    {
        pthread_mutex_lock(&_lock);
    }

    void unlockqueue()  //解锁
    {
        pthread_mutex_unlock(&_lock);
    }

    void producterwait()    //生产者线程等待
    {
        pthread_cond_wait(&_full, &_lock);
    }

    void consumerWait()   //消费者线程等待
    {
        pthread_cond_wait(&_empty, &_lock);
    }

    void NotifyProducter()   //唤醒阻塞等待的生产者
    {
        pthread_cond_signal(&_full);
    }

    void NotifyConsumer()   //唤醒阻塞等待的消费者
    {
        pthread_cond_signal(&_empty);
    }

    bool IsEmpty()   //判断队列是否是空
    {
        return (_q.size() == 0 ? true : false);
    }

    bool IsFull()    //判断队列是否满
    {
        return (_q.size() == _size ? true : false);
    }

    queue<int> _q;
    int _size;
    pthread_mutex_t _lock;
    pthread_cond_t _full;
    pthread_cond_t _empty;
};

(2)test.cpp:

#include "BlockQueue.hpp"

void* consumer(void* args)
{
    blockqueue* bqc = static_cast<blockqueue*>(args);
    int data;
    while(1)
    {
        bqc->popdata(data);
        cout << "Consume data done : " << data << endl;
        //usleep(10000);
        sleep(1);
    }
}

void* producter(void* args)
{
    blockqueue* bqp = static_cast<blockqueue*>(args);
    srand((unsigned long)time(NULL));
    while (1)
    {
        int data = rand() % 1024;
        bqp->pushdata(data);
        cout << "Prodoct data done: " << data << endl;
        //usleep(10000);
        sleep(1);
    }
}

int main()
{
    blockqueue bq;

    pthread_t con;
    pthread_t pro;
    pthread_create(&con, nullptr, consumer, (void*)&bq);
    pthread_create(&pro, nullptr, producter, (void*)&bq);

    pthread_join(con, NULL);   //线程等待消费者
    pthread_join(pro, NULL);   //线程等待生产者

    return 0;
}

(3)运行结果:

8.5 POSIX信号量

8.5.1 什么是POSIX信号量?

POSIX和System V都是可移植的操作系统接口标准,它们都定义了操作系统为应用程序提供的接口标准。

  • POSIX信号量和System V信号量作用相同,都是用于同步和互斥操作,以达到无冲突的访问共享资源目的。
  • System V版本的信号量只适用于实现进程间的通信,而POSIX版本的信号量主要用于实现线程之间的通信

信号量(信号灯)本质是一个是用来对临界资源进行更细粒度地描述和管理的计数器。

POSIX信号量主要用于实现线程间的同步。

8.5.2 POSIX信号量实现原理

(1)信号量的结构如下:

(2)结构体成员:

  • count:记录还有多少小块的临界资源未被使用。
  • queue:当count为0时,其它未申请到信号量的线程的task_struct地址会被放到信号量等待队列中阻塞挂起。

(3)信号量的PV操作:

  • P操作:我们把申请信号量得操作称为P操作,申请信号量的本质就是申请获得整块临界资源中某小块资源的使用权限,当申请成功时临界资源中小块资源的数目应该减一,因此P操作的本质就是让count- -。
  • V操作:我们将释放信号量称为V操作,释放信号量的本质就是归还临界资源中某块资源的使用权限,当释放成功时临界资源中资源的数目就应该加一,因此V操作的本质就是让count++。

申请不到信号量的线程被阻塞挂起

当count为0时,表示不允许其它线程再访问临界资源,这时其它申请信号量的线程会被阻塞到该信号量的等待队列中,直到其它线程释放信号量。

8.5.3 POSIX信号量接口函数

(1)初始化信号量:

#include <semaphore.h>   //头文件
int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • pshared:0表示线程间共享,非零表示进程间共享。
  • value:信号量初始值。

(2)销毁信号量:

#include <semaphore.h>   //头文件
int sem_destroy(sem_t *sem);

参数:

  • sem:要销毁的信号量。

(3)等待信号量:

#include <semaphore.h>   //头文件
int sem_wait(sem_t *sem); //P()

功能:等待信号量,会将信号量的值减1
参数:

  • sem:要进行等待的信号量。
    (4)发布信号量:
#include <semaphore.h>   //头文件
int sem_post(sem_t *sem); //V()

功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。

如上生产者-消费者的例子是基于queue的,其空间可以动态分配,如下基于固定大小的环形队列重写这个程序(POSIX信号量)

8.6 基于环形队列的生产消费模型

8.6.1 环形队列

环形队列的模型图如下,生产者和消费者共用一个容器,一开始两者从同一个地方出发,生产者先放数据,消费者跟在后面拿数据。为了保证这个行为可以持续,我们需要遵守以下几个规则。

(1)消费者不能超过生产者
消费者取数据的前提就是,所在位置中有数据。如果生产者只生产了三个数据,但是消费者却已经走了四个位置,这很显然是不合理的,当所在位置无数据时,消费者应该停下来。

所以我们采取的措施是设置空白位置的信号量blank_sem、数据个数的信号量data_sem。

信号量的本质就是一个计数器,生产者要添加数据,就需要空位置,所以需要一个记录有多少空白位置的计数器;消费者要减少数据,就需要有数据,所以需要一个有多少数据可供自己消费的计数器。可消费的数据个数为0时,data_sem就会阻止消费者继续消费数据,同时将消费者线程挂起!

(2)生产者 - 消费者通过下标添加或者消费数据
容器的大小是有限的,假设容量大小capacity = 10,因为这里是一个环形队列,采用push_back尾插的方式会造成越界!

所以这里引入下标 p_index、c_index,分别表示生产者生产到哪个位置了、消费者消费到哪个位置了。

每当生产者添加一个数据,p_index++,然后 p_index %= capacity。
每当消费者消费一个数据,c_index++,然后 c_index %= capacity。
ps:取模的目的是将下标控制在 0~capacity-1之间

(3)如下基本规则

  1. 生产者只关心是否还有格子用来生产数据。
  2. 消费者只关心环形队列中是否还有数据。
  3. 一开始没有数据,生产者和消费者指向同一个位置,这时生产者要先执行生产操作,消费者阻塞挂起;数据满时,生产者和消费者也指向同一个位置,这时消费者先执行消费操作再轮到生产者生产。
  4. 生产者和消费者不能同时访问队列中的同一个位置。
  5. 生产者和消费者可以并发访问环形队列中的不同位置。

8.6.2 环形队列的实现

(1)成员变量说明:

  • 这里用一个数组来模拟环形队列,因为生产者和消费者要并发执行且不能同时操作相同位置的数据,刚好数组可以通过下标随机访问数据,所以这里我们选用数组。
  • 定义了两个无符号整型对象p_index和c_index分别指向生产者要生产数据的格子下标和消费者要拿取数据的位置下标。
  • 还定义了_proSem和_cusSem两个信号量对象,分别记录着环形队列中格子数量和以生产数据个数。
  • 最后还有必要记录环形队列的容量大小,可以用它来取模更新p_index和c_index的值。

(1)RingQueue.hpp:

#pragma once
#include <iostream>
#include <semaphore.h>
#include <vector>
#include <ctime>
#include <unistd.h>
#include <pthread.h>
using namespace std;

#define N 11
#define CUS 3
#define PRO 2

template<class T>
class RingQueue
{
public:
    RingQueue(int capacity = N)   //构造函数
        :_v(capacity)
        ,_capacity(capacity)
        ,p_index(0)
        ,c_index(0)
    {
        sem_init(&_proSem, 0, N);
        sem_init(&_cusSem, 0, 0);
    }

    void push(const T& data)   // 生产者生产数据
    {
        P(_proSem);
        lock(p_lock);
        _v[p_index] = data;
        p_index++;
        p_index %= _capacity;
        unlock(p_lock);
        V(_cusSem);
    }

    T pop()   // 消费者消费数据
    {
        P(_cusSem);
        lock(c_lock);
        T tmp = _v[c_index];
        c_index++;
        c_index %= _capacity;
        unlock(c_lock);
        V(_proSem);

        return tmp;
    }

    ~RingQueue()   //析构函数
    {
        sem_destroy(&_cusSem);
        sem_destroy(&_proSem);
    }

private:    
    void lock(pthread_mutex_t& mutex)   //加锁
    {
        pthread_mutex_lock(&mutex);
    }

    void unlock(pthread_mutex_t& mutex)   //解锁
    {
        pthread_mutex_unlock(&mutex);
    }

    void P(sem_t& s)    // 申请信号量
    { 
      sem_wait(&s);    
    }

    void V(sem_t& s)    // 释放信号量
    {
      sem_post(&s);    
    }

    vector<T> _v;   // 循环队列
    sem_t _proSem;  // 记录队列中空格数量的信号量
    sem_t _cusSem;  // 记录队列中数据数量的信号量
    int p_index;    // 记录当前空格所在下标    
    int c_index;    // 记录当前数据所在下标
    int _capacity;  // 记录环形队列容量
    pthread_mutex_t c_lock;    // 消费者加锁
    pthread_mutex_t p_lock;    // 生产者加锁
};

成员函数说明:

  • 将信号量的PV操作进行了封装,只需把信号量对象作为参数传入就能完成信号量的申请、释放操作。
  • 生产者执行Push()操作生产数据时,需要先申请(减一)_proSem信号量,生产完成后释放(加一)_cusPos信号量,让消费者来消费。反之亦然

(2)单生产单消费:

#include "RingQueue.hpp"

void* Customer(void* argc)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(argc);
    while(1)
    {
        sleep(1);
        int data = 0;
        data = rq->pop();
        cout << "消费者消费了一个数据: " << data << endl;
    }
}

void* Producer(void* argc)
{
    RingQueue<int>* rq = static_cast<RingQueue<int>*>(argc);
    while(1)
    {
        int data = rand() % N + 1;
        rq->push(data);
        cout << "生者生产了一个数据: " << data << endl;
    }
}

int main()
{
    srand(time(nullptr));  // 1、制造随机数种子,作为生产者push到环形队列当中的数据
    RingQueue<int>* rq = new RingQueue<int>();  // 2、new一个环形队列  

    // 3、分别创建、等待一个生产者和一个消费者  
    pthread_t tid1;
    pthread_t tid2;
    pthread_create(&tid1, nullptr, Customer, (void*)rq);    
    pthread_create(&tid2, nullptr, Producer, (void*)rq);

    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);

    // 4、最后delete环形队列  
    delete rq;

    return 0;
}

(3)运行结果:


(4)多生产多消费:
因为循环队列的生产者和消费者的p_index和c_index只有一个,这样在多生产多消费的情况下就会产生互斥,所以需要加锁进行保护。

#include "RingQueue.hpp"
#include "Task.hpp"
struct ThreadData
{
    RingQueue<Task> *rq;
    std::string threadname;
};

void* Customer(void* argc)
{
    ThreadData* td = static_cast<ThreadData*>(argc);
    RingQueue<Task>* rq = td->rq;
    string name = td->threadname;
    while(1)
    {
        sleep(1);
        Task t = rq->pop();

        t();
        cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;
    }
}

void* Producer(void* argc)
{
    ThreadData* td = static_cast<ThreadData*>(argc);
    RingQueue<Task>* rq = td->rq;
    string name = td->threadname;
    int len = opers.size();
    while(1)
    {
        int data1 = rand() % 10 + 1;
        usleep(10);
        int data2 = rand() % 10;
        char op = opers[rand() % len];
        Task t(data1, data2, op);

        rq->push(t);
        cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;
    }
}

int main()
{
    srand(time(nullptr));  // 1、制造随机数种子,作为生产者push到环形队列当中的数据
    RingQueue<Task>* rq = new RingQueue<Task>;  // 2、new一个环形队列  

    // 创建、等待多个生产者和多个消费者
    pthread_t custid[CUS];
    pthread_t protid[PRO];
    for(int i = 0; i < CUS; i++)
    {
        ThreadData* td = new ThreadData();
        td->rq = rq;
        td->threadname = "Consumer-" + std::to_string(i);

        pthread_create(custid + i, nullptr, Customer, (void*)td);
    }

    for(int i = 0; i < PRO; i++)
    {
        ThreadData* td = new ThreadData();
        td->rq = rq;
        td->threadname = "Producer-" + std::to_string(i);

        pthread_create(protid + i, nullptr, Producer, (void*)td);
    }

    for(int i = 0; i < CUS; i++)
    {
        pthread_join(custid[i], nullptr);
    }

    for(int i = 0; i < PRO; i++)
    {
        pthread_join(protid[i], nullptr);
    }

    // 4、最后delete环形队列  
    delete rq;
    return 0;
}

(5)Task.hpp:

#pragma once
#include <iostream>
#include <string>

std::string opers="+-*/%";

enum{
    DivZero=1,
    ModZero,
    Unknown
};

class Task
{
public:
    Task()
    {}
    Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0)
    {
    }
    void run()
    {
        switch (oper_)
        {
        case '+':
            result_ = data1_ + data2_;
            break;
        case '-':
            result_ = data1_ - data2_;
            break;
        case '*':
            result_ = data1_ * data2_;
            break;
        case '/':
            {
                if(data2_ == 0) exitcode_ = DivZero;
                else result_ = data1_ / data2_;
            }
            break;
        case '%':
           {
                if(data2_ == 0) exitcode_ = ModZero;
                else result_ = data1_ % data2_;
            }            break;
        default:
            exitcode_ = Unknown;
            break;
        }
    }
    void operator ()()
    {
        run();
    }
    std::string GetResult()
    {
        std::string r = std::to_string(data1_);
        r += " ";
        r += oper_;
        r += " ";
        r += std::to_string(data2_);
        r += " = ";
        r += std::to_string(result_);
        r += "[code: ";
        r += std::to_string(exitcode_);
        r += "]";

        return r;
    }
    std::string GetTask()
    {
        std::string r = std::to_string(data1_);
        r += " ";
        r += oper_;
        r += " ";
        r += std::to_string(data2_);
        r += " = ?";
        return r;
    }
    ~Task()
    {
    }

private:
    int data1_;
    int data2_;
    char oper_;

    int result_;
    int exitcode_;
};

(6)运行结果:

9. 线程池

9.1 基本概念

线程池(thread pool):一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在短时间任务创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数据取决于可用的并发处理器、处理器内核、内存、网络sockets等数量。

9.2 线程池工作的四种情况

9.2.1 主程序当前没有任务要执行,线程池中任务队列为空闲状态

下面情况下所有工作线程处于空闲的等待状态,任务缓冲队列为空。

9.2.2 主程序添加小于等于线程池中线程数量得任务

基于10.2.1情况,所有的工作线程已处于等待状态,主线程开始添加三个任务,添加后通知唤醒线程池中的线程开始取任务执行。此时的任务缓冲队列还是空。

9.2.3 主程序添加任务数量大于当前线程池中线程数量的任务

基于10.2.2 情况,所有工作线程都在工作中,主线程开始添加第四个任务,添加后发现现在线程池中线程用完了,于是存入任务缓冲队列。工作线程空闲后主动从任务队列取任务执行。

9.2.4 主程序添加任务数量大于当前线程池中线程数量的任务,且任务缓冲队列已满

此情况发生情形3且设置了任务缓冲队列大小后面,主程序添加第N个任务,添加后发现线程池中线程已经用完了,任务缓冲队列已满,于是进入等待状态,等待任务缓冲队列中任务腾空通知。但是这种情形会阻塞主线程,本文不限制任务队列的大小,必要时再优化。

9.3 线程池的实现


线程池的主要组成由三个部分构成:

  • 任务队列(Task Quene)
  • 线程池(Thread Pool)
  • 完成队列(Completed Tasks)

(1)ThreadPool.hpp:
等待通知机制通过条件变量来实现。

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;

#define N 5

struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

template<class T>
class ThreadPool
{
public:
    void lock()   //加锁
    {
        pthread_mutex_lock(&_mutex);
    }

    void unlock()   //解锁
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()   //唤醒线程
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _qt.empty();
    }

    string GetThreadName(pthread_t tid)
    {
        for(const auto& ti : _v)
        {
            if(ti.tid == tid)
            {
                return ti.name;
            }
        }

        return "None";
    }

    ThreadPool(int num = N)   //构造函数
        :_v(N)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        string name = tp->GetThreadName(pthread_self());

        while(1)
        {
            tp->lock();
            while(tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }

            T t = tp->pop();
            tp->unlock();

            t();
            cout << name << " run, "<< "result: " << t.GetResult() << endl;
        }
    }

    void Start()   //创建线程池
    {
        int num = _v.size();
        for(int i = 0; i < N; i++)
        {
            _v[i].name = "thread-" + to_string(i + 1);
            pthread_create(&(_v[i].tid), nullptr, HandlerTask, this);
        }
    }

    T pop()   //从队列当中取出数据
    {
        T tmp = _qt.front();
        _qt.pop();

        return tmp;
    }

    void Push(const T& t)   //往队列当中插入数据
    {
        lock();
        _qt.push(t);
        Wakeup();
        unlock();
    }
 
    ~ThreadPool()   //析构函数
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    vector<ThreadInfo> _v;   //线程池
    queue<T> _qt;    //任务队列

    pthread_mutex_t _mutex;  //锁
    pthread_cond_t _cond;   //条件变量
};

(2)test.cpp:
线程池最重要的方法是负责向队列添加任务,由主函数向队列添加任务。

#include "Task.hpp"
#include "ThreadPool.hpp"

int main()
{
    ThreadPool<Task>* tp = new ThreadPool<Task>(5);
    tp->Start();
    srand(time(nullptr));

    while(1)
    {
        //1. 构建任务
        int x = rand() % 11 + 1;
        usleep(10);
        int y = rand() % 5 + 1;
        char op = opers[rand() % opers.size()];
        Task t(x, y, op);
        tp->Push(t);

        //2. 交给线程池处理
        std::cout << "main thread make task: " << t.GetTask() << std::endl;
        sleep(1);
    }

    return 0;
}

(3)运行结果:

9.4 对线程进行简单的封装

#pragma once
#include <iostream>
#include <pthread.h>
#include <vector>
#include <unistd.h>
using namespace std;

typedef void (*callback_t)();
static int num = 1;

class Thread
{
public:
    Thread(callback_t cb)
        :_cb(cb)
        ,_start_timestamp(0)
        ,_isrunning(false)
        ,_name("")
    {}

    static void* routine(void* args)
    {
        Thread* t = static_cast<Thread*>(args);
        t->Entery();
        return nullptr;
    }

    void run()
    {
        _isrunning = true;
        _name = "thread-" + to_string(num++);
        _start_timestamp = time(nullptr);
        pthread_create(&_tid, nullptr, routine, this);
    }

    void join()
    {
        pthread_join(_tid, nullptr);
        _isrunning = false;
    }

    string name()
    {
        return _name;
    }

    uint64_t StartTimestamp()
    {
        return _start_timestamp;
    }

    bool IsRunning()
    {
        return _isrunning;
    }

    void Entery()
    {
        _cb();
    }
private:
    pthread_t _tid;   //线程的tid
    string _name;    //线程名
    uint64_t _start_timestamp;   //线程创建的时间
    bool _isrunning;   //线程是否在运行

    callback_t _cb;    //线程所要执行的函数
};

callback_t 是函数指针,是线程需要执行的函数

10. 线程安全的单例模式

10.1 单例模式的特点

某些类, 只应该具有一个对象(实例化),就称之为单例。
例如一个男人只能有一个媳妇。
在很多服务器开发场景中,经常需要让服务器加载很多的数据 (上百G) 到内存中,此时往往要用一个单例的类来管理这些数据。

10.2 饿汉实现方式和懒汉实现方式

(1)洗碗的例子:

  • 吃完饭,立刻洗碗,这种就是饿汉方式,因为下一顿吃的时候可以立刻拿着碗就能吃饭。
  • 吃完饭,先把碗放下,然后下一顿饭用到这个碗了再洗碗,就是懒汉方式。

懒汉方式最核心的思想是 “延时加载”,从而能够优化服务器的启动速度。

(2)饿汉方式实现单例模式:

template <typename T>
class Singleton 
{
	static T data;
public:
	static T* GetInstance() 
	{
		return &data;
	}
};

只要通过 Singleton 这个包装类来使用 T 对象,则一个进程中只有一个T对象的实例。

(3)懒汉方式实现单例模式:

template <typename T>
class Singleton 
{
	static T* inst;
public:
	static T* GetInstance() 
	{
		if (inst == NULL) 
		{
			inst = new T();
		}
		
		return inst;
	}
};

存在一个严重的问题,线程不安全。
第一次调用 GetInstance 的时候,如果两个线程同时调用,可能会创建出两份 T 对象的实例。
但是后续再次调用,就没有问题了。

10.3 懒汉方式实现单例模式(将线程池该成单例)

(1)ThreadPool.hpp:

#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
using namespace std;

#define N 5

struct ThreadInfo
{
    pthread_t tid;
    std::string name;
};

template<class T>
class ThreadPool
{
public:
    void lock()   //加锁
    {
        pthread_mutex_lock(&_mutex);
    }

    void unlock()   //解锁
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Wakeup()   //唤醒线程
    {
        pthread_cond_signal(&_cond);
    }

    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    bool IsQueueEmpty()
    {
        return _qt.empty();
    }

    string GetThreadName(pthread_t tid)
    {
        for(const auto& ti : _v)
        {
            if(ti.tid == tid)
            {
                return ti.name;
            }
        }

        return "None";
    }

    static void* HandlerTask(void* args)
    {
        ThreadPool<T>* tp = static_cast<ThreadPool<T>*>(args);
        string name = tp->GetThreadName(pthread_self());

        while(1)
        {
            tp->lock();
            while(tp->IsQueueEmpty())
            {
                tp->ThreadSleep();
            }

            T t = tp->pop();
            tp->unlock();

            t();
            cout << name << " run, "<< "result: " << t.GetResult() << endl;
        }
    }

    void Start()   //创建线程池
    {
        int num = _v.size();
        for(int i = 0; i < N; i++)
        {
            _v[i].name = "thread-" + to_string(i + 1);
            pthread_create(&(_v[i].tid), nullptr, HandlerTask, this);
        }
    }

    T pop()   //从队列当中取出数据
    {
        T tmp = _qt.front();
        _qt.pop();

        return tmp;
    }

    void Push(const T& t)   //往队列当中插入数据
    {
        lock();
        _qt.push(t);
        Wakeup();
        unlock();
    }

    static ThreadPool<T>* GetInstance()
    {
        if(_tp == nullptr)  //因为if只会进去一次,所以后面在加锁会影响效率,所以加锁前在判断一次即可避免
        {
            pthread_mutex_lock(&_lock);
            if (_tp == nullptr)
            {
                _tp = new ThreadPool<T>();
            }

            pthread_mutex_unlock(&_lock);
        }

        return _tp;
    }

private:
    ThreadPool(int num = N)   //构造函数
        :_v(N)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
    }

    ~ThreadPool()   //析构函数
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    ThreadPool(const ThreadPool<T>& tp) = delete;

    const ThreadPool<T>& operator=(const ThreadPool<T>& tp) = delete; // a=b=c

    vector<ThreadInfo> _v;   //线程池
    queue<T> _qt;    //任务队列

    pthread_mutex_t _mutex;  //锁
    pthread_cond_t _cond;   //条件变量

    static ThreadPool<T>* _tp;
    static pthread_mutex_t _lock;
};

template <class T>
ThreadPool<T>* ThreadPool<T>::_tp = nullptr;

template <class T>
pthread_mutex_t ThreadPool<T>::_lock = PTHREAD_MUTEX_INITIALIZER;

(2)test.cpp:

#include "Task.hpp"
#include "ThreadPool.hpp"

int main()
{
    ThreadPool<Task>* tp = ThreadPool<Task>::GetInstance();
    //ThreadPool<Task>* tp = new ThreadPool<Task>(5);
    tp->Start();
    srand(time(nullptr));

    while(1)
    {
        //1. 构建任务
        int x = rand() % 11 + 1;
        usleep(10);
        int y = rand() % 5 + 1;
        char op = opers[rand() % opers.size()];
        Task t(x, y, op);
        tp->Push(t);

        //2. 交给线程池处理
        std::cout << "main thread make task: " << t.GetTask() << std::endl;
        sleep(1);
    }

    return 0;
}

将四个函数私有化后,利用静态函数GetInstance来获取唯一对象指针。

11. STL、智能指针的线程安全

(1)STL中的容器是否是线程安全的?

不是:原因是STL 的设计初衷是将性能挖掘到极致,而一旦涉及到加锁保证线程安全,会对性能造成巨大的影响。而且对于不同的容器,加锁方式的不同,性能可能也不同(例如hash表的锁表和锁桶)。
因此 STL 默认不是线程安全,如果需要在多线程环境下使用,往往需要调用者自行保证线程安全。
(2)智能指针是否是线程安全的?
对于 unique_ptr,由于只是在当前代码块范围内生效,因此不涉及线程安全问题。
对于 shared_ptr,多个对象需要共用一个引用计数变量,所以会存在线程安全问题,但是标准库实现的时候考虑到了这个问题,基于原子操作(CAS)的方式保证 shared_ptr 能够高效,原子的操作引用计数。

12. 其他常见的各种锁

  • 悲观锁:在每次取数据时,总是担心数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,行锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进行修改。主要采用两种方式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则用新值更新。若不等则失败,失败则重试,一般是一个自旋的过程,即不断重试。
  • 自旋锁
  1. 为什么会有自旋锁

在编写多线程的时候,有些公共数据读的概率远远大于修改的几率。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。我们引入了读写锁即自旋锁处理这种多读少写的情况。
=

  1. 什么是自旋锁

    • 它把对共享资源的访问者划分成读者和写者,读者只对共享资源进行读访问,写者则需要对共享资源进行写操作。

    • 这种锁相对于自旋锁而言,能提高并发性。

    • 在多处理器系统中:
      (1)对于读者:它允许同时有多个读者来访问共享资源,最大可能的读者数为实际的逻辑CPU数。
      (2) 对于写者:写者是排他性的,一个读写锁同时只能有一个写者或多个读者(与CPU数相关),但不能同时既有读者又有写者。

  2. 自旋锁相关的API函数

使用自旋锁,必须包含头文件,并链接库-lpthread

#include <pthread.h>

(1)初始化函数:

int pthread_spin_init(pthread_spinlock_t *lock, int pshared);

功能:初始化自旋锁, 当线程使用该函数初始化一个未初始化或者被destroy过的自旋锁。该函数会为自旋锁申请资源并且初始化自旋锁为unlocked状态。
参数

  • pthread_spinlock_t :要初始化自旋锁

  • pshared取值

    • PTHREAD_PROCESS_SHARED:该自旋锁可以在多个进程中的线程之间共享。(可以被其他进程中的线程看到)
    • PTHREAD_PROCESS_PRIVATE: 仅初始化本自旋锁的线程所在的进程内的线程才能够使用该自旋锁。

返回值:若成功,返回0;否则,返回错误编号

(2)销毁函数:

int pthread_spin_destroy(pthread_spinlock_t *lock);

功能:用来销毁指定的自旋锁并释放所有相关联的资源(所谓的资源指的是由pthread_spin_init自动申请的资源),如果调用该函数时自旋锁正在被使用或者自旋锁未被初始化则结果是未定义的。
参数

  • pthread_spinlock_t :要销毁的自旋锁

返回值:若成功,返回0;否则,返回错误编号

(3)加锁函数:

int pthread_spin_lock(pthread_spinlock_t *lock);

功能:用来获取(锁定)指定的自旋锁. 如果该自旋锁当前没有被其它线程所持有,则调用该函数的线程获得该自旋锁,否则该函数在获得自旋锁之前不会返回。
参数

  • pthread_spinlock_t :要加锁的自旋锁

返回值:若成功,返回0;否则,返回错误编号

(4)解锁函数:

int pthread_spin_unlock(pthread_spinlock_t *lock);

功能:用来解锁指定的自旋锁.。
参数

  • pthread_spinlock_t :要加锁的自旋锁

返回值:若成功,返回0;否则,返回错误编号

13. 读者写者问题

在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。如果给这种代码段加锁,会极大地降低我们程序的效率。

那么有没有一种方法,可以专门处理这种多读少写的情况呢? 有,那就是读写锁。

13.1 读者与写者的关系

为了解决我们的问题我们先来分析多线程中两个角色的关系:读者,写者。

  • 写者与写者:显然写者与写者之间的关系是互斥的,多个写者之间要用锁来进行保证线程安全。
  • 读者与读者:读者与读者都只是访问数据,而不会修改数据,所以多个读者访问数据没有线程安全,读者之间毫无关系。
  • 读者与写者:当读者与写者同时访问数据时,明显有线程安全的问题,只有读者读取完了再让写者写,或者写者写完了再让读者读取才是合理的。所以读者与写者之间存在互斥且同步的关系。

13.2 读写锁的API函数

使用自旋锁,必须包含头文件,并链接库-lpthread

#include <pthread.h>

(1)初始化/销毁函数:

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

功能:初始化/销毁一个读写锁。
参数

  • pthread_rwlock_t : 读写锁的数据结构;
  • pthread_rwlockattr_t : 读写锁属性的数据结构,一般直接设置为空。

返回值:若成功,返回0;否则,返回错误编号

(2)读者加锁函数:

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
  • 功能:当读者要访问数据时要申请先读者锁,拿到读者锁以后才能访问资源。
  • 返回值:若成功,返回0;否则,返回错误编号

(3)写者加锁函数:

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
  • 功能:当写者要访问数据时要申请先写者锁,拿到写者锁以后才能访问资源。
  • 返回值:若成功,返回0;否则,返回错误编号

(4)解锁函数:

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
  • 功能:无论是读者解锁还是写者解锁都要使用此函数。

  • 返回值:若成功,返回0;否则,返回错误编号

13.3 伪代码理解读写锁的原理

想要使用正确的使用读写锁就还要简单理解一下读写锁的原理,而读写锁就是要维护好上面的读者与写者的关系。

如下代码,来理解读写锁的原理。


(1)读者的逻辑:


(2)写者的逻辑:


(3)简单分析:

  • 当读者先加锁,则读者拿到阅览室(临界区)的使用权,不影响后来其他读者,只会拦截写者,直到读者解锁。
  • 当写者先加锁,后来的写者与读者都不能够进入,直到写者解锁。
  • 实际中由于读者众多,可能会有读者络绎不绝的进入阅览室(临界区),从而导致写者迟迟无法获得阅览室(临界区)的使用权,进而导致写者饥饿问题(又叫读者优先策略)


注意:写独占,读共享,读锁优先级高。

  • 写者优先策略的实现方式:当写者加锁没有成功,在写者后面来的读者全部都不能够进入阅览室(临界区),等阅览室已经存在的读者全部离开以后,写者先进入阅览室(临界区)进行修改,然后才能让写者后面来的读者进入阅览室(临界区)。
  • pthread库里面给我们提供了一个函数可以设置读写优先级,使用man查不到这个函数,但是可以在pthread.h头文件中发现,如果我们不设置默认是读者优先。
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);

/*
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
*/

13.4 读写锁的演示

#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include <pthread.h>
#include <unistd.h>

using namespace std;

// 线程的属性
struct ThreadAttr
{
    pthread_t _tid;
    string _name;
};

// 票数
volatile int ticket = 100;
// 读写锁
pthread_rwlock_t rwlock;

void rwattr_init(pthread_rwlockattr_t* pattr, int flag)  // 读写锁属性初始化
{
    pthread_rwlockattr_init(pattr);  
    if (flag == 0)   // flag为0表示读者优先,其他表示写着优先
    {
        pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_READER_NP);
    }
    else
    {
        pthread_rwlockattr_setkind_np(pattr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
    }
}

void rwattr_destroy(pthread_rwlockattr_t* prwlock)   // 读写锁属性的销毁
{
    pthread_rwlockattr_destroy(prwlock);
}

void rwlock_init(pthread_rwlock_t* prwlock, int flag = 0)  // 锁的初始化
{
    pthread_rwlockattr_t rwattr;
    rwattr_init(&rwattr, flag);
    pthread_rwlock_init(prwlock, &rwattr);
    rwattr_destroy(&rwattr);
}

const string create_writer_name(size_t i)   // 创建name
{
    stringstream ssm("thread writer : ", ios::in | ios::out | ios::ate);
    ssm << i;
    return ssm.str();
}

const string create_reader_name(size_t i)
{
    stringstream ssm("thread reader : ", ios::in | ios::out | ios::ate);
    ssm << i;
    return ssm.str();
}

void* readerRoutine(void* args)   // 读者历程
{
    string* ps = static_cast<string*>(args);
    // 进行查票
    while (true)
    {
        pthread_rwlock_rdlock(&rwlock);
        if (ticket != 0)
        {
            cout << *ps << "  ticket number : " << ticket << endl;
        }
        else
        {
            cout << *ps << "  done!!!!!" << endl;
            // 防止死锁
            pthread_rwlock_unlock(&rwlock);
            break;
        }
        pthread_rwlock_unlock(&rwlock);

        // 休眠0.1ms
        usleep(100);
    }
}

void* writerRoutine(void* args)  // 写者历程
{
    string* ps = static_cast<string*>(args);
    // 进行改票
    while (true)
    {
        pthread_rwlock_wrlock(&rwlock);
        if (ticket != 0)
        {
            cout << *ps << "  ticket number : " << --ticket << endl;
        }
        else
        {
            cout << *ps << "  done!!!!!" << endl;
            // 防止死锁
            pthread_rwlock_unlock(&rwlock);
            break;
        }
        pthread_rwlock_unlock(&rwlock);
        // 休眠0.1ms
        usleep(100);
    }
}

void reader_init(vector<ThreadAttr>& readers)
{
    int i = 1;
    for (auto& e : readers)
    {
        e._name = create_reader_name(i++);
        pthread_create(&e._tid, nullptr, readerRoutine, &e._name);
    }
}

void writer_init(vector<ThreadAttr>& writers)
{
    int i = 1;
    for (auto& e : writers)
    {
        e._name = create_writer_name(i++);
        pthread_create(&e._tid, nullptr, writerRoutine, &e._name);
    }
}

void reader_join(const vector<ThreadAttr>& readers)
{
    for (auto& e : readers)
    {
        pthread_join(e._tid, nullptr);
    }
}

void writer_join(const vector<ThreadAttr>& writers)
{
    for (auto& e : writers)
    {
        pthread_join(e._tid, nullptr);
    }
}

int main()
{
    // 初始化锁,并设置读写者优先属性
    rwlock_init(&rwlock, 0);

    const int reader_count = 30;
    const int writer_count = 2;

    vector<ThreadAttr> readers(reader_count);
    vector<ThreadAttr> writers(writer_count);

    reader_init(readers);
    writer_init(writers);

    reader_join(readers);
    writer_join(writers);
    return 0;
}

文章来源:https://blog.csdn.net/m0_65558082/article/details/134741925
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。