参考引用
线程同步是为了对共享资源的访问进行保护
保护的目的是为了解决数据一致性的问题
出现数据一致性问题其本质在于:进程中的多个线程对共享资源的并发访问(同时访问)
当一个线程修改变量时,其它的线程在读取这个变量时可能会看到不一致的值
如何解决对共享资源的并发访问出现数据不一致的问题?
线程同步机制如何选择?
- 互斥锁:一次只允许一个线程访问临界区,其他线程需要等待释放锁,适合对共享资源进行独占式访问控制
- 读写锁:允许多个线程同时读取共享资源,但在写入时要求互斥,适合对共享资源进行读取频率高于写入频率
- 条件变量:允许线程在满足某个条件之前等待,通过通知唤醒相应的等待线程,适合某些条件满足时进行同步
- 信号量:控制多个线程对有限资源的访问权限,适合在资源数目有限且需要精确控制访问权限的情况
- 原子操作:保证特定操作的原子性,不会被中断或干扰,适合在对共享变量进行简单读取和写入操作时进行同步
使用 PTHREAD_MUTEX_INITIALIZER 宏初始化互斥锁
// PTHREAD_MUTEX_INITIALIZER 宏已经携带了互斥锁的默认属性
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
使用 pthread_mutex_init() 函数初始化互斥锁
#include <pthread.h>
// mutex:指向需要进行初始化操作的互斥锁对象
// attr:用于定义互斥锁的属性,若将参数 attr 设置为 NULL,则表示将互斥锁的属性设置为默认值
// 这种情况等价于 PTHREAD_MUTEX_INITIALIZER 初始化,不同之处在于,使用宏不进行错误检查
// 返回值:成功返回 0;失败将返回一个非 0 的错误码
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
$ sudo apt-get install manpages-posix-dev
$ man 3 pthread_mutex_init
使用示例
// 方式一
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
// 方式二
pthread_mutex_t *mutex = malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(mutex, NULL);
调用 pthread_mutex_lock() 函数对互斥锁进行上锁
调用 pthread_mutex_unlock() 函数将已经处于锁定状态的互斥锁进行解锁,以下行为均属错误
#include <pthread.h>
// 返回值:调用成功时返回 0;失败将返回一个非 0 值的错误码
int pthread_mutex_lock(pthread_mutex_t *mutex); // 对互斥锁加锁、获取互斥锁
int pthread_mutex_unlock(pthread_mutex_t *mutex); // 对互斥锁解锁、释放互斥锁
示例:使用互斥锁保护全局变量的访问
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
static pthread_mutex_t mutex;
static int g_count = 0;
static void *new_thread_start(void *arg) {
int loops = *((int*)arg);
int l_count, j;
for (j = 0; j < loops; j++) {
pthread_mutex_lock(&mutex); // 互斥锁上锁
l_count = g_count;
l_count++;
g_count = l_count;
pthread_mutex_unlock(&mutex); // 互斥锁解锁
}
return (void*)0;
}
static int loops;
int main(int argc, char *argv[]) {
pthread_t tid1, tid2;
int ret;
/* 获取用户传递的参数 */
if (argc < 2) {
loops = 10000000; // 没有传递参数默认为 1000 万次
} else {
loops = atoi(argv[1]);
}
/* 初始化互斥锁 */
pthread_mutex_init(&mutex, NULL);
/* 创建 2 个新线程 */
ret = pthread_create(&tid1, NULL, new_thread_start, &loops);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
ret = pthread_create(&tid2, NULL, new_thread_start, &loops);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
/* 等待线程结束 */
ret = pthread_join(tid1, NULL);
if (ret) {
fprintf(stderr, "pthread_join error: %s\n", strerror(ret));
exit(-1);
}
ret = pthread_join(tid2, NULL);
if (ret) {
fprintf(stderr, "pthread_join error: %s\n", strerror(ret));
exit(-1);
}
/* 打印结果 */
printf("g_count = %d\n", g_count);
exit(0);
}
$ gcc mutex.c -o mutex -l pthread
$ ./mutex
# 每次对 g_count 的累加总是能够保持正确,但是在运行程序的过程中,明显会感觉到锁消耗的时间会比较长
g_count = 20000000
当互斥锁已经被其它线程锁住时,调用 pthread_mutex_lock() 函数会被阻塞,直到互斥锁解锁;如果线程不希望被阻塞,可以使用 pthread_mutex_trylock() 函数尝试对互斥锁进行加锁,如果互斥锁处于未锁住状态,那么调用 pthread_mutex_trylock() 将会锁住互斥锁并立马返回,如果互斥锁已经被其它线程锁住,调用 pthread_mutex_trylock() 加锁失败,但不会阻塞,而是返回错误码 EBUSY
#include <pthread.h>
int pthread_mutex_trylock(pthread_mutex_t *mutex);
示例
...
static void *new_thread_start(void *arg) {
int loops = *((int*)arg);
int l_count, j;
for (j = 0; j < loops; j++) {
while (pthread_mutex_trylock(&mutex)); // 以非阻塞方式上锁
l_count = g_count;
l_count++;
g_count = l_count;
pthread_mutex_unlock(&mutex); // 互斥锁解锁
}
return (void*)0;
}
...
当不再需要互斥锁时,应该将其销毁,通过调用 pthread_mutex_destroy() 函数来销毁互斥锁
#include <pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
示例:销毁互斥锁
// 其余代码同 2.2 小节
...
pthread_mutex_destroy(&mutex);
...
// 线程 A
pthread_mutex_lock(mutex1);
pthread_mutex_lock(mutex2); // 死锁
// 线程 B
pthread_mutex_lock(mutex2);
pthread_mutex_lock(mutex1); // 死锁
解决办法
- 1、定义互斥锁的层级关系,当多个线程对一组互斥锁操作时,总是应该按照相同的顺序对该组互斥锁进行锁定。如在上述场景中,如果线程 A、B 总是先锁定 mutex1 再锁定 mutex2,死锁就不会出现
- 2、使用 pthread_mutex_trylock() 以不阻塞的方式尝试对互斥锁进行加锁:先用 pthread_mutex_lock() 锁定第一个互斥锁,然后使用 pthread_mutex_trylock() 锁定其余互斥锁,若任一 pthread_mutex_trylock() 调用失败,那么该线程释放所有互斥锁,可以经过一段时间之后从头再试
调用 pthread_mutex_init() 函数初始化互斥锁时可以设置互斥锁的属性,通过参数 attr 指定
当定义 pthread_mutexattr_t 对象后
#include <pthread.h>
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
互斥锁的类型属性控制着互斥锁的锁定特性,一共有 4 种类型
可以使用 pthread_mutexattr_gettype() 函数得到互斥锁的类型属性,使用 pthread_mutexattr_settype() 修改/设置互斥锁类型属性
#include <pthread.h>
int pthread_mutexattr_gettype(const pthread_mutexattr_t *attr, int *type);
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
示例
pthread_mutex_t mutex;
pthread_mutexattr_t attr;
/* 初始化互斥锁属性对象 */
pthread_mutexattr_init(&attr);
/* 将类型属性设置为 PTHREAD_MUTEX_NORMAL */
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_NORMAL);
/* 初始化互斥锁 */
pthread_mutex_init(&mutex, &attr);
......
/* 使用完之后进行销毁 */
pthread_mutexattr_destroy(&attr);
pthread_mutex_destroy(&mutex);
条件变量是线程可用的另一种同步机制
生产者-消费者模式,生产者负责生产产品,而消费者负责消费产品,对于消费者来说,没有产品的时候只能等待产品出来,有产品就使用它。使用一个变量表示这个产品,生产者生产一件产品变量加 1,消费者消费一次变量减 1
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
static pthread_mutex_t mutex;
static int g_avail = 0;
/* 消费者线程 */
static void *consumer_thread(void *arg) {
for (;;) {
pthread_mutex_lock(&mutex);
while (g_avail > 0) {
g_avail--; // 消费
}
pthread_mutex_unlock(&mutex);
}
return (void*)0;
}
/* 主线程(生产者) */
int main(int argc, char *argv[]) {
pthread_t tid;
int ret;
/* 初始化互斥锁 */
pthread_mutex_init(&mutex, NULL);
/* 创建新线程 */
ret = pthread_create(&tid, NULL, consumer_thread, NULL);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
for (;;) {
pthread_mutex_lock(&mutex);
g_avail++; // 生产
pthread_mutex_unlock(&mutex);
}
exit(0);
}
上述代码由于新线程中会不停的循环检查全局变量 g_avail 是否大于 0,造成 CPU 资源浪费。采用条件变量可解决这一问题:条件变量允许一个线程休眠(阻塞等待)直至获取另一个线程的通知(收到信号)再执行自己的操作
- 当条件 g_avail > 0 不成立时,消费者线程会进入休眠状态,而生产者生成产品后(g_avail++,此时 g_avail 将会大于 0),向处于等待状态的线程发出 “信号”(非 Linux 信号),而其它线程收到 “信号” 后便会被唤醒
- 条件本身由互斥锁保护,线程在改变条件状态前必须首先锁住互斥锁,否则可能引发线程不安全问题
#include <pthread.h>
// 可将参数 attr 设置为 NULL,表示使用属性的默认值来初始化条件变量
// 返回值:函数调用成功返回 0,失败将返回一个非 0 值的错误码
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
#include <pthread.h>
// pthread_cond_signal() 至少能唤醒一个线程,而 pthread_cond_broadcast() 则能唤醒所有线程
// pthread_cond_broadcast() 总能产生正确结果,唤醒所有等待状态的线程,但 pthread_cond_signal() 更为高效,因为它只需确保至少唤醒一个线程即可
// 如果程序当中只有一个处于等待状态的线程,建议使用 pthread_cond_signal()
// 返回值:调用成功返回 0;失败将返回一个非 0 值的错误码
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
#include <pthread.h>
// cond:指向需要等待的条件变量,目标条件变量
// mutex:是一个 pthread_mutex_t 类型指针,指向一个互斥锁对象
// 返回值:调用成功返回 0;失败将返回一个非 0 值的错误码
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
条件变量并不保存状态信息,只是传递应用程序状态信息的一种通讯机制
- 如果调用 pthread_cond_signal() 和 pthread_cond_broadcast() 向指定条件变量发送信号时,若无任何线程等待该条件变量,这个信号也就会不了了之
- 当调用 pthread_cond_broadcast() 同时唤醒所有线程时,互斥锁也只能被某一线程锁住,其它线程获取锁失败又会陷入阻塞
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
static pthread_mutex_t mutex; // 定义互斥锁
static pthread_cond_t cond; // 定义条件变量
static int g_avail = 0; // 全局共享资源
/* 消费者线程 */
static void *consumer_thread(void *arg) {
for (;;) {
pthread_mutex_lock(&mutex); // 上锁
while (g_avail <= 0)
pthread_cond_wait(&cond, &mutex); // 等待条件满足
while (g_avail > 0)
g_avail--; // 消费
pthread_mutex_unlock(&mutex); // 解锁
}
return (void *)0;
}
/* 主线程(生产者) */
int main(int argc, char *argv[]) {
pthread_t tid;
int ret;
/* 初始化互斥锁和条件变量 */
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cond, NULL);
/* 创建新线程 */
ret = pthread_create(&tid, NULL, consumer_thread, NULL);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
for (;;) {
pthread_mutex_lock(&mutex); // 上锁
g_avail++; // 生产
pthread_mutex_unlock(&mutex); // 解锁
pthread_cond_signal(&cond); // 向条件变量发送信号
}
exit(0);
}
在访问共享资源之前对自旋锁进行上锁,在访问完成后释放自旋锁(解锁)
如果在获取自旋锁时,自旋锁处于未锁定状态,那么将立即获得锁(对自旋锁上锁);如果在获取自旋锁时,自旋锁已经处于锁定状态,那么获取锁操作将会在原地 “自旋”,直到该自旋锁的持有者释放了锁
自旋锁的不足之处
试图对同一自旋锁加锁两次必然会导致死锁,而试图对同一互斥锁加锁两次不一定会导致死锁,原因在于:当互斥锁设置为 PTHREAD_MUTEX_ERRORCHECK 类型时,会进行错误检查,第二次加锁会返回错误,所以不会进入死锁状态
自旋锁与互斥锁之间的区别
#include <pthread.h>
// lock:指向了需要进行初始化或销毁的自旋锁对象
// pshared:表示自旋锁的进程共享属性
// PTHREAD_PROCESS_SHARED:共享自旋锁。该自旋锁可以在多个进程中的线程之间共享
// PTHREAD_PROCESS_PRIVATE:私有自旋锁。只有本进程内的线程才能够使用该自旋锁
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
使用 pthread_spin_lock() 或 pthread_spin_trylock() 对自旋锁进行加锁,前者在未获取到锁时一直 “自旋”,后者如果未能获取到锁,就立刻返回错误码为 EBUSY,使用 pthread_spin_unlock() 对自旋锁进行解锁
#include <pthread.h>
// 参数 lock 指向自旋锁对象,调用成功返回 0,失败将返回一个非 0 值的错误码
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
示例:使用自旋锁实现线程同步
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
static pthread_spinlock_t spin; // 定义自旋锁
static int g_count = 0;
static void *new_thread_start(void *arg) {
int loops = *((int *)arg);
int l_count, j;
for (j = 0; j < loops; j++) {
pthread_spin_lock(&spin); // 自旋锁上锁
l_count = g_count;
l_count++;
g_count = l_count;
pthread_spin_unlock(&spin); // 自旋锁解锁
}
return (void *)0;
}
static int loops;
int main(int argc, char *argv[]) {
pthread_t tid1, tid2;
int ret;
/* 获取用户传递的参数 */
if (2 > argc)
loops = 10000000; // 没有传递参数默认为 1000 万次
else
loops = atoi(argv[1]);
/* 初始化自旋锁(私有) */
pthread_spin_init(&spin, PTHREAD_PROCESS_PRIVATE);
/* 创建 2 个新线程 */
ret = pthread_create(&tid1, NULL, new_thread_start, &loops);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
ret = pthread_create(&tid2, NULL, new_thread_start, &loops);
if (ret) {
fprintf(stderr, "pthread_create error: %s\n", strerror(ret));
exit(-1);
}
/* 等待线程结束 */
ret = pthread_join(tid1, NULL);
if (ret) {
fprintf(stderr, "pthread_join error: %s\n", strerror(ret));
exit(-1);
}
ret = pthread_join(tid2, NULL);
if (ret) {
fprintf(stderr, "pthread_join error: %s\n", strerror(ret));
exit(-1);
}
/* 打印结果 */
printf("g_count = %d\n", g_count);
/* 销毁自旋锁 */
pthread_spin_destroy(&spin);
exit(0);
}
互斥锁或自旋锁要么是加锁状态、要么是不加锁状态,而且一次只有一个线程可以对其加锁
读写锁有 3 种状态:读模式下的加锁状态(读加锁状态)、写模式下的加锁状态(写加锁状态)和不加锁状态
读写锁有如下两个规则
读写锁适合于对共享数据读的次数远大于写次数的情况,读写锁也叫做共享互斥锁
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
#include <pthread.h>
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);
以读模式对读写锁进行上锁,需要调用 pthread_rwlock_rdlock() 函数;以写模式对读写锁进行上锁,调用 pthread_rwlock_wrlock() 函数,不管是以何种方式锁住读写锁,均可以调用 pthread_rwlock_unlock() 解锁
#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
示例:使用读写锁实现线程同步
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
static pthread_rwlock_t rwlock; // 定义读写锁
static int g_count = 0;
static void *read_thread(void *arg) {
int number = *((int *)arg);
int j;
for (j = 0; j < 10; j++) {
pthread_rwlock_rdlock(&rwlock); // 以读模式获取锁
printf("读线程<%d>, g_count=%d\n", number+1, g_count);
pthread_rwlock_unlock(&rwlock); // 解锁
sleep(1);
}
return (void *)0;
}
static void *write_thread(void *arg) {
int number = *((int *)arg);
int j;
for (j = 0; j < 10; j++) {
pthread_rwlock_wrlock(&rwlock); // 以写模式获取锁
printf("写线程<%d>, g_count=%d\n", number+1, g_count+=20);
pthread_rwlock_unlock(&rwlock); // 解锁
sleep(1);
}
return (void *)0;
}
static int nums[5] = {0, 1, 2, 3, 4};
int main(int argc, char *argv[]) {
pthread_t tid[10];
int j;
/* 对读写锁进行初始化 */
pthread_rwlock_init(&rwlock, NULL);
/* 创建 5 个读 g_count 变量的线程 */
for (j = 0; j < 5; j++)
pthread_create(&tid[j], NULL, read_thread, &nums[j]);
/* 创建 5 个写 g_count 变量的线程 */
for (j = 0; j < 5; j++)
pthread_create(&tid[j+5], NULL, write_thread, &nums[j]);
/* 等待线程结束 */
for (j = 0; j < 10; j++)
pthread_join(tid[j], NULL); // 回收线程
/* 销毁自旋锁 */
pthread_rwlock_destroy(&rwlock);
exit(0);
}
$ gcc rwlock.c -o rwlock -l pthread
$ ./rwlock
读线程<2>, g_count=0
读线程<1>, g_count=0
读线程<4>, g_count=0
读线程<5>, g_count=0
读线程<3>, g_count=0
写线程<3>, g_count=20
写线程<1>, g_count=40
写线程<2>, g_count=60
写线程<5>, g_count=80
写线程<4>, g_count=100
读线程<4>, g_count=100
...
读写锁的属性使用 pthread_rwlockattr_t 数据类型来表示,当定义 pthread_rwlockattr_t 对象时
#include <pthread.h>
int pthread_rwlockattr_destroy(pthread_rwlockattr_t *attr);
int pthread_rwlockattr_init(pthread_rwlockattr_t *attr);
读写锁只有一个属性,那便是进程共享属性
#include <pthread.h>
// attr:指向 pthread_rwlockattr_t 对象
// pshared:获取读写锁的共享属性,将其保存在参数 pshared 所指向的内存中
int pthread_rwlockattr_getpshared(const pthread_rwlockattr_t *attr, int *pshared);
// attr:指向 pthread_rwlockattr_t 对象
// pshared:设置读写锁的共享属性,将其设置为参数 pshared 指定的值
// PTHREAD_PROCESS_SHARED:共享读写锁。该读写锁可以在多个进程中的线程之间共享
// PTHREAD_PROCESS_PRIVATE:私有读写锁。只有本进程内的线程才能够使用该读写锁,这是读写锁共享属性的默认值
int pthread_rwlockattr_setpshared(pthread_rwlockattr_t *attr, int pshared);
示例
pthread_rwlock_t rwlock; // 定义读写锁
pthread_rwlockattr_t attr; // 定义读写锁属性
/* 初始化读写锁属性对象 */
pthread_rwlockattr_init(&attr);
/* 将进程共享属性设置为 PTHREAD_PROCESS_PRIVATE */
pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_PRIVATE);
/* 初始化读写锁 */
pthread_rwlock_init(&rwlock, &attr);
......
/* 使用完之后 */
pthread_rwlock_destroy(&rwlock); // 销毁读写锁
pthread_rwlockattr_destroy(&attr); // 销毁读写锁属性对象