3.2-C++基础组件
目录
一、原子操作与锁
原子操作原子性的实现
锁的理解
二、无锁队列设计
三、定时器
初识定时器
手写定时器
四、死锁检测组件
理论实现
代码实现
五、分布式锁
六、内存泄漏检测组件
理论
代码
一、原子操作与锁
原子操作原子性的实现
单核单处理情况下,只需要屏蔽中断,即可保证原子性。
多核多处理器情况下,在硬件提供原子指令从指令层杜绝中断以外,通过MESI协议确保在执行原子指令的瞬间,只有一个核心拥有数据的独占修改权,其他核心的副本全部失效。
MESI协议小剧场:
角色介绍:MESI 的四种状态
首先,复习一下四个状态,这是协议的语言:
M (Modified):脏数据。只有本核心有副本,且与主内存不一致。拥有“独占修改权”。
E (Exclusive):干净数据。只有本核心有副本,但与主内存一致。也拥有“独占权”,但还未修改。
S (Shared):干净数据。多个核心都有副本,都与主内存一致。大家只有“读取权”,没有“修改权”。
I (Invalid):无效数据。副本已过期,不能使用。相当于没有副本。
战场:CPU缓存、缓存总线和嗅探器
每个CPU核心都有自己的缓存。所有缓存都连接到一个共享的总线上。每个缓存都有一个嗅探器(Snooper),它的任务是时刻监视总线上的所有通信消息。
战斗目标:获取独占权(E或M状态)
假设有两个核心:Core 1 和 Core 2。它们缓存中都有变量
X
的副本,且状态均为 S (Shared)。现在,Core 1 要执行一个原子操作(如X++
)。为了原子性地修改
X
,Core 1 必须先将它的缓存行状态从 S 提升为 E 或 M。MESI协议通过以下步骤确保这一点:
战斗过程:一次完整的“独占权夺取”
第1步:Core 1 发出“读请求”并寻求独占(Read For Ownership, RFO)
Core 1 需要修改
X
,但它当前的缓存副本是 S 状态,没有修改权。于是,它在总线上广播一条消息:“我要读地址A的数据,并且我想要独占权限!”(Read-For-Ownership for address A)
这条RFO请求是夺取独占权的发令枪。
第2步:其他核心的嗅探器拦截请求并采取行动
总线上所有其他核心(这里主要是Core 2)的嗅探器都看到了这条RFO消息。
Core 2 的嗅探器 检查自己是否有地址A的缓存。
有,且状态为S:Core 2 知道自己有一个共享副本,但现在另一个核心要求独占权。
行动:Core 2 必须将自己的这个缓存行状态立即从 S 降级为 I (Invalid)。这意味着Core 2 的
X
副本作废了。响应:Core 2 通过总线回复一个确认(Acknowledge) 消息,意思是:“收到命令,我的副本已无效化。”
第3步:Core 1 收集响应并完成升级
Core 1 等待所有其他核心的确认响应。
一旦收到所有确认(表明世界上再也没有其他核心拥有
X
的有效副本了),Core 1 的夺取行动就成功了。现在,Core 1 可以将自己缓存中
X
的状态从 S 提升为 E (Exclusive)。此时此刻,在取得独占权的这个瞬间:
Core 1:是全世界唯一一个拥有
X
有效缓存副本的核心,状态为 E。它获得了宝贵的独占修改权。Core 2:它的
X
副本状态为 I,已经完全失效。如果Core 2 现在试图读X
,它会产生缓存未命中,必须重新向总线发起请求。主内存:数据依然是旧值。
执行原子操作
现在,Core 1 可以安全地执行原子操作了:
它执行底层的原子指令(例如,一条硬件的
LOCK INC
指令)。它将
X
的值加1。因为这个修改,它缓存行的状态从 E 变为 M (Modified)。现在数据是“脏”的,与主内存不一致。
在整个原子指令执行的极短时间内,由于缓存行处于 E/M 状态,缓存协议确保了绝对的数据独占性。 其他核心根本无法访问这个数据,因为它们的副本是 I(无效的),任何访问尝试都会导致缓存未命中,而总线又被Core 1的原子操作所“锁定”或协调。
后续:数据同步
当Core 2 之后需要读取
X
时:
Core 2 发现自己的副本是 I,于是在总线上发出“读请求”。
Core 1 的嗅探器看到这个请求,发现自己有最新的 M 状态数据。
Core 1 拦截这个读请求,将自己缓存中的最新数据直接通过总线写回给Core 2,同时也写回主内存以更新。
现在,Core 1 和 Core 2 的缓存行状态都变为 S (Shared),数据保持一致。
锁的理解
自旋锁和互斥锁的区别:
1)等待策略:自旋锁会忙等待,检测到临界资源被加了自旋锁会while(lock->flag)
互斥锁会进入内核态休眠,检测到临界资源不可用会通过futex系统调用进入内核态
2)上下文切换:自旋锁一直都在占用当前核心,无上下文切换。
互斥锁有。
3)使用场景:使用过程都是 加锁-操作-解锁。所以当对临界资源的操作时间长的时候,选择互斥锁。当对临界资源操作时间短的时候,选择自旋锁,减少线程切换开销。
操作时间长短的定义:以陷入内核态+切换线程+切回用户态的总时间为标准。
二、无锁队列设计
有锁队列是什么?
通过互斥锁或其他同步机制保证线程安全的队列,比如之前实现的线程池。
有锁队列中锁的局限有:线程切换、死锁、性能瓶颈。
无锁队列是什么?
属于非阻塞队列,通过原子操作来实现线程安全的队列。
无锁队列的类型有:
1.lock-free(无锁),贴近我们的理解,每次至少一个线程成功,其他申请的线程重试,依赖的是cas等原子操作。
CAS是什么?
核心机制,包含三个操作数:内存位置(V)、预期原值(A)、新值(B)。
只有当位置 V 的值等于预期原值 A 时,才会将位置 V 的值更新为新值 B。每次都返回V值,如果等于A说明修改成功,如果不等于,说明在这个期间有其他线程修改了V,操作失败。
2.wait-free(无等待),所有的线程必成功,无重试,依赖exchange操作。
有锁队列和无锁队列的直观区别?
相同:每次都只能一个线程访问临界区的资源。
不同:
1.有阻塞与无阻塞:有锁队列的没有进入临界区的线程,只能阻塞等待。且如果访问临界区的线程崩溃了,就会造成死锁。无锁队列每个线程随时都可以尝试CAS操作,没有进入临界区的线程可以去先干别的事,循环重试。
2.锁与原子操作的区别。
无锁队列的实现
1.volatile关键字:保证可见性,某线程修改变量后能被其他线程看见。确保的是当前线程内代码与“外部代理”(硬件、中断、信号)之间的正确交互。禁止编译器优化掉指令,编译器会在每次读到volatile修饰的变量的时候都重新加载数据,不因为本线程内没有修改该变量的代码而不再读取。
2.内存屏障:由于CPU为了性能的提升,会重排代码指令,但是如果是多线程的情况下直接重排代码,会发生异常行为。内存屏障强制在屏障处建立一个“同步点”(最常见的是 Release-Acquire 配对),屏障前的操作必须全部完成且可见后,才能执行屏障后的操作,从而在多线程中维护了逻辑的正确性。
3.原子操作:上面两项技术都不保证原子性,需要原子操作来保证竞争合法。
关系1:原子操作往往内置了内存屏障,如:
std::atomic<int> counter(0);// 线程 1
counter.store(42, std::memory_order_release); // 写操作,包含一个 release 屏障// 线程 2
int value = counter.load(std::memory_order_acquire); // 读操作,包含一个 acquire 屏障
关系2:volatile与他们几乎无关,唯一与原子操作结合的地方是在“需要操作的变量本身是内存映射的硬件寄存器,并且需要在多个线程之间以原子方式访问它。”
volatile
用于告诉编译器:“别优化,这个地址是硬件,每次都必须读写内存”。atomic
用于告诉 CPU:“这个操作需要原子性和多线程同步语义”。
三、定时器
初识定时器
定时器是什么?
定时器是一个项目中,组织管理延时任务的模块,帮助延时任务高效的度过延迟的那段时间,而非sleep阻塞等待。
为什么要有定时器?
如果没有定时器的话,想要3s后给用户弹出一个通知,调用sleep(3),那这3s线程就会被阻塞,也无法去处理其他任务,这是低效的行为。
定时器是怎么“高效”的?
参考epoll的事件通知方法,如果能够做到延迟的这段时间线程去处理其他任务,在时间结束后可以给到线程一个通知,就可以最大限度利用cpu资源。
定时器的实现架构?
基于上面高效的理解,定时器需要一个数据结构存储延时事件 + 一个触发机制。
实现定时器的容器是什么?
对于这个存储的数据结构,需要能够根据插入事件触发的时间自动进行排序,所以可以是红黑树实现的multiset / multimap,或者是最小堆。还可以根据执行顺序进行组织,比如时间轮。
时间轮是什么?
通过空间换时间。将时间分成三级,时、分、秒,每一级都有对应的格子数,每个格子存储的是一个链表,当时间指针走到对应格子的时候执行链表中的所有事件。如果要添加一个1h5min20s后执行的任务,就先挂到(当前小时 + 1) % 24的格子下面,当走到这个格子下面的时候,将他重新映射到(当前分钟 + 5) % 60的格子下面,当分针也走到这个格子的时候,再降级到对应秒针格子下面。再走到对应格子的时候,执行对应事件。
实现定时器的触发机制是什么?
将定时器的超时处理转为io处理,Linux内核提供了timerfd,以此来在io多路复用中注册一个读事件,比如select的FD_SET、epoll的epoll_ctl,在发起io时系统调用陷入内核态处理,在io响应时返回用户态。
手写定时器
思路:需要实现一个定时器timer类,里面有容器存储定时事件,每个定时事件也是一个类(结构体),有自己的执行时间timeout,有自己的回调函数callback。通过timer类下WaitTime函数计算容器中所有事件里最小的等待时间diff,作为epoll的第四个参数,这样epoll就会规定在diff时间后返回。而将io处理转为事件处理的是一个timer类下的Handle函数,在epoll等待最小时间返回后,表明有事件可以执行了,那么调用Handle函数,在函数内调用事件的回调函数,容器也删除这个事件。
代码:
#include <sys/epoll.h>
#include <sys/timerfd.h>
#include <time.h>
#include <unistd.h>
#include <functional>
#include <chrono>
#include <set>
#include <memory>
#include <iostream>class TimerNode {
public:friend class Timer;TimerNode(uint64_t timeout, std::function<void()> callback):timeout_(timeout),callback_(std::move(callback)){}//move避免拷贝
private:uint64_t timeout_;std::function<void()> callback_;
};
class Timer {
public:~Timer() {}static uint64_t GetCurrentTime() {using namespace std::chrono;return duration_cast<milliseconds>(steady_clock::now.time_since_epoch()).count();//steady_clock不受系统时间调整影响}TimerNode* AddTimerNode(uint64_t diff, std::function<void()> cb) {TimerNode* node = new TimerNode(GetCurrentTime + diff, std::move(cb));//node的类型是指针if (timer_map_.empty() == 0 && node->timeout_ > timer_map_.rbegin()->first) {auto it = timer_map_.emplace_hint(timer_map_.crbegin().base(), std::make_pair(node->timeout_, std::move(node)));return it->second;}else {auto it = timer_map_.insert(std::make_pair(node->timeout_,node));return it->second;}}void DelTimeout(TimerNode* node) {auto it = timer_map_.equal_range(node->timeout_);//先找值相同的 比直接遍历更快for (auto iter = it.first; iter != it.second; iter++) {if (iter->second == node) {timer_map_.erase(iter);break;}}}int WaitTime() {//最小等待时间auto iter = timer_map_.begin();if (iter == timer_map_.end()) {return -1;}uint64_t diff = iter->first - GetCurrentTime();return diff > 0 ? diff : 0;}void HandleTimeout() {auto iter = timer_map_.begin();while (iter != timer_map_.end() && iter->first <= GetCurrentTime()) {iter->second->callback_();//指向事件的指针 调用事件函数iter = timer_map_.erase(iter);}}
private:std::multimap<uint64_t, TimerNode*> timer_map_;
};int main() {int epfd = epoll_create(0);if (epfd == -1) {std::cerr << "epoll create error: " << errno << std::endl;return -1;}Timer timer;int i;timer.AddTimerNode(1000, [&]() {//[&]只会捕获 lambda 体内实际使用的变量std::cout << "timeout 1 second: " << i++ << std::endl;});epoll_event evs[512];while (1) {int n = epoll_wait(epfd, evs, 512, timer.WaitTime());if (n == -1) {std::cerr << "epoll wait error: " << errno << std::endl;break;}timer.HandleTimeout();//epoll与定时器容器的 “链接手段”}return 0;
}
四、死锁检测组件
理论实现
问题引入:有A、B、C、D、E五个线程,mtx1~5,五个临界资源。如何检测这五个线程间是否存在死锁?
问题解决:如果能够用一条边(A->2)的形式表示资源请求;且知道2在被占用情况下,对应占用的线程编号(假设是B),那么就可以构建一条(A->B)边,就可以通过判断是否存在环,检测是否存在死锁情况。
1.涉及到资源请求的代码表示,就是图的一条边。
2.涉及到资源占有的线程查询,就是查询 <mtx,threadid>结构体数组表,找到对应资源的占用线程id。
代码实现
1.原生的临界资源上锁/解锁 pthread_mutex_lock/unlock
,不提供上面我们解决问题需要的线程对资源的请求和持有关系,所以需要hook改写。对于每个线程之间可能要构建边,所以在创建线程的时候,也要创建一个v结点
int pthread_mutex_lock(pthread_mutex_t *mutex) {pthread_t selfid = pthread_self();lock_before((uint64_t)selfid, (uint64_t)mutex);查找结构体数组,看mutex是否已经被占有,以及占有的线程id是多少当前selfid 与 占有线程id 两个点构建一个图的边pthread_mutex_lock_f(mutex); //真正的加锁lock_after((uint64_t)selfid, (uint64_t)mutex);在结构体数组中,增加一个当前selfid与mutex的 item在图中 删除selfid 与 之前的占有线程id 两个点构建的边}int pthread_mutex_unlock(pthread_mutex_t *mutex) {pthread_mutex_unlock_f(mutex);pthread_t selfid = pthread_self();unlock_after((uint64_t)selfid, (uint64_t)mutex); 删除结构体数组中 <mutex ,占用线程id>的item}int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr,void *(*start_routine)(void *), void *restrict arg) {pthread_create_f(thread, attr, start_routine, arg);构建一个图的结点struct source_type v1;v1.id = *thread;v1.type = PROCESS;add_vertex(v1);
}// init
void init_hook(void) {if (!pthread_mutex_lock_f)pthread_mutex_lock_f = dlsym(RTLD_NEXT, "pthread_mutex_lock");if (!pthread_mutex_unlock_f)pthread_mutex_unlock_f = dlsym(RTLD_NEXT, "pthread_mutex_unlock");if (!pthread_create_f) {pthread_create_f = dlsym(RTLD_NEXT, "pthread_create");}}
2.lock_before 、 lock_after、unlock_after的实现,与结构体数组和图都有关。结构体数组需要提供三个接口:增、删、查;图需要提供三个接口:增(点、边)、删(边)、dfs判环。
表提供的接口:
struct rela_node_s {pthread_mutex_t *mtx;pthread_t thid;
};struct rela_node_s rela_table[MAX] = {0};pthread_t search_rela_table(pthread_mutex_t *mtx) {int i = 0;for (i = 0;i < MAX;i ++) {if (mtx == rela_table[i].mtx) {return rela_table[i].thid;}}return 0;
} int del_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((mtx == rela_table[i].mtx) && (tid == rela_table[i].thid)) {rela_table[i].mtx = NULL;rela_table[i].thid = 0;return 0;}}return -1;
}int add_rela_table(pthread_mutex_t *mtx, pthread_t tid) {int i = 0;for (i = 0;i < MAX;i ++) {if ((rela_table[i].mtx == NULL) && (rela_table[i].thid == 0)) {rela_table[i].mtx = mtx;rela_table[i].thid = tid;return 0;}}return -1;
}三个函数的实现,理解上份代码注释即可
五、分布式锁
分布式锁是什么?
分布式锁和普通的mutex锁作用一样,也是限制对临界资源的访问,只是管理的范围更大,比如不同网段的分布式集群要对同一个资源进行互斥访问,则需要一个存储在数据中心(如数据库)上的分布式锁,实现互斥。Redis,mysql中都有分布式锁的实现。
Redis是什么?
Redis是一个基于存储,集可持久化、分布式与一体的键值对存储系统,不是Mysql那样的关系型数据库,没有复杂的表连接,优势是高速读写和灵活的数据结构。
分布式锁的特性?
锁超时处理,因为是结点间进程通信,需要通过tcp/udp网络连接,当发起锁请求的结点宕机,在超时的时候有一个“超权限”的结点,来帮助解锁。
互斥性,通过比如一个owner字段,设置持有锁的对象id,可以默认为0时,锁空闲。
可用性,如果存储锁的结点宕机了,要保证锁的可用性。开多个备份点,主结点宕机进行主从切换。
容错性,即多个备份点间的数据一致性,可以通过Raft一致性算法。或者redlock红锁。
redlock是什么?
一个基于 Redis 的分布式锁算法,通过向多个独立节点申请锁,并遵循“多数派”原则来提升可靠性,规避单点故障风险。优点是保障了容错性,缺点是访问多个节点性能下降,部署多个Redis复杂。
Redis分布式锁的实现?
互斥性的实现:由于是key-value型,setnx rlock 10001即对rlock加锁,别的用户再执行setnx rlock 10002会返回失败。
超时处理实现:SET rlock 10001 NX EX 10,NX是当当前键不存在的时候设置,EX 10设置锁过期时间为10s,10s内不DEL rlock,也会自动删除锁。
可用性实现:Redis本身就是集群,有备份
容错性:即redlock的核心思想。部署多个独立的Redis节点,这些节点独立运行没有主从关系;客户端申请锁必须向所有节点申请,只有超过一半的节点都返回成功,才算获得锁成功。
六、内存泄漏检测组件
理论
内存泄漏的原因是什么?
根本原因都是malloc了,但是没有free.
如何才能判断是否存在内存泄漏?有内存泄漏了又如何确定在代码的哪一行?
程序的运行结果不会指示内存泄漏信息,只会显示业务信息。我们需要宏定义改写实现自己的malloc和free函数(获得行号的关键)。在原始的每一次malloc,都会创建一个ptr指针指向分配空间,而我们的malloc,需要创建一个以分配地址为名的文件,并且将void *ptr = malloc(size);在代码中的行号通过编译器自带的__LINE__ 写入到文件里,代码运行的文件名也可以通过__FILE__写入进去。对应的,在我们的free(ptr)函数中,每次调用都要删除名为ptr所指向地址的文件。最终程序运行结束即可通过是否存在文件判断是否存在内存泄漏,通过文件内容确定内存泄漏在代码具体行数。
代码
#define _GNU_SOURCE
#include <dlfcn.h>
#include <link.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>//#define malloc(size) nMalloc(size)
//#define free(ptr) nFree(ptr)void *nMalloc(size_t size, const char *filename, const char *funcname, int line) {void *ptr = malloc(size);char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);FILE* fp = fopen(buff, "w");if (!fp) {free(ptr);return NULL;}fprintf(fp, "[+][%s:%s:%d] %p: %ld malloc\n", filename, funcname, line, ptr, size);fflush(fp);fclose(fp);return ptr;
}void nFree(void *ptr, const char *filename, const char *funcname, int line) {char buff[128] = {0};snprintf(buff, 128, "./block/%p.mem", ptr);if (unlink(buff) < 0) { // no existprintf("double free: %p\n", ptr);return ;}return free(ptr);}#define malloc(size) nMalloc(size, __FILE__, __func__, __LINE__)
#define free(ptr) nFree(ptr, __FILE__, __func__, __LINE__)int main() {size_t size = 5;void *p1 = malloc(size);void *p2 = malloc(size * 2);void *p3 = malloc(size * 3);free(p1);free(p3);}