当前位置: 首页 > ds >正文

线程同步与互斥

系统11. 线程同步与互斥

1. 线程互斥

1-1 进程线程间的互斥相关背景概念

  • 临界资源:多线程执⾏流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起
    保护作⽤
  • 原⼦性(后⾯讨论如何实现):不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1-2 互斥量 mutex

  • ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程⽆法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来⼀些问题。
    看下面一段代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{char *id = (char *)arg;while (1){if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;}else{break;}}return nullptr;
}
int main(void)
{pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void*)"thread 1");pthread_create(&t2, NULL, route, (void*)"thread 2");pthread_create(&t3, NULL, route, (void*)"thread 3");pthread_create(&t4, NULL, route, (void*)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);
}

在这里插入图片描述
下面我们来分析一下为什么会出现这样的情况。

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程(线程切换会保留自己的上下文数据当假设我们的ticket从内存加载到寄存器中进行减1操作并没有写回内存此时进程切换再次加载ticket到新进程的寄存器此时的值还是100这就造成了数据不一致问题
  • usleep 这个模拟漫⻓业务的过程,在这个漫⻓的业务过程中,可能有很多个线程会进⼊该代码段
  • –ticket 操作本⾝就不是⼀个原⼦操作我们可以看一下–ticket的反汇编代码
取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152 40064b: 8b 05 e3 04 20 00 mov 0x2004e3(%rip),%eax #
600b34 <ticket>
153 400651: 83 e8 01 sub $0x1,%eax
154 400654: 89 05 da 04 20 00 mov %eax,0x2004da(%rip) #
600b34 <ticket>

– 操作并不是原⼦操作,⽽是对应三条汇编指令:

  • load :将共享变量ticket从内存加载到寄存器中
  • update : 更新寄存器⾥⾯的值,执⾏-1操作
  • store :将新值,从寄存器写回共享变量ticket的内存地址
    因此我们要解决该问题就要进行多临界资源进行保护
  • 代码必须要有互斥⾏为:当代码进⼊临界区执⾏时,不允许其他线程进⼊该临界区。
  • 如果多个线程同时要求执⾏临界区的代码,并且临界区没有线程在执⾏,那么只能允许⼀个线程进⼊该临界区。
  • 如果线程不在临界区中执⾏,那么该线程不能阻⽌其他线程进⼊临界区。
    在这里插入图片描述

互斥量的接⼝
初始化互斥量
初始化互斥量有两种⽅法:

  • ⽅法1,静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
  • ⽅法2,动态分配:

    int pthread_mutex_init(pthread_mutex_t *restrict mutex, const
    pthread_mutexattr_t *restrict attr);
    参数:
    mutex:要初始化的互斥量
    attr:NULL
    

    销毁互斥量

    销毁互斥量需要注意:

    • 使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁

    • 不要销毁⼀个已经加锁的互斥量

    • 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁

      int pthread_mutex_destroy(pthread_mutex_t *mutex)

      互斥量加锁和解锁:

      int pthread_mutex_lock(pthread_mutex_t *mutex);
      int pthread_mutex_unlock(pthread_mutex_t *mutex);
      返回值:成功返回0,失败返回错误号
      

      调⽤ pthread_ lock 时,可能会遇到以下情况:

      • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功

      • 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到

      互斥量,那么pthread_ lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。

      改进上⾯的售票系统:

      #include <stdio.h>
      #include <stdlib.h>
      #include <string.h>
      #include <unistd.h>
      #include <pthread.h>
      int ticket = 100;
      pthread_mutex_t mutex;
      void *route(void *arg)
      {char *id = (char *)arg;while (1){pthread_mutex_lock(&mutex);if (ticket > 0){usleep(1000);printf("%s sells ticket:%d\n", id, ticket);ticket--;pthread_mutex_unlock(&mutex);}else{pthread_mutex_unlock(&mutex);break;}}return nullptr;
      }
      int main(void)
      {pthread_mutex_init(&mutex, NULL);pthread_t t1, t2, t3, t4;pthread_create(&t1, NULL, route, (void *)"thread 1");pthread_create(&t2, NULL, route, (void *)"thread 2");pthread_create(&t3, NULL, route, (void *)"thread 3");pthread_create(&t4, NULL, route, (void *)"thread 4");pthread_join(t1, NULL);pthread_join(t2, NULL);pthread_join(t3, NULL);pthread_join(t4, NULL);pthread_mutex_destroy(&mutex);
      }
      

1-3 互斥量实现原理探究

  • 经过上⾯的例⼦,⼤家已经意识到单纯的 i++ 或者 ++i 都不是原⼦的,有可能会有数据⼀致性问题
  • 为了实现互斥锁操作,⼤多数体系结构都提供了swap或exchange指令,该指令的作⽤是把寄存器和内存单元的数据相交换,由于只有⼀条指令,保证了原⼦性,即使是多处理器平台,访问内存的 总线周期也有先后,⼀个处理器上的交换指令执⾏时另⼀个处理器的交换指令只能等待总线周期。 现在我们把lock和unlock的伪代码改⼀下
    在这里插入图片描述

1-4 互斥量的封装

#pragma once
#include<iostream>
#include<pthread.h>
namespace MutexMoudle
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex,nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex& mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}

