当前位置: 首页 > backend >正文

C++11线程间通信同步与Linux中MySQL连接池实现

前言

条件竞争一般是指两个及两个以上线程共享数据内存,并抢着完成各自的任务。线程在抢夺资源完成自己的任务时,可能会破坏其中共享数据的不变量,这种情况就是恶性条件竞争,这种破坏不变量的竞争应该尽量去避免。
互斥锁保证某一时刻,只能在一处获取锁,当当前线程拥有锁时,其他线程如果想获取锁,那么必须等待直到对方解锁。C++11中的互斥锁:std::mutex, std::lock_guard, std::unique_lock。

1. 互斥锁

实际上,一般不使用std::mutex,需要手动加解锁的操作容易在程序开发过程中产生问题。
在可以使用lock_guard()的场景就使用lock_guard(),需要更灵活的场景如条件变量时才使用unique_lock()
使用锁时需要先定义一个全局的mutex供线程使用。

1.1 std::mutex

作用

  • 互斥锁(Mutual Exclusion),用于保护共享资源,防止多线程同时访问。
  • 提供基本的加锁(lock())和解锁(unlock())操作。

特点

  • 手动管理:需要显式调用 lock() 和 unlock()。
  • 异常不安全:若在 lock() 后、unlock() 前发生异常,可能导致死锁。
  • 不可复制/移动:不能复制或移动互斥锁对象。
#include <mutex>
#include <thread>std::mutex mtx;
int shared_data = 0;void increment() {mtx.lock();++shared_data;  // 临界区mtx.unlock();
}

若忘记调用 unlock(),会导致死锁
不推荐直接使用,更推荐使用 RAII 包装类(如 lock_guard 或 unique_lock)。

1.2 std::lock_guard

作用

  • RAII(Resource Acquisition Is Initialization)风格的互斥锁包装器。
  • 构造时自动加锁,析构时自动解锁,确保异常安全。

特点

  • 轻量级:无额外开销,性能高效。
  • 不可手动控制:不能提前解锁,作用域结束时自动释放。
  • 不可复制/移动。

std::lock_guard在离开作用域时会自动析构解锁。

std::mutex mtx;void safe_increment() {std::lock_guard<std::mutex> lock(mtx);  // 自动加锁++shared_data;  // 临界区
}  // 自动解锁(即使发生异常)

适用场景

  • 简单的临界区保护,无需复杂锁管理。

1.3 std::unique_lock

作用

  • 更灵活的 RAII 互斥锁包装器,支持延迟加锁、手动解锁和条件变量。

特点

  • 灵活性高:支持以下操作:

    • 延迟加锁defer_lock)。

    • 手动加锁/解锁(lock(), unlock())

    • 尝试加锁(try_lock())。

    • 与条件变量(std::condition_variable)配合使用。

  • 可移动:可以通过 std::move 转移所有权。

  • 稍重:相比 lock_guard 有轻微性能开销。

std::unique_lock<std::mutex> lock(mtx, std::defer_lock); // 延迟加锁
提供std::defer_lock参数后即不在构造时立刻加锁,而是随后手动调用lock()加锁,可手动解锁也可等待析构自动解锁

std::mutex mtx;void flexible_increment() {std::unique_lock<std::mutex> lock(mtx, std::defer_lock);  // 延迟加锁// ... 其他非临界区操作 ...lock.lock();  // 显式加锁++shared_data;lock.unlock();  // 显式解锁(可选)
}

1.3.1 std::defer_lock参数

这个参数表示暂时先不lock,之后手动去lock,但是使用之前也不允许lock
一般用来搭配unique_lock的成员函数去使用

#include <iostream>
#include <mutex>std::mutex mlock;void work1(int& s) {for (int i = 1; i <= 5000; i++) {std::unique_lock<std::mutex> munique(mlock, std::defer_lock);munique.lock();s += i;munique.unlock();         // 这里可以不用unlock,可以通过unique_lock的析构函数unlock}
}void work2(int& s) {for (int i = 5001; i <= 10000; i++) {std::unique_lock<std::mutex> munique(mlock, std::defer_lock);munique.lock();s += i;munique.unlock();}
}int main()
{int ans = 0;std::thread t1(work1, std::ref(ans));std::thread t2(work2, std::ref(ans));t1.join();t2.join();std::cout << ans << std::endl;return 0;
}

