多线程并发编程硬核指南:从互斥锁到生产者模型的全场景实战与原理揭秘
文章目录
- 引言:
- 一、线程互斥
- 1.1 进程线程间的互斥相关背景概念
- 1.2 多线程可能出现的问题
- 情况一:使用过期值
- 情况二:重新读取值
- 1.3 mutex
- 1.3.1 基本概念
- 1.3.2 std::mutex
- 1.3.3 锁的管理类
- 1.3.4 常见问题与解决方案
- 1.3.5 使用场景
- 1.3.6 示例代码
- 二、线程同步
- 2.1 为什么需要线程同步?
- 2.2 条件变量
- 2.3 同步概念与竞态条件
- 2.4 条件变量函数
- 2.4.1 初始化条件变量
- 2.4.2 销毁条件变量
- 2.4.3 等待条件(阻塞)
- 2.4.4 定时等待
- 2.4.5 唤醒单个线程
- 2.4.6 唤醒所有线程
- 2.4.7 条件变量使用规范
- 2.5 生产者消费者模型
- 2.5.1 模型概述
- 2.5.2 关键组件解析
- 2.5.3 模型工作原理
- 2.5.4 模型优点
- 2.5.5 基于阻塞队列的生产者消费者模型代码示例
- 2.6 为什么 pthread_cond_wait 需要互斥量?
- 三、线程安全和重入问题
- 3.1 概念
引言:
在现代软件开发中,多线程编程已成为提升系统性能和响应能力的必备技能。从服务器后端的高并发请求处理,到客户端应用的 UI 与逻辑异步解耦,线程间的协同与竞争始终是绕不开的核心命题。然而,当多个线程同时访问共享资源时,数据竞争、死锁、超卖等问题往往让开发者束手无策 —— 正如经典的 “抢票场景” 中,ticket--
这一看似简单的操作,在汇编层面竟被拆解为三步非原子操作,最终导致重复票号与负数票额的诡异现象。
本文将从底层原理出发,系统剖析线程互斥与同步机制的实现逻辑:通过互斥锁(Mutex)保护临界区的原子性操作,借助条件变量(Condition Variable)实现生产者与消费者的高效协同,最终构建出支持多线程并发的阻塞队列模型。同时,我们将深入探讨线程安全与可重入性的本质区别 —— 为何加锁的函数是线程安全的,却可能因重入导致死锁?如何通过 RAII 机制与原子操作避免这些陷阱?
无论你是初涉多线程编程的新手,还是希望夯实并发基础的开发者,本文都将通过 “问题场景→原理分析→代码实战” 的递进式讲解,带你掌握从基础互斥到复杂生产者消费者模型的全流程实现,最终写出稳定、高效的线程安全代码。
一、线程互斥
1.1 进程线程间的互斥相关背景概念
- 临界资源:多线程执行流共享的资源就叫做临界资源
- 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性(后面讨论如何实现)不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成
1.2 多线程可能出现的问题
- 大部分情况,线程使用的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
- 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
- 多个线程并发的操作共享变量,会带来一些问题。
看下面模拟抢票的代码,可以看出多线程使用时的一些问题:
1 #include <stdio.h>2 #include <stdlib.h>3 #include <string.h>4 #include <unistd.h>5 #include <pthread.h>6 7 8 int ticket = 100;9 10 void *route(void *arg)11 {12 char *id = (char*)arg;13 while(1) {14 if(ticket > 0) {15 usleep(1000);16 printf("%s sells ticket:%d\n", id, ticket);17 ticket--;18 } else {19 break;20 }21 }22 }23 24 25 int main()26 {27 pthread_t t1, t2, t3, t4;28 29 pthread_create(&t1, NULL, route, "thread 1");30 pthread_create(&t2, NULL, route, "thread 2");31 pthread_create(&t3, NULL, route, "thread 3");32 pthread_create(&t4, NULL, route, "thread 4"); 33 34 pthread_join(t1, NULL);35 pthread_join(t2, NULL);36 pthread_join(t3, NULL);37 pthread_join(t4, NULL);38 39 return 0;40 }41
运行结果:
可以看到,在模拟多线程进行抢票的时候,出现了几个人在买完票时抢到了相同票号的情况,甚至还有票的数量竟然小于 0 的情况,这是很严重的问题。可是为什么会发生这样的情况呢?
其实主要问题出现在 ticket--
上,这看上去是一条语句,但其实当他被转换成汇编代码时,最少也是三条语句:
# C 代码对应: ticket--
movl ticket, %eax # 1. 读取阶段:将 ticket 值加载到 eax 寄存器
decl %eax # 2. 修改阶段:将 eax 寄存器的值减 1
movl %eax, ticket # 3. 写入阶段:将新值写回 ticket 内存地址
所以 --
的操作并非原子的
情况一:使用过期值
这就能解释为什么会出现多个人抢到相同票号的情况了:
假设线程 t1
、t2
是同时执行的,且 ticket == 1
,线程 t1
读取到了 ticket == 1
,将其加载到寄存器 eax
中,满足条件进入循环,此时 t1
被系统中断,切换到 t2
,t2
读取了 ticket == 1
,将其加载到寄存器 ebx
中 ,然后它将寄存器中的值减一,再将新值写入到 ticket
内存地址处,此时 ticket
内存地址处的值为 0;而后线程 t1
被恢复,虽然 ticket
的值为 0,但是寄存器 eax
的值为1,在其进行减一和写回到 ticket
的内存地址之后,ticket
的值仍为 0,所以就会出现抢到了相同票号的情况。
情况二:重新读取值
那为什么会出现超卖情况呢?
假设线程 t1
、t2
、t3
、t4
同时执行,而此时 ticket == 1
,这四个线程都认为自己满足执行条件,进入循环中,线程 t1
、t2
按照上面的方式使得出现了两次 ticket == 0
,但是因为 t4
被长时间中断,它会重新读取 ticket
内存地址中的值,然后再执行 --
操作,然后再将其写回 ticket
的内存地址中,这就导致了 ticket < 0
的情况。
1.3 mutex
那么如何解决上面说的问题呢?加锁!
1.3.1 基本概念
- 互斥锁(Mutex):一种同步原语,确保同一时间只有一个线程可以访问共享资源。
- 临界区(Critical Section):访问共享资源的代码段,需要用互斥锁保护。
- 原子性(Atomicity):锁操作是原子的,确保线程安全。
1.3.2 std::mutex
最基本的互斥锁,不支持递归锁定:
#include <mutex>std::mutex mtx;void sharedResourceAccess() {mtx.lock(); // 加锁// 临界区代码mtx.unlock(); // 解锁
}
注意:手动调用lock()
和unlock()
需确保成对出现,否则可能导致死锁。推荐使用 RAII 风格的std::lock_guard
或std::unique_lock
:
void safeAccess() {std::lock_guard<std::mutex> lock(mtx); // 自动加锁// 临界区代码
} // 作用域结束自动解锁
1.3.3 锁的管理类
C++ 提供了 RAII 风格的锁管理类,自动处理锁的生命周期:
std::lock_guard
最简单的锁管理器,构造时加锁,析构时解锁:{std::lock_guard<std::mutex> lock(mtx);// 临界区 } // 自动解锁
std::unique_lock
更灵活的锁管理器,支持延迟加锁、转移所有权和条件变量:{std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁if (needLock) {lock.lock(); // 手动加锁}// 临界区 } // 自动解锁
std::scoped_lock
C++17 引入,用于同时锁定多个互斥锁,避免死锁:std::mutex mtx1, mtx2;void safeFunction() {std::scoped_lock lock(mtx1, mtx2); // 原子性地锁定多个锁// 临界区 }
1.3.4 常见问题与解决方案
- 死锁(Deadlock)
多个线程互相等待对方释放锁,导致程序冻结。
解决方案:
- 按固定顺序加锁。
- 使用
std::lock
或std::scoped_lock
同时锁定多个锁。
- 锁粒度(Lock Granularity)
- 过粗:多个无关操作共用同一锁,导致性能下降。
- 过细:管理过多锁增加复杂度,可能引发死锁。
优化原则:
- 最小化锁的持有时间。
- 对独立资源使用独立锁。
- 性能问题
互斥锁的竞争可能成为性能瓶颈。
优化方案:
- 对于简单操作,优先使用
std::atomic
(无锁编程)。 - 对于读多写少的场景,使用
std::shared_mutex
。
1.3.5 使用场景
- 保护共享数据:如全局变量、容器(
std::vector
、std::map
等)。 - 实现线程安全的类:在类内部使用互斥锁封装共享资源的访问。
- 控制并发访问:限制同时执行某个操作的线程数量。
1.3.6 示例代码
1 #include <cstdio>2 #include <cstdlib>3 #include <cstring>4 #include <unistd.h>5 #include <pthread.h>6 7 8 int ticket = 1000;9 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;;10 11 void *route(void *arg)12 {13 char *id = (char *)arg;14 while(1)15 {16 pthread_mutex_lock(&lock);17 if(ticket > 0)18 {19 usleep(1000);20 printf("%s sells ticket: %d\n", id, ticket);21 ticket--;22 pthread_mutex_unlock(&lock);23 }24 else25 {26 pthread_mutex_unlock(&lock);27 28 break;29 }30 }31 return nullptr;32 } 33 34 35 int main()36 {37 pthread_t t1, t2, t3, t4;38 39 pthread_create(&t1, NULL, route, (void *)"thread 1");40 pthread_create(&t2, NULL, route, (void *)"thread 2");41 pthread_create(&t3, NULL, route, (void *)"thread 3");42 pthread_create(&t4, NULL, route, (void *)"thread 4");43 44 pthread_join(t1, NULL);45 pthread_join(t2, NULL);46 pthread_join(t3, NULL);47 pthread_join(t4, NULL);48 return 0;49 }
运行结果:
这样就不会出现上面那两个问题了
二、线程同步
线程同步是多线程编程中的核心概念,用于协调多个线程的执行顺序,确保它们正确、有序地访问共享资源,防止出现竞态条件(race condition)和数据不一致等问题。
2.1 为什么需要线程同步?
当多个线程同时访问共享资源(如全局变量、文件、内存区域等)时,如果没有适当的同步机制,可能会导致:
- 数据竞争(Data Race):多个线程同时读写同一数据
- 竞态条件(Race Condition):结果依赖于线程执行的顺序
- 死锁(Deadlock):线程相互等待对方释放资源
- 资源耗尽:无限制地创建线程或等待资源
2.2 条件变量
- 当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
- 例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情况就需要用到条件变量。
2.3 同步概念与竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步
- 竞态条件:因为时序问题,而导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理解
2.4 条件变量函数
2.4.1 初始化条件变量
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
- 功能:初始化条件变量
- 参数:
cond
:指向条件变量的指针attr
:条件变量属性,通常设为NULL
(使用默认属性)
- 返回值:成功返回
0
,失败返回错误码
2.4.2 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
- 功能:销毁条件变量,释放相关资源
- 参数:
cond
:指向条件变量的指针 - 返回值:成功返回
0
,失败返回错误码
2.4.3 等待条件(阻塞)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
- 功能:在条件变量上等待,同时释放关联的互斥锁
- 参数:
cond
:指向条件变量的指针mutex
:指向关联互斥锁的指针
- 返回值:成功返回
0
,失败返回错误码 - 行为:
- 原子地解锁互斥锁
- 阻塞当前线程,直到被唤醒
- 被唤醒后重新锁定互斥锁
- 返回调用线程
2.4.4 定时等待
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,const struct timespec *abstime);
- 功能:带超时的条件等待
- 参数:
cond
:指向条件变量的指针mutex
:指向关联互斥锁的指针abstime
:绝对超时时间(使用clock_gettime(CLOCK_REALTIME, ...)
获取)
- 返回值:
- 成功返回
0
- 超时返回
ETIMEDOUT
- 失败返回其他错误码
- 成功返回
2.4.5 唤醒单个线程
int pthread_cond_signal(pthread_cond_t *cond);
- 功能:唤醒至少一个在条件变量上等待的线程
- 参数:
cond
:指向条件变量的指针 - 返回值:成功返回
0
,失败返回错误码 - 注意:如果有多个线程等待,唤醒哪一个取决于调度策略
2.4.6 唤醒所有线程
int pthread_cond_broadcast(pthread_cond_t *cond);
- 功能:唤醒所有在条件变量上等待的线程
- 参数:
cond
:指向条件变量的指针 - 返回值:成功返回
0
,失败返回错误码 - 注意:所有等待线程都将被唤醒并竞争互斥锁
2.4.7 条件变量使用规范
- 等待条件代码
1 pthread_mutex_lock(&mutex);
2 while (条件为假)
3 pthread_cond_wait(cond, mutex);
4 // 修改条件
5 pthread_mutex_unlock(&mutex);
- 给条件发送信号代码
1 pthread_mutex_lock(&mutex);
2 // 设置条件为真
3 pthread_cond_signal(cond);
4 pthread_mutex_unlock(&mutex);
2.5 生产者消费者模型
2.5.1 模型概述
生产者-消费者模型是多线程编程中最经典的并发设计模式,用于解决生产者和消费者之间速度不匹配的问题。该模型通过一个共享的缓冲区(队列)协调生产者和消费者:
核心组件
- 生产者:生成数据/任务的线程
- 消费者:处理数据/任务的线程
- 缓冲区:共享数据结构(通常是队列)
- 同步机制:互斥锁和条件变量
2.5.2 关键组件解析
- 阻塞队列 (BlockingQueue)
- 使用互斥锁 (
mutex_
) 保护共享队列 - 使用两个条件变量实现同步:
not_empty_
:消费者等待队列非空not_full_
:生产者等待队列有空间
- 支持阻塞操作 (
push
,pop
) 和非阻塞操作
- 使用互斥锁 (
- 生产者 (Producer)
- 每个生产者有唯一ID
- 生产固定数量的项目
- 每个项目包含生产者ID和序列号
- 随机延迟模拟生产过程
- 消费者 (Consumer)
- 每个消费者有唯一ID
- 持续从队列中取出项目
- 随机延迟模拟消费过程
- 收到特殊值 (-1) 时退出
2.5.3 模型工作原理
正常流程
队列满时
队列空时
2.5.4 模型优点
- 解耦生产与消费
- 生产者只需关注数据生成
- 消费者只需关注数据处理
- 双方通过队列交互,不直接依赖
- 平衡负载
- 缓冲队列平滑生产与消费的速度差异
- 防止生产者过快导致消费者过载
- 防止消费者过快导致生产者空闲
- 并发控制
- 支持多生产者多消费者并发工作
- 通过队列自动协调资源分配
- 流量控制
- 有限队列大小提供背压机制
- 防止系统过载导致内存耗尽
2.5.5 基于阻塞队列的生产者消费者模型代码示例
1 #pragma once2 3 #include <iostream>4 #include <string>5 #include <queue>6 #include <pthread.h>7 8 const int defaultcap = 5;9 10 template <typename T>11 class BlockQueue12 {13 private:14 bool IsFull() { return _q.size() >= _cap; }15 bool IsEmpty() { return _q.empty(); }16 17 public:18 BlockQueue(int cap = defaultcap)19 : _cap(cap), _csleep_num(0), _psleep_num(0)20 {21 pthread_mutex_init(&_mutex, nullptr);22 pthread_cond_init(&_full_cond, nullptr);23 pthread_cond_init(&_empty_cond, nullptr);24 }25 void Equeue(const T &in)26 {27 pthread_mutex_lock(&_mutex);28 // 生产者调用29 while(IsFull())30 {31 _psleep_num++;32 std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl; 33 pthread_cond_wait(&_full_cond, &_mutex); 34 _psleep_num--;35 } 36 // 100%确定:队列有空间37 _q.push(in);38 39 // 临时方案40 // v241 if(_csleep_num > 0)42 {43 pthread_cond_signal(&_empty_cond);44 std::cout << "唤醒消费者..." << std::endl;45 } 46 47 // pthread_cond_signal(&_empty_cond); // 可以48 pthread_mutex_unlock(&_mutex); // TODO49 // pthread_cond_signal(&_empty_cond); // 可以50 } 51 T Pop()52 {53 // 消费者调用54 pthread_mutex_lock(&_mutex);55 while(IsEmpty())56 {57 _csleep_num++;58 pthread_cond_wait(&_empty_cond, &_mutex);59 _csleep_num--;60 }61 T data = _q.front();62 _q.pop();63 64 if(_psleep_num > 0) 65 {66 pthread_cond_signal(&_full_cond);67 std::cout << "唤醒消费者" << std::endl;68 }69 70 // pthread_cond_signal(&_full_cond);71 pthread_mutex_unlock(&_mutex);72 return data;73 }74 ~BlockQueue()75 {76 pthread_mutex_destroy(&_mutex);77 pthread_cond_destroy(&_full_cond);78 pthread_cond_destroy(&_empty_cond);79 }80 81 private:82 std::queue<T> _q; // 临界资源83 int _cap; // 容器大小84 85 pthread_mutex_t _mutex;86 pthread_cond_t _full_cond;87 pthread_cond_t _empty_cond;88 89 int _csleep_num; // 消费者休眠的个数90 int _psleep_num; // 生产者休眠的个数91 };
这里只能看出单生产者单消费者,无法看出多生产者多消费者,但其实在使用上,只是在创建生产者消费者线程的数量不同而已。
单生产者单消费者
// 单生产者线程函数
void singleProducer(BlockQueue<int>* bq)
{for (int i = 0; i < 100; ++i) {bq->Equeue(i);}
}// 单消费者线程函数
void singleConsumer(BlockQueue<int>* bq)
{for (int i = 0; i < 100; ++i) {bq->Pop();}
}// 主函数
int main()
{BlockQueue<int> bq;pthread_t producer, consumer;pthread_create(&producer, nullptr, singleProducer, &bq);pthread_create(&consumer, nullptr, singleConsumer, &bq);pthread_join(producer, nullptr);pthread_join(consumer, nullptr);return 0;
}
多生产者多消费者
// 多生产者线程函数(多个线程调用)
void multiProducer(BlockQueue<int>* bq, int id)
{for (int i = 0; i < 50; ++i) {bq->Equeue(id * 100 + i); // 每个生产者生成不同的数据}
}// 多消费者线程函数(多个线程调用)
void multiConsumer(BlockQueue<int>* bq, int id)
{for (int i = 0; i < 50; ++i) {int data = bq->Pop();std::cout << "消费者" << id << "取出: " << data << std::endl;}
}// 主函数
int main()
{BlockQueue<int> bq;const int PRODUCERS = 3;const int CONSUMERS = 2;pthread_t producers[PRODUCERS], consumers[CONSUMERS];// 创建多个生产者线程for (int i = 0; i < PRODUCERS; ++i) {pthread_create(&producers[i], nullptr, multiProducer, &bq);}// 创建多个消费者线程for (int i = 0; i < CONSUMERS; ++i) {pthread_create(&consumers[i], nullptr, multiConsumer, &bq);}// 等待所有线程结束for (int i = 0; i < PRODUCERS; ++i) {pthread_join(producers[i], nullptr);}for (int i = 0; i < CONSUMERS; ++i) {pthread_join(consumers[i], nullptr);}return 0;
}
2.6 为什么 pthread_cond_wait 需要互斥量?
- 条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变得满足,并且友好的通知等待在条件变量上的线程。
- 条件不会无缘无故的突然变得满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
按照上面的说法,我们设计出如下的代码:先上锁,发现条件不满足,解锁,然后等待在条件变量上不就行了,如下代码:
//错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false)
{pthread_mutex_unlock(&mutex);//解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过pthread_cond_wait(&cond) ;pthread_mutex_lock(&mutex) ;pthread_mutex_unlock(&mutex);
}
- 由于解锁和等待不是原子操作。调用解锁之后,
pthread_cond_wait
之前,如果已经有其他线程获取到互斥量,摒弃条件满足,发送了信号,那么pthread_cond_wait
将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait
。所以解锁和等待必须是一个原子操作。 int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t *mutex);
进入该函数后,会去看条件量等于0
不?等于,就把互斥量变成1
,直到cond_wait
返回,把条件量改成1,把互斥量恢复成原样。
三、线程安全和重入问题
3.1 概念
线程安全:就是多个线程在访问共享资源时,能够正确地执行,不会相互干扰或破坏彼此的执行结果。一般而言,多个线程并发同一段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进行操作,并且没有锁保护的情况下,容易出现该问题。
重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则,是不可重入函数。
重入其实可以分为两种情况:
- 多线程重入函数
- 信号导致一个执行流重复进入函数
- 常见的线程不安全的情况
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
- 常见的线程安全的情况
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
- 常见不可重入的情况
- 调用了
malloc/free
函数,因为malloc
函数是用全局链表来管理堆的- 调用了标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
- 常见可重入的情况
- 不使用全局变量或静态变量
- 不使用用
malloc
或者new
开辟出的空间- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据
结论
- 可重入与线程安全联系
- 函数是可重入的,那就是线程安全的
- 函数是不可重入的,那就不能由多个线程使用,有可能引发线程安全问题
- 如果一个函数中有全局变量,那么这个函数既不是线程安全也不是可重入的。
- 可重入与线程安全区别
- 可重入函数是线程安全函数的一种
- 线程安全不一定是可重入的,而可重入函数则一定是线程安全的。
- 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重入函数若锁还未释放则会产生死锁,因此是不可重入的。
注意:
- 如果不考虑 信号导致一个执行流重复进入函数 这种重入情况,线程安全和重入在安全角度不做区分
- 但是线程安全侧重说明线程访问公共资源的安全情况,表现的是并发线程的特点
- 可重入描述的是一个函数是否能被重复进入,表示的是函数的特点