2. 线程同步

2-1 条件变量

  • 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如⼀个线程访问队列时,发现队列为空,它只能等待,只到其它线程将⼀个节点添加到队列中(此时我们唤醒一个线程)。这种情况就需要⽤到条件变量。

2-2 同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。在线程场景下,这种问题也不难理

2-3 条件变量函数

//初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL//销毁
int pthread_cond_destroy(pthread_cond_t *cond)//等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
//参数:
//cond:要在这个条件变量上等待
//mutex:互斥量
//等待前释放锁 → 允许其他线程修改条件。
//唤醒后重新加锁 → 确保线程在锁的保护下继续操作共享数据。
//这种设计保障了多线程环境中条件判断与数据访问的原子性和安全性。//唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);

案例:

// #include <stdio.h>
// #include <stdlib.h>
// #include <string.h>
// #include <unistd.h>
// #include <pthread.h>
// int ticket = 100;
// pthread_mutex_t mutex;
// void *route(void *arg)
// {
//     char *id = (char *)arg;
//     while (1)
//     {
//         pthread_mutex_lock(&mutex);
//         if (ticket > 0)
//         {
//             usleep(1000);
//             printf("%s sells ticket:%d\n", id, ticket);
//             ticket--;
//             pthread_mutex_unlock(&mutex);
//         }
//         else
//         {
//             pthread_mutex_unlock(&mutex);
//             break;
//         }
//     }
//     return nullptr;
// }
// int main(void)
// {
//     pthread_mutex_init(&mutex, NULL);
//     pthread_t t1, t2, t3, t4;
//     pthread_create(&t1, NULL, route, (void *)"thread 1");
//     pthread_create(&t2, NULL, route, (void *)"thread 2");
//     pthread_create(&t3, NULL, route, (void *)"thread 3");
//     pthread_create(&t4, NULL, route, (void *)"thread 4");
//     pthread_join(t1, NULL);
//     pthread_join(t2, NULL);
//     pthread_join(t3, NULL);
//     pthread_join(t4, NULL);
//     pthread_mutex_destroy(&mutex);
// }#include <iostream>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
//全局锁和全局条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
void *active(void *arg)
{std::string name = static_cast<const char *>(arg);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex);std::cout << name << " 活动..." << std::endl;pthread_mutex_unlock(&mutex);}
}
int main(void)
{pthread_t t1, t2;pthread_create(&t1, NULL, active, (void *)"thread-1");pthread_create(&t2, NULL, active, (void *)"thread-2");sleep(3); // 可有可⽆,这⾥确保两个线程已经在运⾏while (true){// 对⽐测试//pthread_cond_signal(&cond); // 唤醒⼀个线程pthread_cond_broadcast(&cond); // 唤醒所有线程sleep(1);}pthread_join(t1, NULL);pthread_join(t2, NULL);return 0;
}
/*
$ ./cond
thread-1 活动...
thread-2 活动...
thread-1 活动...
thread-1 活动...
thread-2 活动...
*/

2-4 生产者消费者模型

  • 321原则便于记忆
    1. 3种关系消费者之间互斥,生产者之间互斥,生产者消费之间互斥和同步
    1. 2种角色:消费者和生产者
    1. 1个交易场所
      在这里插入图片描述
      为什么要使用生产者消费模型
      ⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。

2-5 基于 BlockingQueue 的生产者消费者模型