1.3.2 try_lock( )成员函数

可以避免一些不必要的等待,会判断当前mutex能否被lock,如果不能被lock,可以先去执行其他代码
举个例子来说就是如果有一个线程被lock,而且执行时间很长,那么另一个线程一般会被阻塞在那里,反而会造成时间的浪费。那么使用了try_to_lock后,如果被锁住了,它不会在那里阻塞等待,它可以先去执行其他没有被锁的代码

void work1(int& s) {for (int i = 1; i <= 5000; i++) {std::unique_lock<std::mutex> munique(mlock, std::defer_lock);if (munique.try_lock() == true) {s += i;}else {// 处理一些没有共享内存的代码}}
}

适用场景

  • 需要手动控制锁的获取/释放。
  • 配合条件变量实现复杂的同步逻辑。
  • 需要转移锁的所有权(如返回锁或存储在容器中)。
特性std::mutexstd::lock_guardstd::unique_lock
RAII❌ 需手动加锁/解锁✅ 自动加锁/解锁✅ 自动加锁/解锁(可手动干预)
延迟加锁❌ 不支持❌ 不支持✅ 支持(通过 std::defer_lock)
手动解锁✅ 支持❌ 不支持✅ 支持
条件变量支持❌ 不支持❌ 不支持✅ 支持
性能开销极低略高于 lock_guard
适用场景基础互斥操作简单临界区保护复杂锁管理或条件变量

1.4 最佳实践

优先使用 lock_guard:在简单的临界区保护中,使用 lock_guard 以确保代码简洁和安全。

需要灵活性时用 unique_lock:当需要延迟加锁、手动控制或配合条件变量时,选择 unique_lock。

避免直接操作 std::mutex:除非需要极底层控制,否则优先使用 RAII 包装类。

缩小临界区范围:尽量减少锁的持有时间,避免在锁内执行耗时操作(如 I/O)。

防止死锁:按固定顺序获取多个锁,或使用 std::lock 一次性获取多个锁。

示例:配合条件变量

std::mutex mtx;
std::condition_variable cv;
bool data_ready = false;void consumer() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return data_ready; });  // 自动释放锁,等待通知// 处理数据...
}void producer() {{std::lock_guard<std::mutex> lock(mtx);data_ready = true;}cv.notify_one();  // 通知消费者
}

2. 条件变量

在 C++11 中,条件变量(std::condition_variable)是用于多线程间同步的核心工具,它允许线程阻塞等待某个条件成立,或唤醒其他等待的线程。条件变量通常与互斥锁(std::mutex)结合使用,用于实现复杂的线程协作逻辑(如生产者-消费者模型)。以下是详细说明和用法:

2.1 条件变量的核心作用

线程阻塞与唤醒

  • 让线程在某个条件不满足时主动阻塞并释放锁。
  • 当其他线程修改条件后,通过通知(notify_one 或 notify_all)唤醒等待的线程。

避免忙等待:通过阻塞而非轮询减少 CPU 资源浪费。

2.2 C++11 中的条件变量类

std::condition_variable:

  • 必须与 std::mutex 配合使用。
  • 轻量级,性能较高。

std::condition_variable_any:

  • 可与任何满足「基本锁」概念的类型配合使用(如 std::shared_mutex)。
  • 灵活性更高,但可能有额外性能开销。

2.3 核心接口

等待条件:wait()

  • wait(lock, predicate):
    • 阻塞当前线程,直到 predicate 返回 true。
    • 内部流程:
      1. 释放锁:释放和unique_lock关联的互斥锁,允许其他线程访问和修改条件变量相关的共享数据
      2. 进入等待队列
      3. 等待条件或通知(条件或通知其中之一发生)
      4. 重新获取锁:线程唤醒后,会尝试获取之前释放的互斥锁,如果锁被其他线程获取,该线程阻塞直至获取锁
      5. 检查条件: 如果条件不成立,线程会重新进入等待队列并释放锁,直至条件成立
      6. 继续执行作用域内剩下语句

如果没有通知,即使条件为true,线程也不会被执行。如果通知了,但是条件不成立,仍然被阻塞。(使用pred可以防止虚假唤醒发生,即唤醒了但是所需wait的容器/内存还未准备好)
wait有两个重载,wait(lock)或wait(lock,pred),pred是一个可调用对象(如函数、lambda表达式)

简而言之,wait使线程进入等待状态,直到有notify_one或者notify_all唤醒它。等待期间,线程释放与之关联的互斥锁,允许其他线程访问共享数据。线程被唤醒时,获取锁并继续执行作用域内的语句。

唤醒线程

  • notify_one():唤醒一个等待的线程(若有多个,随机选择一个)。
  • notify_all():唤醒所有等待的线程。

2.4 示例:生产者-消费者模型

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;  // 数据队列
bool finished = false;  // 用于表示生产者是否已经完成数据生成void producer(int id) {for (int i = 0; i < 5; ++i) {std::this_thread::sleep_for(std::chrono::seconds(1));  // 模拟数据生成过程std::lock_guard<std::mutex> lock(mtx);data_queue.push(i);std::cout << "Producer " << id << " produced data " << i << std::endl;cv.notify_one();  // 通知消费者线程}
}void consumer(int id) {while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, []{return !data_queue.empty() || finished;});  // 等待数据生成while (!data_queue.empty()) {int data = data_queue.front();data_queue.pop();std::cout << "Consumer " << id << " consumed data " << data << std::endl;// 模拟数据处理过程std::this_thread::sleep_for(std::chrono::seconds(1));}if (data_queue.empty() && finished) {break;}}
}int main() {std::thread producers[2];std::thread consumers[2];for (int i = 0; i < 2; ++i) {producers[i] = std::thread(producer, i + 1);consumers[i] = std::thread(consumer, i + 1);}for (auto &p : producers) {p.join();}finished = true;cv.notify_all();for (auto &c : consumers) {c.join();}return 0;
}

关键机制解释
(1)为什么用 while 检查条件?
虚假唤醒(Spurious Wakeup):
即使没有 notify,线程也可能被唤醒(受系统调度影响)。
必须通过循环检查条件,确保条件真正成立。

(2)锁的管理
wait() 内部会释放锁,允许其他线程操作共享资源。
唤醒后重新获取锁,确保后续操作线程安全。

(3)notify_one vs notify_all
notify_one:效率高,但需确保仅一个线程能处理条件(如单一消费者)。
notify_all:更安全,但可能引发多个线程竞争(需结合条件判断)。

2.5 注意事项

始终在修改条件时持有锁:

{std::lock_guard<std::mutex> lock(mtx);data_ready = true; // 修改条件
}
cv.notify_one();       // 通知

修改条件和发送通知需原子化,避免竞争。

避免死锁:

  • 确保所有路径都能释放锁或唤醒等待线程。
  • 不要遗漏 notify 调用。

条件变量的生命周期:

  • 确保条件变量在所有线程结束前保持有效。
  1. 条件变量与互斥锁的协作
    条件变量本身不管理共享资源,需通过互斥锁保护共享状态。

经典模式:

std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return condition; }); // 等待条件成立
// 操作共享资源...
lock.unlock(); // 可选:提前释放锁
cv.notify_one(); // 通知其他线程

2.6 条件变量总结

适用场景

  • 生产者-消费者队列:队列空/满时的线程同步。
  • 任务调度:线程池中任务的分发与执行。
  • 资源池管理:如数据库连接池的分配与回收。

与信号量的区别

特性条件变量信号量
C++标准支持C++11C++20
灵活性需手动管理条件,更底层直接通过计数器控制资源访问
适用场景复杂条件判断(如队列空/满)简单资源计数限制
协作机制需结合互斥锁和条件检查独立的计数操作(acquire/release)

条件变量是 C++11 多线程编程中实现复杂同步逻辑的核心工具。通过结合互斥锁和条件检查,可以高效解决线程间协作问题(如生产者-消费者模型)。使用时需注意:

  1. 始终用循环检查条件(防止虚假唤醒)。
  2. 在修改条件时持有锁。
  3. 合理选择 notify_one 或 notify_all。

3. MySQL连接池实现