在多线程编程中阻塞队列(Blocking Queue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
在这里插入图片描述
C++ queue模拟阻塞队列的⽣产消费模型:

#include <queue>
#include <iostream>
#include <pthread.h>
const int MAXCAPACITY = 5;
namespace BlockModule
{template <typename T>class BlockQueue{private:bool IsEmpty(){return _q.size() == 0;}bool IsFull(){return _q.size() == MAXCAPACITY;}public:BlockQueue() : _cap(MAXCAPACITY), _csleep_num(0), _psleep_num(0){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_emptycond, nullptr);pthread_cond_init(&_fullcond, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_fullcond);pthread_cond_destroy(&_emptycond);}void Equeue(const T &in){// 多个生产者pthread_mutex_lock(&_mutex);// 生产者进行入队//这里用while是因为当我们一次唤醒多个线程//前一个线程可能已经把队列干满了//但是最后一个进程醒来将会继续入队导致队列满while (IsFull()){// 生产者要去等待// 等待把锁释放可以给消费者_psleep_num++;std::cout << "生产者,进入休眠了: _psleep_num" << _psleep_num << std::endl;pthread_cond_wait(&_fullcond, &_mutex);_psleep_num--;}// 此时插入数据_q.push(in);// 此时队列一定有数据唤醒消费者去消费if (_csleep_num > 0){std::cout << "唤醒消费者..." << std::endl;pthread_cond_signal(&_emptycond);}//解锁pthread_mutex_unlock(&_mutex);}T Pop(){//消费者会进行操作//我们要维护消费者之间的互斥pthread_mutex_lock(&_mutex);//队列有数据才可以进行没有去等待while(IsEmpty()){//此时线程去等待_csleep_num++;std::cout<<"消费者,进入休眠了:"<<_csleep_num<<std::endl;//消费者休眠把锁释放pthread_cond_wait(&_emptycond,&_mutex);_csleep_num--;}//此时一定有数据T data=_q.front();_q.pop();std::cout<<"消费者拿走了一个任务"<<std::endl;//拿走一个数据此时队列一定不为满if(_psleep_num>0){std::cout<<"唤醒生产者"<<std::endl;pthread_cond_signal(&_fullcond);}pthread_mutex_unlock(&_mutex);return data;}private:std::queue<T> _q;int _cap;pthread_mutex_t _mutex;pthread_cond_t _fullcond;  // 队列满生产者去改条件变量等待pthread_cond_t _emptycond; // 消费者等待int _psleep_num; // 消费者休眠的个数int _csleep_num; // 生产者休眠的个数};
}

2-6 为什么 pthread_cond_wait 需要互斥量

  • 条件等待是线程间同步的⼀种⼿段,如果只有⼀个线程,条件不满⾜,⼀直等下去都不会满⾜,所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满⾜的条件变得满⾜,并且友好的通知等待在条件变量上的线程(从而唤醒等待线程获得锁,去访问临界资源)。
  • 条件不会⽆缘⽆故的突然变得满⾜了,必然会牵扯到共享数据的变化。所以⼀定要⽤互斥锁来保护。没有互斥锁就⽆法安全的获取和修改共享数据。
  • 其实就是当我们要访问临街资源的时候一定涉及到共享资源例如我们的消费者,生产者都会从队列取任务,此时一定我们要维护我们生产者消费者之间的互斥关系
    在这里插入图片描述
  • 按照上⾯的说法,我们设计出如下的代码:先上锁,发现条件不满⾜,解锁,然后等待在条件变量上不就⾏了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
pthread_mutex_unlock(&mutex);
//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
  • 由于解锁和等待不是原⼦操作。调⽤解锁之后,pthread_cond_wait 之前,如果已经有其他线程获取到互斥量,摒弃条件满⾜,发送了信号,那么 pthread_cond_wait 将错过这个信号,可能会导致线程永远阻塞在这个 pthread_cond_wait 。所以解锁和等待必须是⼀个原⼦操作。
  • int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t *mutex); 进⼊该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_ wait返回,把条件量改成1,把互斥量恢复成原样。

场景设定
假设有两个线程:

  1. 线程A(消费者):等待条件count > 0,满足后打印count。
  2. 线程B(生产者):将count从0增加到1,然后发送条件信号。

共享变量:
int count = 0; // 共享条件
pthread_mutex_t mutex; // 互斥锁
pthread_cond_t cond; // 条件变量
错误代码的执行流程
以下是原错误代码的消费者线程逻辑:

// 线程A的代码(错误版本)
pthread_mutex_lock(&mutex);
while (count <= 0) {       // 条件检查:count是否大于0?pthread_mutex_unlock(&mutex);  // 解锁pthread_cond_wait(&cond);      // 等待条件变量(错误!未传递mutex)pthread_mutex_lock(&mutex);    // 重新加锁
}
printf("count = %d\n", count);
pthread_mutex_unlock(&mutex);

假设线程B的代码如下:

// 线程B的代码(生产者)
pthread_mutex_lock(&mutex);
count = 1;                 // 修改条件
pthread_cond_signal(&cond); // 发送信号
pthread_mutex_unlock(&mutex);

时间线:信号如何丢失?
我们模拟以下执行顺序:

线程A(消费者)启动:

  • 加锁mutex。

  • 检查count <= 0(此时count=0,条件为真)。进入循环体,解锁mutex。

  • 准备调用pthread_cond_wait,但尚未执行(操作系统调度暂停线程A)。

线程B(生产者)启动:

  • 加锁mutex(因为线程A已解锁)。

  • 设置count = 1。

  • 发送信号pthread_cond_signal(&cond)。
    解锁mutex。

  • 此时信号已经发出,但线程A尚未进入等待状态!

线程A恢复执行:

  • 调用pthread_cond_wait(&cond),但此时信号已被错过,按照原先逻辑消费者此时会去打印但是错过信号就算count大于0也会进行等待。

  • 线程A永久阻塞在pthread_cond_wait中,即使count已经是1。


关键问题

  • 信号丢失的根本原因:线程A在解锁后、等待前的极短时间窗口内,线程B可能修改条件并发送信号。由于线程A尚未进入等待状态,信号会完全丢失。
  • 条件检查不再受保护:线程A在重新加锁前,无法确保条件未被其他线程修改。
    正确代码
// 线程A的正确代码
pthread_mutex_lock(&mutex);
while (count <= 0) {pthread_cond_wait(&cond, &mutex); // 原子地解锁+等待,唤醒时自动加锁
}
printf("count = %d\n", count);
pthread_mutex_unlock(&mutex);
  1. 线程A加锁后检查条件。
  2. 若条件不满足,调用pthread_cond_wait,原子地释放锁并进入等待。
  3. 线程B修改条件并发送信号时,线程A要么:
  • 已经处于等待状态(能正常接收信号),或
  • 仍在持有锁(线程B会阻塞在pthread_mutex_lock,直到线程A进入等待并释放锁)。
  1. 信号永远不会丢失,条件检查始终在锁的保护下完成。

2-7 条件变量使用规范

  • 确保pthread_cond_wait在内部原子地释放锁并进入等待,确保信号不会丢失。
  • 所有对共享条件的访问和修改都在锁的保护下进行。
  • 处理虚假唤醒:使用while循环(而非if)检查条件,防止虚假唤醒导致逻辑错误。

2-8 条件变量的封装

//Mutex.cpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexMoudle
{class Mutex{public:pthread_mutex_t *Get(){return &_mutex;}Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){pthread_mutex_lock(&_mutex);}void Unlock(){pthread_mutex_unlock(&_mutex);}~Mutex(){pthread_mutex_destroy(&_mutex);}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex) : _mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}//cond.hpp
#pragma once#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"using namespace MutexMoudle;namespace CondModule
{class Cond{public:Cond(){pthread_cond_init(&_cond, nullptr);}void Wait(Mutex &mutex){int n = pthread_cond_wait(&_cond, mutex.Get());(void)n;}void Signal(){// 唤醒在条件变量下等待的一个线程int n = pthread_cond_signal(&_cond);(void)n;}void Broadcast(){// 唤醒所有在条件变量下等待的线程int n = pthread_cond_broadcast(&_cond);(void)n;}~Cond(){pthread_cond_destroy(&_cond);}private:pthread_cond_t _cond;};
};

2-9 POSIX信号量

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。信号量可以把临界资源分为几块来进行访问我们的互斥锁其实是个二元信号量,信号量本质就是一个计数器代表我们把临界资源分为几部分进行使用,可以理解为去电影院看电影,只要我们有票就算我们不去,那个座位也会给我留着,因此信号量就是对临街资源的预定机制,大小代表资源的个数。
在这里插入图片描述
在这里插入图片描述


初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表⽰线程间共享,⾮零表⽰进程间共享
value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1int sem_post(sem_t *sem);//V()
2-9-1 基于环形队列的⽣产消费模型
  • 环形队列采⽤数组模拟,⽤模运算来模拟环状特性
    *在这里插入图片描述

  • 环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态
    在这里插入图片描述

  • 但是我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。


信号量的封装:

#pragma once
#include <iostream>
#include <semaphore.h>namespace SemModule
{class Sem{public:Sem(int n){sem_init(&_sem,0,n);}~Sem(){sem_destroy(&_sem);}void P(){//等待信号量信号值-1sem_wait(&_sem);}void V(){sem_post(&_sem);}private:sem_t _sem;};
}
	//RunningQueue.hpp
#include "sem.hpp"
#include <vector>
using namespace SemModule;
const int MAXQUEUECAP = 5;
namespace RunningQueueModule
{template <typename T>class RuningQueue{private:void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}public:RuningQueue(): _capacity(MAXQUEUECAP),_pindex(0),_cindex(0),_q(MAXQUEUECAP),_room_sem(MAXQUEUECAP),_data_sem(0){pthread_mutex_init(&_cmutex, nullptr);pthread_mutex_init(&_pmutex, nullptr);}~RuningQueue(){pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}void Enqueue(const T &in){// 生产者//先预定资源_room_sem.P();Lock(_pmutex);_q[_pindex]=in;_pindex++;_pindex%=_capacity;_data_sem.V();Unlock(_pmutex);}void Pop(T *out){// 消费者_data_sem.P();Lock(_cmutex);*out=_q[_cindex];_cindex++;_cindex%=_capacity;_room_sem.V();Unlock(_cmutex);}private:std::vector<T> _q; // 模拟环形队列int _capacity;int _pindex;             // 生产者下标int _cindex;             // 消费者下标pthread_mutex_t _pmutex; // 维护生产者之间的互斥关系的锁pthread_mutex_t _cmutex; // 消费者Sem _room_sem; // ⽣产者关⼼Sem _data_sem; // 消费者关⼼};
}

3. 线程池

3-1 日志与策略模式

什么是设计模式
⼤佬们针对⼀些经典的常⻅的场景, 给定了⼀些对应的解决⽅案, 这个就是 设计模式
⽇志认识
计算机中的⽇志是记录系统和软件运⾏中发⽣事件的⽂件,主要作⽤是监控运⾏状态、记录异常信息,帮助快速定位问题并⽀持程序员进⾏问题修复。它是系统维护、故障排查和安全管理的重要⼯具。
⽇志格式以下⼏个指标是必须得有的

  • 时间戳
  • ⽇志等级
  • ⽇志内容
    我们本次设计的日志如下格式`
[可读性很好的时间] [⽇志等级] [进程pid] [打印对应⽇志的⽂件名][⾏号] - 消息内容,⽀持可
变参数
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [17] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [18] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [20] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [21] - hello world
[2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world
#pragma once
#include <iostream>
#include <string>
#include "Mutex.hpp"
#include <filesystem> // C++17, 需要⾼版本编译器和-std=c++17
#include <fstream>
#include <memory>
#include <unistd.h>
#include <sstream>
namespace LogModule
{std::string sep = "\r\n";using namespace MutexMoudle;// 定义日志的刷新方式class LogStrategy{public:virtual ~LogStrategy() = default;                     // 策略的构造函数virtual void SyncLog(const std::string &message) = 0; // 不同模式核⼼是刷};// 显示器刷新class ScreenStrtegy : public LogStrategy{public:~ScreenStrtegy() {};void SyncLog(const std::string &message) override{LockGuard lock(_mutex);std::cout << message << sep;}private:Mutex _mutex;};const std::string defaultpath = "./";const std::string defaultfile = "log.log";class FileLogStrategy : public LogStrategy{public:FileLogStrategy(std::string path = defaultpath, std::string file = defaultfile): _path(path), _file(file){LockGuard lock(_mutex);// 先构建我们的路径if (std::filesystem::exists(_path))return;try{std::filesystem::create_directories(_path);}catch (const std::filesystem::filesystem_error &e){std::cerr << e.what() << '\n';}}~FileLogStrategy() {}void SyncLog(const std::string &message) override{LockGuard lock(_mutex);std::string drc = _path + (_path.back() == '/' ? "" : "/") + _file;std::ofstream out(drc.c_str(), std::ios::app); // 追加⽅式if (!out.is_open())return;out << message << sep;out.close();}private:std::string _path;std::string _file;Mutex _mutex;};// ⽇志等级enum class LogLevel{DEBUG,INFO,WARNING,ERROR,FATAL};// ⽇志转换成为字符串std::string LogLevelToString(LogLevel level){switch (level){case LogLevel::DEBUG:return "DEBUG";case LogLevel::INFO:return "INFO";case LogLevel::WARNING:return "WARNING";case LogLevel::ERROR:return "ERROR";case LogLevel::FATAL:return "FATAL";default:return "UNKNOWN";}}std::string GetCurtime(){time_t tm = time(nullptr);struct tm curr;localtime_r(&tm, &curr);char buffer[1024];snprintf(buffer, sizeof(buffer), "%4d-%02d-%02d %02d:%02d:%02d",curr.tm_year + 1900, curr.tm_mon, curr.tm_mday,curr.tm_hour, curr.tm_min, curr.tm_sec);return std::string(buffer);}// Log负责日志的策略选择以及刷新class Log{public:Log(){UseConsoleStrategy();}void UseConsoleStrategy(){_ptr = std::make_unique<ScreenStrtegy>();}void UseFileStrategy(){_ptr = std::make_unique<FileLogStrategy>();}// 改类负责形成完整的语句class logmessgge{public:logmessgge(LogLevel &type, std::string filename, int line, Log &log): _type(type),_curr_time(GetCurtime()),_pid(getpid()),_filename(filename),_line(line),_log(log){// 完成日志左边的写std::stringstream in;in<< "[" << _curr_time << "]" << " "<< "[" << LogLevelToString(_type) << "]"<<" "<< "[" << _pid << "]" << " "<< "[" << _filename << "]" << " "<< "[" << _line << "]" << " " << "-";_loginfo = in.str();}~logmessgge(){// 实现自动调用我们的刷新逻辑// 因此我们要传入我们的外部类Log_log._ptr->SyncLog(_loginfo);}template <typename T>logmessgge &operator<<(const T &data){// 支持重载std::stringstream in;in << data;_loginfo += in.str();return *this; // 为了支持连续进行输入}private:LogLevel _type;         // ⽇志等级std::string _curr_time; // ⽇志时间pid_t _pid;             //std::string _filename;  // 对应的⽂件名int _line;              // 对应的⽂件⾏号std::string _loginfo;Log &_log; // 外部类对象引用方便我们自动刷新};~Log() {}// Log(level)<<"hellowore"// 我们想要如上面调用我们的日志进行我们的()重载// 返回值写成拷贝返回// 临时对象这样我们进行日志写入会自动// 调用logmessge的析构函数进行刷新// 返回值为logmessage是为了调用我们的<<// 因此该函数就是支持把Log可以转化为logmessage类型//函数返回一个临时对象的引用时,//临时对象会在函数返回后立即被析构,导致返回的引用成为悬垂引用(Dangling Reference)//但是我们想要的是调用<<之后在结束因此//我们进行拷贝这样就延长了临时对象的生命周期//logmessgge operator()(LogLevel type, std::string filename, int line){return logmessgge(type, filename, line, *this);}private:std::unique_ptr<LogStrategy> _ptr;};// 全局的日志对象Log mylog;
// 调用operator()
//具体就是先调用()去构造logmessage的信息接着调用>>完成对右边部分的填充接着析构调用刷新
#define LOG(level) mylog(level, __FILE__, __LINE__)// 提供选择使⽤何种⽇志策略的⽅法
#define ENABLE_CONSOLE_LOG_STRATEGY() mylog.UseConsoleStrategy()
#define ENABLE_FILE_LOG_STRATEGY() mylog.UseFileStrategy()
}

时间的获得:
在这里插入图片描述
在这里插入图片描述

3-2 线程池设计

线程池:
⼀种线程使⽤模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。

线程池的应⽤场景:

  • 需要⼤量的线程来完成任务,且完成任务的时间⽐较短。 ⽐如WEB服务器完成⽹⻚请求这样的任务,使⽤线程池技术是⾮常合适的。因为单个任务⼩,⽽任务数量巨⼤,你可以想象⼀个热⻔⽹站的点击次数。 但对于⻓时间的任务,⽐如⼀个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间⽐线程的创建时间⼤多了。
  • 对性能要求苛刻的应⽤,⽐如要求服务器迅速响应客⼾请求。
  • 接受突发性的⼤量请求,但不⾄于使服务器因此产⽣⼤量线程的应⽤。突发性⼤量客⼾请求,在没有线程池情况下,将产⽣⼤量线程,虽然理论上⼤部分操作系统线程数⽬最⼤值不是问题,短时间内产⽣⼤量线程可能使内存到达极限,出现错误

线程池的种类:

  • 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执⾏任务对象中的任务接⼝
  • 浮动线程池,其他同上
    在这里插入图片描述
    PthreadPool.hpp
#pragma once
#include "Mutex.hpp"
#include "cond.hpp"
#include "log.hpp"
#include "pthread.hpp"
#include <vector>
#include <queue>
namespace PthreadPoolModule
{using namespace MutexMoudle;using namespace CondModule;using namespace LogModule;using namespace PthreadModlue;template <typename T>class PthreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepnum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}PthreadPool(int num = 5): _num(num),_isrunning(false),_sleepnum(0){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){ HandlerTask(); });}}public:// 创建单例static PthreadPool<T> *GetInstance(){// 加锁防止多消费者if (_ins == nullptr){LockGuard lock(_insmutex);LOG(LogLevel::DEBUG) << "获取单例....";if (_ins == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";_ins = new PthreadPool<T>();_ins->Start();}}return _ins;}void Start(){if (_isrunning)return;_isrunning = true;LOG(LogLevel::INFO) << "线程池开始启动···";for (auto &ch : _threads){ch.Start();LOG(LogLevel::INFO) << "新线程" << ch.Name() + "启动了";}}void Stop(){if (!_isrunning)return;_isrunning = false;// 进程停止// 可能有的在休眠// 有的在处理任务// 我们要把休眠的全部唤醒// 并且把任务队列的任务全部执行完// 才可以退出线程池//  唤醒所有的线层WakeUpAllThread();}void Join(){for (auto &ch : _threads){ch.Join();}}bool Enqueue(const T &in){if (_isrunning){LockGuard lock(_mutex);{_taskq.push(in);if (_threads.size() == _sleepnum)WakeUpOne();return true;}}return false;}// 该函数结束就会进行线程就运行完毕了void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));LOG(LogLevel::INFO) << name << " 处理任务...";// 消费者// 多个线程就是消费者while (true){T task;LockGuard lock(_mutex);{while (_taskq.empty() && _isrunning){// 去进行等待_sleepnum++;_cond.Wait(_mutex);_sleepnum--;}if (!_isrunning && _taskq.empty()){LOG(LogLevel::DEBUG) << "线程池退出";break;}LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";task = _taskq.front();_taskq.pop();}// 在临界区外面执行任务可有多个线程共同执行task();}}private:std::vector<Pthread> _threads;int _num;Mutex _mutex;Cond _cond;std::queue<T> _taskq;bool _isrunning; // 用来标记线程退出int _sleepnum;   // 睡眠的线程数static PthreadPool<T> *_ins; // 单例指针// 我们用的单例在开始时没有我们的锁因此我们这把锁要定义为静态的static Mutex _insmutex;};// 静态成员的初始化template <typename T>PthreadPool<T> *PthreadPool<T>::_ins = nullptr;template <typename T>Mutex PthreadPool<T>::_insmutex;
}

3-3 线程安全的单例模式

  • 单例模式概念及特点:
    某些类, 只应该具有⼀个对象(实例), 就称之为单例.
    例如⼀个男⼈只能有⼀个媳妇.
    在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要⽤⼀个单例的类来管理这些数据.
  • 饿汉实现方式和懒汉实现方式(本质就是一个类内成员会在对象实例化时才申请内存但是静态成员在程序开始就申请内存了)

饿汉⽅式实现单例模式:

template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
}

懒汉⽅式实现单例模式:

template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
  • 懒汉方式线程安全版本实现(双重判定、加锁、 volatile 关键字)
// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.
lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};

3-4 单例式线程池

#pragma once
#include "Mutex.hpp"
#include "cond.hpp"
#include "log.hpp"
#include "pthread.hpp"
#include <vector>
#include <queue>
namespace PthreadPoolModule
{using namespace MutexMoudle;using namespace CondModule;using namespace LogModule;using namespace PthreadModlue;template <typename T>class PthreadPool{private:void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepnum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程";}void WakeUpOne(){_cond.Signal();LOG(LogLevel::INFO) << "唤醒一个休眠线程";}PthreadPool(int num = 5): _num(num),_isrunning(false),_sleepnum(0){for (int i = 0; i < _num; i++){_threads.emplace_back([this](){ HandlerTask(); });}}public:// 创建单例static PthreadPool<T> *GetInstance(){// 加锁防止多消费者if (_ins == nullptr){LockGuard lock(_insmutex);LOG(LogLevel::DEBUG) << "获取单例....";if (_ins == nullptr){LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";_ins = new PthreadPool<T>();_ins->Start();}}return _ins;}void Start(){if (_isrunning)return;_isrunning = true;LOG(LogLevel::INFO) << "线程池开始启动···";for (auto &ch : _threads){ch.Start();LOG(LogLevel::INFO) << "新线程" << ch.Name() + "启动了";}}void Stop(){if (!_isrunning)return;_isrunning = false;// 进程停止// 可能有的在休眠// 有的在处理任务// 我们要把休眠的全部唤醒// 并且把任务队列的任务全部执行完// 才可以退出线程池//  唤醒所有的线层WakeUpAllThread();}void Join(){for (auto &ch : _threads){ch.Join();}}bool Enqueue(const T &in){if (_isrunning){LockGuard lock(_mutex);{_taskq.push(in);if (_threads.size() == _sleepnum)WakeUpOne();return true;}}return false;}// 该函数结束就会进行线程就运行完毕了void HandlerTask(){char name[128];pthread_getname_np(pthread_self(), name, sizeof(name));LOG(LogLevel::INFO) << name << " 处理任务...";// 消费者// 多个线程就是消费者while (true){T task;LockGuard lock(_mutex);{while (_taskq.empty() && _isrunning){// 去进行等待_sleepnum++;_cond.Wait(_mutex);_sleepnum--;}if (!_isrunning && _taskq.empty()){LOG(LogLevel::DEBUG) << "线程池退出";break;}LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";task = _taskq.front();_taskq.pop();}// 在临界区外面执行任务可有多个线程共同执行task();}}private:std::vector<Pthread> _threads;int _num;Mutex _mutex;Cond _cond;std::queue<T> _taskq;bool _isrunning; // 用来标记线程退出int _sleepnum;   // 睡眠的线程数static PthreadPool<T> *_ins; // 单例指针// 我们用的单例在开始时没有我们的锁因此我们这把锁要定义为静态的static Mutex _insmutex;};// 静态成员的初始化template <typename T>PthreadPool<T> *PthreadPool<T>::_ins = nullptr;template <typename T>Mutex PthreadPool<T>::_insmutex;
}

4. 线程安全和重入问题

线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。
重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊,我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被称为可重⼊函数,否则,是不可重⼊函数


可重⼊与线程安全联系:

  • 函数是可重⼊的,那就是线程安全的
  • 函数是不可重⼊的,那就不能由多个线程使⽤,有可能引发线程安全问题
  • 如果⼀个函数中有全局变量,那么这个函数既不是线程安全也不是可重⼊的。

可重⼊与线程安全区别:

  • 可重⼊函数是线程安全函数的⼀种
  • 线程安全不⼀定是可重⼊的,⽽可重⼊函数则⼀定是线程安全的。
  • 如果将对临界资源的访问加上锁,则这个函数是线程安全的,但如果这个重⼊函数若锁还未释放则会产⽣死锁,因此是不可重⼊的,比如在单进程的时候我们持有锁进入临界区但此时捕捉到信号去执行信号处理方法,而信号处理方法就是我们正在线程指向的方法,则该方法就被重入了但此时锁已经被申请了此时信号处理方法就会一直等在哪里,因此此时就不是可重入函数。

注意

  • 如果不考虑 信号导致⼀个执⾏流重复进⼊函数 这种重⼊情况,线程安全和重⼊在安全⻆度不做区分
  • 但是线程安全侧重说明线程访问公共资源的安全情况,表现的是并发线程的特点
  • 可重⼊描述的是⼀个函数是否能被重复进⼊,表⽰的是函数的特点

5. 常见锁概念

5-1 死锁

  • 死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站⽤不会释放的资源⽽处于的⼀种永久等待状态。
  • 为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

5-2 死锁四个必要条件

  • 互斥条件:⼀个资源每次只能被⼀个执⾏流使⽤

  • 请求与保持条件:⼀个执⾏流因请求资源⽽阻塞时,对已获得的资源保持不放
    在这里插入图片描述

  • 不剥夺条件:⼀个执⾏流已获得的资源,在末使⽤完之前,不能强⾏剥夺
    这里我们理解该条件可以结合上锁的函数:
    在这里插入图片描述
    其中lock就是阻塞式申请锁,申请不到就去阻塞等待
    trylock时如果锁已经被占用,就会去释放对方的锁再让自己去申请锁(这样就破坏了不剥夺条件,避免死锁)
    在这里插入图片描述

  • 循环等待条件:若⼲执⾏流之间形成⼀种头尾相接的循环等待资源的关系
    在这里插入图片描述

5-3 避免死锁

  • 破坏循环等待条件的方法(资源一次性分配、使用超时机制、加锁顺序一致)
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h>
// 定义两个共享资源(整数变量)和两个互斥锁
int shared_resource1 = 0;
int shared_resource2 = 0;
std::mutex mtx1, mtx2;
// ⼀个函数,同时访问两个共享资源
void access_shared_resources()
{
// std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
// std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
// // 使⽤ std::lock 同时锁定两个互斥锁
// std::lock(lock1, lock2);
// 现在两个互斥锁都已锁定,可以安全地访问共享资源
int cnt = 10000;
while (cnt)
{
++shared_resource1;
++shared_resource2;
cnt--;
}
// 当离开 access_shared_resources 的作⽤域时,lock1 和 lock2 的析构函数会被
⾃动调⽤
// 这会导致它们各⾃的互斥量被⾃动解锁
}
// 模拟多线程同时访问共享资源的场景
void simulate_concurrent_access()
{
std::vector<std::thread> threads;
// 创建多个线程来模拟并发访问
for (int i = 0; i < 10; ++i)
{
threads.emplace_back(access_shared_resources);
}
// 等待所有线程完成
for (auto &thread : threads)
{
thread.join();
}
// 输出共享资源的最终状态
std::cout << "Shared Resource 1: " << shared_resource1 << std::endl;
std::cout << "Shared Resource 2: " << shared_resource2 << std::endl;
}
int main()
{
simulate_concurrent_access();
return 0;
}
$ ./a.out // 不⼀次申请
Shared Resource 1: 94416
Shared Resource 2: 94536$ ./a.out // ⼀次申请
Shared Resource 1: 100000
Shared Resource 2: 100000
  • 避免锁未释放的场景

6. STL、智能指针和线程安全

6-1 STL 中的容器是否是线程安全的

不是.
原因是, STL 的设计初衷是将性能挖掘到极致, ⽽⼀旦涉及到加锁保证线程安全, 会对性能造成巨⼤的影响.
⽽且对于不同的容器, 加锁⽅式的不同, 性能可能也不同(例如hash表的锁表和锁桶).
因此 STL 默认不是线程安全. 如果需要在多线程环境下使⽤, 往往需要调⽤者⾃⾏保证线程安全

6-2 智能指针是否是线程安全的

  • 对于 unique_ptr, 由于只是在当前代码块范围内⽣效, 因此不涉及线程安全问题.(只是unique_ptr本身不涉及线程安全,不代表指向的资源是线程安全的)
  • 对于 shared_ptr, 多个对象需要共⽤⼀个引⽤计数变量, 所以会存在线程安全问题. 但是标准库实现的时候考虑到了这个问题, 基于原⼦操作(CAS)的⽅式保证 shared_ptr 能够⾼效, 原⼦的操作引⽤计数.
http://www.xdnf.cn/news/2017.html

相关文章:

  • ElementUi的Dropdown下拉菜单的详细介绍及使用
  • 软件测试大模型Agent探索(dify:chatflow+企业微信机器人)
  • 【C++类和数据抽象】复制构造函数
  • SIEMENS PLC 程序 GRAPH 程序解读 车型入库
  • 【深度强化学习 DRL 快速实践】近端策略优化 (PPO)
  • mybatis-plus里的com.baomidou.mybatisplus.core.override.MybatisMapperProxy 类的详细解析
  • Python Cookbook-6.8 避免属性读写的冗余代码
  • (PYTHON)函数
  • 多物理场耦合低温等离子体装置求解器PASSKEy2
  • uniapp小程序开发入门01-快速搭建一个空白的项目并预览它
  • 多模态(3):实战 GPT-4o 视频理解
  • 线上图书借阅小程序源码介绍
  • 系统测试的技术要求
  • 基于Docker的Flask项目部署完整指南
  • 基于C#+Unity实现遇见李白小游戏
  • 《强势量价关系》速读笔记
  • 【信息系统项目管理师】高分论文:论人力资源管理与成本管理(医院信息系统)
  • 【Python数据分析】Pandas模块之pd.concat 函数
  • 【Agent】LangManus深度解析:AI自动化框架的对比与langgraph原理
  • openwrt查询网关的命令
  • Flink部署与应用——部署方式介绍
  • 更智能的银行体验:生成式 AI 与语义搜索的实际应用
  • windows服务器及网络:搭建FTP服务器
  • 【AIGC】基础篇:VS Code 配置 Python 命令行参数调试debug超详细教程
  • DeepSeek 赋能全流程数据治理:构建智能化数据价值链
  • vue3中的effectScope有什么作用,如何使用?如何自动清理
  • 传感器模块有助于加速嵌入式视觉开发
  • 最新大模型算法的研究进展与应用探索
  • SIEMENS PLC程序解读 -BLKMOV (指定长度数据批量传输)
  • 六、web自动化测试02