介绍了互斥锁和条件变量的简单例子后,看一下实际上的运用
这里附上Linux下MySQL连接池的实现源码
connectionpool.h文件

#pragma once
#include <string>
#include <queue>
#include <mutex>
#include <iostream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <memory>
#include <functional>
#include <stdlib.h>
#include "connection.h"
using namespace std;/*
实现连接池功能模块
*/
class ConnectionPool
{
public:// 删除拷贝构造函数和赋值运算符 = delete 代表函数禁用, 也可以将其访问权限设置为私有ConnectionPool(const ConnectionPool& obj) = delete;ConnectionPool(const ConnectionPool&& obj) = delete;ConnectionPool& operator=(const ConnectionPool& obj) = delete;// 获取连接池对象实例static ConnectionPool* getConnectionPool();// 给外部提供接口,从连接池中获取一个可用的空闲连接shared_ptr<Connection> getConnection();
private:// 单例#1 构造函数私有化ConnectionPool();// 从配置文件中加载配置项bool loadConfigFile(); // 运行在独立的线程中,专门负责生产新连接void produceConnectionTask();// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收void scannerConnectionTask();string _ip; // mysql的ip地址unsigned short _port; // mysql的端口号 3306string _username; // mysql登录用户名string _password; // mysql登录密码string _dbname; // 连接的数据库名称int _initSize; // 连接池的初始连接量int _maxSize; // 连接池的最大连接量int _maxIdleTime; // 连接池最大空闲时间int _connectionTimeout; // 连接池获取连接的超时时间queue<Connection*> _connectionQue; // 存储mysql连接的队列mutex _queueMutex; // 维护连接队列的线程安全互斥锁atomic_int _connectionCnt; // 记录连接所创建的connection连接的总数量 condition_variable cv; // 设置条件变量,用于连接生产线程和连接消费线程的通信
};

connectionpool.cpp文件

#include <fstream>
#include <sstream>
#include "connectionpool.h"
#include <muduo/base/Logging.h>// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{static ConnectionPool pool; // lock和unlockreturn &pool;
}// 辅助函数:去除字符串的前后空白
string trim(const string& str)
{size_t start = str.find_first_not_of(" \t");size_t end = str.find_last_not_of(" \t");return (start == string::npos || end == string::npos) ? "" : str.substr(start, end - start + 1);
}bool ConnectionPool::loadConfigFile()
{//setenv("MYSQL_CONF_PATH","/home/kyros1ee/QtEnviroment/WeChat-main/chatserver/conf/mysql.conf",1);// 设置环境变量或绝对路径const char* configPath = getenv("MYSQL_CONF");	//if (!configPath){LOG_ERROR << " mysql.conf MYSQL_CONF_PATH not set!";return false;}ifstream file(configPath);if (!file.is_open()){LOG_ERROR << "mysql.conf 文件不存在!";return false;}string line;while (getline(file, line)){// 忽略空行和注释行if (line.empty() || line.find('=') == string::npos)continue;if (line.back() == '\r') {line.pop_back();}istringstream iss(line);string key, value;if (getline(iss, key, '=') && getline(iss, value)){// 去除可能存在的前后空白key = trim(key);value = trim(value);if (key == "ip") _ip = value;else if (key == "port") _port = stoi(value);else if (key == "username") _username = value;else if (key == "password") _password = value;else if (key == "dbname") _dbname = value;else if (key == "initSize") _initSize = stoi(value);else if (key == "maxSize") _maxSize = stoi(value);else if (key == "maxIdleTime") _maxIdleTime = stoi(value);else if (key == "connectionTime") _connectionTimeout = stoi(value);}}return true;
}// 连接池的构造
ConnectionPool::ConnectionPool()
{// 加载配置项了if (!loadConfigFile()){return;}// 创建初始数量的连接for (int i = 0; i < _initSize; ++i){Connection *p = new Connection();p->connect(_ip, _port, _username, _password, _dbname);p->refreshAliveTime(); // 刷新一下开始空闲的起始时间_connectionQue.push(p);_connectionCnt++;}// 启动一个新的线程,作为连接的生产者 linux thread => pthread_createthread produce(std::bind(&ConnectionPool::produceConnectionTask, this));produce.detach();// 启动一个新的定时线程,扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收thread scanner(std::bind(&ConnectionPool::scannerConnectionTask, this));scanner.detach();
}// 运行在独立的线程中,专门负责生产新连接
void ConnectionPool::produceConnectionTask()
{for (;;){unique_lock<mutex> lock(_queueMutex);while (!_connectionQue.empty()){cv.wait(lock); // 队列不空,此处生产线程进入等待状态}// 连接数量没有到达上限,继续创建新的连接if (_connectionCnt < _maxSize){Connection *p = new Connection();p->connect(_ip, _port, _username, _password, _dbname);p->refreshAliveTime(); // 刷新一下开始空闲的起始时间_connectionQue.push(p);_connectionCnt++;}// 通知消费者线程,可以消费连接了cv.notify_all();}
}// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{unique_lock<mutex> lock(_queueMutex);while (_connectionQue.empty()){// sleepif (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))){if (_connectionQue.empty()){LOG_INFO << "获取空闲连接超时了...获取连接失败!";return nullptr;}}}/*shared_ptr智能指针析构时,会把connection资源直接delete掉,相当于调用connection的析构函数,connection就被close掉了。这里需要自定义shared_ptr的释放资源的方式,把connection直接归还到queue当中*/shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection *pcon) {// 这里是在服务器应用线程中调用的,所以一定要考虑队列的线程安全操作unique_lock<mutex> lock(_queueMutex);pcon->refreshAliveTime(); // 刷新一下开始空闲的起始时间_connectionQue.push(pcon);});_connectionQue.pop();cv.notify_all();  // 消费完连接以后,通知生产者线程检查一下,如果队列为空了,赶紧生产连接return sp;
}// 扫描超过maxIdleTime时间的空闲连接,进行对于的连接回收
void ConnectionPool::scannerConnectionTask()
{for (;;){// 通过sleep模拟定时效果this_thread::sleep_for(chrono::seconds(_maxIdleTime));// 扫描整个队列,释放多余的连接unique_lock<mutex> lock(_queueMutex);while (_connectionCnt > _initSize){Connection *p = _connectionQue.front();if (p->getAliveeTime() >= (_maxIdleTime * 1000)){_connectionQue.pop();_connectionCnt--;delete p; // 调用~Connection()释放连接}else{break; // 队头的连接没有超过_maxIdleTime,其它连接肯定没有}}}
}
http://www.xdnf.cn/news/2633.html

相关文章:

  • XLSX.utils.sheet_to_json设置了blankrows:true,但无法获取到开头的空白行
  • JDBC 使用流程详解
  • rag增强检索-基于关键词检索的混合检索模式
  • vue响应式原理——vue2和vue3的响应式实现区别
  • 非结构化数据解析
  • wsl(8) -- 图形界面
  • 封装el-autocomplete,接口调用
  • Ubuntu安装brew
  • OSI 模型(开放系统互联模型)
  • FEKO许可安装
  • CCF推荐学术会议-C(网络与信息安全):SAC 2025
  • Python学习之路(六)-图像识别
  • 数字化转型的未来趋势:从工具到生态,聚焦生态合作、绿色转型与全球化布局
  • Vue3 Element Plus el-tabs数据刷新方法
  • 更快的图像局部修改与可控生成:Flex.2-preview
  • 航顺 芯片 开发记录 (一) 2025年4月27日19:23:32
  • 【博客系统】博客系统第二弹:实现博客列表接口
  • T检验、F检验及样本容量计算学习总结
  • 通过示例学习:连续 XOR
  • SpringBoot驾校报名小程序实现
  • 详细PostMan的安装和基本使用方法
  • 【SF】在 Android 显示系统中,图层合成方式 Device 和 Client 的区别
  • 文章记单词 | 第50篇(六级)
  • Zookeeper HA集群搭建
  • 昂瑞微蓝牙OM6621系列对比选型指南
  • 《代码整洁之道》第8章 边界 - 笔记
  • NCCL 通信与调试
  • Grok发布了Grok Studio 和 Workspaces两个强大的功能。该如何使用?如何使用Grok3 API?
  • 深度学习与SLAM特征提取融合:技术突破与应用前景
  • 深入解读:2025 数字化转型管理 参考架构