【QT入门到晋级】进程间通信(IPC)-共享内存
前言
前面分享了几种IPC通信技术,都有成熟的交互机制(阻塞和非阻塞方式交互),而本文分享的共享内存,更像是系统提供了一张“白纸”,让多个进程自己构建管理及安全机制,而有些场景只需要简单的机制,此时反复利用的共享内存是非常高效的。
概述
共享内存是一种高效的进程间通信(IPC, Interprocess Communication) 机制,允许多个进程访问同一块物理内存区域,从而实现数据共享。它的核心原理涉及 内存映射、内核管理、进程地址空间映射等关键技术。
特性讲解
以下几种IPC的对比分析
特性 | 共享内存 | 管道 | Socket (本地) |
---|---|---|---|
数据拷贝次数 | 0次(直接访问) | 2次(用户态<->内核态) | 2次(用户态<->内核态) |
系统调用 | 无需(访问时) | 每次读写都需要 | 每次读写都需要 |
内核参与 | 仅初始映射和同步 | 全程中介 | 全程中介+协议栈 |
延迟 | 极低(纳秒级,接近RAM速度) | 较低(微秒级) | 高(微秒级甚至毫秒级) |
吞吐量 | 高(由内存带宽决定) | 中等 | 中等(受协议开销影响) |
复杂度 | 高(需自行处理同步和竞态条件) | 低 | 低 |
优点-访问快
共享内存是用户态提供虚拟地址空间映射到物理内存中,此时访问内存不需要IO切换,属于直接访问,即实现了数据零拷贝,而管道和socket都需要调用wirte/read,需要对数据进行用户态<->内核态之间的IO切换,所以IPC中,共享内存处理数据具备低延时、高吞吐的特性。
缺点-复杂度高
共享内存本身不提供任何进程间的同步机制。如果多个进程同时读写同一块区域,会导致数据竞争(Data Race)和混乱。开发者必须使用其他IPC机制(如信号量(Semaphore)、互斥锁(Mutex)、条件变量(Condition Variable))来保护共享内存区域,实现有序的访问。这些同步操作本身也会带来一些开销,但通常远低于数据拷贝的开销。
与之相比,管道和Socket套接字,编程上代码非常简洁,而且内置了同步(阻塞/非阻塞IO)机制,这些机制能满足大部分的通信交互场景。所以如果对延迟不是非常敏感,而更希望是开发起来简单、安全、可移植的话,同一台设备中的进程通信就使用管道,两台不同设备通信就用socket套接字。
总结:共享内存用更复杂的编程模型换取了极致的性能,而管道/Socket用一部分性能换取了更简单、更安全的编程模型。
场景应用
进程内读大文件
视频文件通常都会很大,如果用传统的read(),需要加载N个buff[buff_size]才能把视频文件加载到内存中,而mmap只需要一次映射即可。
#include <iostream>
#include <thread>
#include <atomic>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>// 原子变量控制读线程终止
std::atomic<bool> stop_reading(false);/*** 读线程函数:从内存映射区域读取视频帧* @param mapped_data 内存映射起始地址* @param file_size 视频文件总大小* @param frame_size 每帧数据大小(字节)*/
void read_frames(const char* mapped_data, size_t file_size, int frame_size) {// 按帧大小遍历整个文件for (size_t offset = 0; offset < file_size && !stop_reading; offset += frame_size) {// 计算当前帧起始位置const char* frame = mapped_data + offset;// 实际应用中这里应替换为真实的帧处理逻辑// 例如:解码H.264帧或渲染图像std::cout << "Read frame at offset: " << offset << std::endl;// 模拟30fps的视频播放速率(33ms/帧)std::this_thread::sleep_for(std::chrono::milliseconds(33));}
}int main(int argc, char** argv) {// 参数检查if (argc < 3) {std::cerr << "Usage: " << argv[0] << " <video_file> <frame_size>" << std::endl;return 1;}const char* filename = argv[1]; // 视频文件路径const int frame_size = std::stoi(argv[2]); // 每帧数据大小(字节)// 1. 打开视频文件(只读模式)int fd = open(filename, O_RDONLY);if (fd == -1) {perror("open failed");return 1;}// 2. 获取文件状态信息(主要需要文件大小)struct stat sb;if (fstat(fd, &sb) == -1) {perror("fstat failed");close(fd);return 1;}// 3. 创建内存映射(私有映射,避免修改原文件)char* mapped_data = static_cast<char*>(// mmap参数说明:// - NULL:由内核自动选择映射地址// - sb.st_size:映射区域长度(文件大小)// - PROT_READ:映射区域可读// - MAP_PRIVATE:私有映射(修改不写回文件)// - fd:文件描述符// - 0:偏移量(从文件开头映射)// 返回值:// - 成功:映射区域起始地址(转换为char*类型)// - 失败:MAP_FAILED(值为(void*)-1)mmap(NULL, sb.st_size, PROT_READ, MAP_PRIVATE, fd, 0));//通过fd句柄加载视频文件内容到共享内存中if (mapped_data == MAP_FAILED) {perror("mmap failed");close(fd);return 1;}// 4. 启动读线程(传递映射地址、文件大小和帧大小)std::thread reader(read_frames, mapped_data, sb.st_size, frame_size);// 5. 主线程等待用户输入终止std::cout << "Press Enter to stop reading..." << std::endl;std::cin.get();stop_reading = true; // 通知读线程停止// 6. 清理资源reader.join(); // 等待读线程结束munmap(mapped_data, sb.st_size); // 解除内存映射close(fd); // 关闭文件描述符return 0;
}
mmap与read/write性能对比可以参见博文【QT入门到晋级】内存-CSDN博客
多进程间共享内存
redis是一个高效的缓存数据库,实现热点数据快速响应,也是需要结合shm+mmap来实现的。项目中如果需要某些热点数据能够低延时的响应时(特别是大对象数据),就需要到shm+mmap的场景。
说明:以下代码实现,用到AI辅助实现。
共享内存数据结构定义
// shared_cache.h
#ifndef SHARED_CACHE_H
#define SHARED_CACHE_H#include <stdint.h>
#include <sys/types.h>
#include <pthread.h>#define SHM_CACHE_NAME "/shared_cache_db"
#define MAX_KEY_LENGTH 256
#define MAX_VALUE_SIZE 4096
#define HASH_TABLE_SIZE 1024// 缓存条目结构
typedef struct CacheEntry {char key[MAX_KEY_LENGTH];char value[MAX_VALUE_SIZE];size_t value_size;time_t expiry_time; // 过期时间(0表示永不过期)time_t last_accessed; // 最后访问时间uint32_t access_count; // 访问计数struct CacheEntry* next; // 哈希冲突时使用链表
} CacheEntry;// 共享内存缓存结构
typedef struct SharedCache {CacheEntry* hash_table[HASH_TABLE_SIZE]; // 哈希表pthread_rwlock_t rwlock; // 读写锁(支持多个读者或一个写者)uint64_t total_entries; // 总条目数uint64_t total_hits; // 总命中次数uint64_t total_misses; // 总未命中次数size_t memory_used; // 已使用内存大小size_t memory_limit; // 内存限制int initialized; // 初始化标志
} SharedCache;// 函数声明
int create_shared_cache();
SharedCache* attach_shared_cache();
void detach_shared_cache(SharedCache* cache);
void destroy_shared_cache();// 缓存操作API
int cache_set(SharedCache* cache, const char* key, const char* value, size_t value_size, time_t expiry);
char* cache_get(SharedCache* cache, const char* key, size_t* value_size);
int cache_delete(SharedCache* cache, const char* key);
int cache_exists(SharedCache* cache, const char* key);
uint64_t cache_stats_hits(SharedCache* cache);
uint64_t cache_stats_misses(SharedCache* cache);
void cache_clear_expired(SharedCache* cache);#endif // SHARED_CACHE_H
共享内存管理
// shared_cache.c
#include "shared_cache.h"
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <time.h>// 简单的哈希函数
static uint32_t hash_key(const char* key) {uint32_t hash = 5381;int c;while ((c = *key++)) {hash = ((hash << 5) + hash) + c; // hash * 33 + c}return hash % HASH_TABLE_SIZE;
}// 创建共享内存缓存
int create_shared_cache() {int shm_fd = shm_open(SHM_CACHE_NAME, O_CREAT | O_RDWR, 0666);if (shm_fd == -1) {perror("shm_open");return -1;}if (ftruncate(shm_fd, sizeof(SharedCache)) == -1) {perror("ftruncate");close(shm_fd);return -1;}close(shm_fd);return 0;
}// 附加到共享内存缓存
SharedCache* attach_shared_cache() {int shm_fd = shm_open(SHM_CACHE_NAME, O_RDWR, 0666);if (shm_fd == -1) {perror("shm_open");return NULL;}SharedCache* cache = mmap(NULL, sizeof(SharedCache), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);close(shm_fd);if (cache == MAP_FAILED) {perror("mmap");return NULL;}return cache;
}// 分离共享内存缓存
void detach_shared_cache(SharedCache* cache) {if (cache != NULL) {munmap(cache, sizeof(SharedCache));}
}// 销毁共享内存缓存
void destroy_shared_cache() {shm_unlink(SHM_CACHE_NAME);
}// 初始化共享内存缓存(第一次使用时调用)
void init_shared_cache(SharedCache* cache) {if (cache->initialized) {return;}// 初始化哈希表memset(cache->hash_table, 0, sizeof(cache->hash_table));// 初始化读写锁属性pthread_rwlockattr_t rwlock_attr;pthread_rwlockattr_init(&rwlock_attr);pthread_rwlockattr_setpshared(&rwlock_attr, PTHREAD_PROCESS_SHARED);// 初始化读写锁pthread_rwlock_init(&cache->rwlock, &rwlock_attr);// 初始化统计信息cache->total_entries = 0;cache->total_hits = 0;cache->total_misses = 0;cache->memory_used = 0;cache->memory_limit = 1024 * 1024 * 100; // 100MB默认限制cache->initialized = 1;
}// 设置缓存值
int cache_set(SharedCache* cache, const char* key, const char* value, size_t value_size, time_t expiry) {if (!cache->initialized) {init_shared_cache(cache);}if (value_size > MAX_VALUE_SIZE) {fprintf(stderr, "Value too large: %zu bytes (max: %d)\n", value_size, MAX_VALUE_SIZE);return -1;}// 计算哈希值uint32_t hash = hash_key(key);// 获取写锁pthread_rwlock_wrlock(&cache->rwlock);// 检查是否已存在相同键CacheEntry* entry = cache->hash_table[hash];CacheEntry* prev = NULL;while (entry != NULL) {if (strcmp(entry->key, key) == 0) {// 更新现有条目cache->memory_used -= entry->value_size;memcpy(entry->value, value, value_size);entry->value_size = value_size;entry->expiry_time = expiry;entry->last_accessed = time(NULL);cache->memory_used += value_size;pthread_rwlock_unlock(&cache->rwlock);return 0;}prev = entry;entry = entry->next;}// 创建新条目CacheEntry* new_entry = malloc(sizeof(CacheEntry));if (new_entry == NULL) {pthread_rwlock_unlock(&cache->rwlock);fprintf(stderr, "Failed to allocate memory for cache entry\n");return -1;}strncpy(new_entry->key, key, MAX_KEY_LENGTH - 1);new_entry->key[MAX_KEY_LENGTH - 1] = '\0';memcpy(new_entry->value, value, value_size);new_entry->value_size = value_size;new_entry->expiry_time = expiry;new_entry->last_accessed = time(NULL);new_entry->access_count = 0;new_entry->next = NULL;// 添加到哈希表if (prev == NULL) {cache->hash_table[hash] = new_entry;} else {prev->next = new_entry;}cache->total_entries++;cache->memory_used += sizeof(CacheEntry) + value_size;pthread_rwlock_unlock(&cache->rwlock);return 0;
}// 获取缓存值
char* cache_get(SharedCache* cache, const char* key, size_t* value_size) {if (!cache->initialized) {*value_size = 0;return NULL;}// 计算哈希值uint32_t hash = hash_key(key);// 获取读锁pthread_rwlock_rdlock(&cache->rwlock);CacheEntry* entry = cache->hash_table[hash];while (entry != NULL) {if (strcmp(entry->key, key) == 0) {// 检查是否过期if (entry->expiry_time > 0 && time(NULL) > entry->expiry_time) {cache->total_misses++;pthread_rwlock_unlock(&cache->rwlock);*value_size = 0;return NULL;}// 更新访问信息entry->last_accessed = time(NULL);entry->access_count++;cache->total_hits++;// 返回值*value_size = entry->value_size;// 需要复制值,因为调用者需要释放内存char* value_copy = malloc(entry->value_size);if (value_copy != NULL) {memcpy(value_copy, entry->value, entry->value_size);}pthread_rwlock_unlock(&cache->rwlock);return value_copy;}entry = entry->next;}cache->total_misses++;pthread_rwlock_unlock(&cache->rwlock);*value_size = 0;return NULL;
}// 检查键是否存在
int cache_exists(SharedCache* cache, const char* key) {if (!cache->initialized) {return 0;}uint32_t hash = hash_key(key);pthread_rwlock_rdlock(&cache->rwlock);CacheEntry* entry = cache->hash_table[hash];while (entry != NULL) {if (strcmp(entry->key, key) == 0) {int exists = (entry->expiry_time == 0 || time(NULL) <= entry->expiry_time);pthread_rwlock_unlock(&cache->rwlock);return exists;}entry = entry->next;}pthread_rwlock_unlock(&cache->rwlock);return 0;
}// 删除缓存项
int cache_delete(SharedCache* cache, const char* key) {if (!cache->initialized) {return -1;}uint32_t hash = hash_key(key);pthread_rwlock_wrlock(&cache->rwlock);CacheEntry* entry = cache->hash_table[hash];CacheEntry* prev = NULL;while (entry != NULL) {if (strcmp(entry->key, key) == 0) {if (prev == NULL) {cache->hash_table[hash] = entry->next;} else {prev->next = entry->next;}cache->memory_used -= sizeof(CacheEntry) + entry->value_size;cache->total_entries--;free(entry);pthread_rwlock_unlock(&cache->rwlock);return 0;}prev = entry;entry = entry->next;}pthread_rwlock_unlock(&cache->rwlock);return -1;
}// 获取命中次数统计
uint64_t cache_stats_hits(SharedCache* cache) {if (!cache->initialized) {return 0;}pthread_rwlock_rdlock(&cache->rwlock);uint64_t hits = cache->total_hits;pthread_rwlock_unlock(&cache->rwlock);return hits;
}// 获取未命中次数统计
uint64_t cache_stats_misses(SharedCache* cache) {if (!cache->initialized) {return 0;}pthread_rwlock_rdlock(&cache->rwlock);uint64_t misses = cache->total_misses;pthread_rwlock_unlock(&cache->rwlock);return misses;
}// 清理过期缓存项
void cache_clear_expired(SharedCache* cache) {if (!cache->initialized) {return;}time_t now = time(NULL);pthread_rwlock_wrlock(&cache->rwlock);for (int i = 0; i < HASH_TABLE_SIZE; i++) {CacheEntry* entry = cache->hash_table[i];CacheEntry* prev = NULL;while (entry != NULL) {if (entry->expiry_time > 0 && now > entry->expiry_time) {CacheEntry* to_delete = entry;if (prev == NULL) {cache->hash_table[i] = entry->next;} else {prev->next = entry->next;}entry = entry->next;cache->memory_used -= sizeof(CacheEntry) + to_delete->value_size;cache->total_entries--;free(to_delete);} else {prev = entry;entry = entry->next;}}}pthread_rwlock_unlock(&cache->rwlock);
}
生产者进程
// producer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include "shared_cache.h"// 模拟从数据库或外部源加载热点数据
void load_hot_data(SharedCache* cache) {printf("Producer: Loading hot data into cache...\n");// 模拟一些热点数据cache_set(cache, "user:1001:profile", "{\"name\":\"Alice\",\"email\":\"alice@example.com\"}", 45, 0);cache_set(cache, "user:1002:profile", "{\"name\":\"Bob\",\"email\":\"bob@example.com\"}", 43, 0);cache_set(cache, "product:2001:info", "{\"name\":\"Laptop\",\"price\":999.99}", 35, 0);cache_set(cache, "product:2002:info", "{\"name\":\"Phone\",\"price\":499.99}", 34, 0);// 设置一些有过期时间的数据time_t expiry = time(NULL) + 300; // 5分钟后过期cache_set(cache, "session:abc123", "user_id=1001&role=admin", 22, expiry);printf("Producer: Hot data loaded successfully\n");
}// 定期更新缓存中的数据
void update_cache_data(SharedCache* cache) {static int counter = 0;counter++;char key[50];char value[100];// 模拟更新一些数据snprintf(key, sizeof(key), "stats:request_count");snprintf(value, sizeof(value), "%d", counter);cache_set(cache, key, value, strlen(value) + 1, 0);printf("Producer: Updated %s = %s\n", key, value);
}int main() {printf("Starting producer process...\n");// 创建共享内存缓存if (create_shared_cache() != 0) {fprintf(stderr, "Failed to create shared cache\n");return 1;}// 附加到共享内存缓存SharedCache* cache = attach_shared_cache();if (cache == NULL) {fprintf(stderr, "Failed to attach to shared cache\n");return 1;}// 初始化缓存init_shared_cache(cache);// 加载初始热点数据load_hot_data(cache);// 主循环:定期更新缓存while (1) {sleep(10); // 每10秒更新一次// 清理过期数据cache_clear_expired(cache);// 更新缓存数据update_cache_data(cache);// 输出统计信息uint64_t hits = cache_stats_hits(cache);uint64_t misses = cache_stats_misses(cache);uint64_t total = hits + misses;double hit_rate = total > 0 ? (double)hits / total * 100 : 0;printf("Producer: Cache stats - Hits: %lu, Misses: %lu, Hit Rate: %.2f%%\n", hits, misses, hit_rate);}// 清理资源detach_shared_cache(cache);return 0;
}
消费者进程
// consumer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include "shared_cache.h"void process_user_request(SharedCache* cache, int user_id) {char key[50];snprintf(key, sizeof(key), "user:%d:profile", user_id);size_t value_size;char* value = cache_get(cache, key, &value_size);if (value != NULL) {printf("Consumer %d: Found user profile: %s\n", getpid(), value);free(value);} else {printf("Consumer %d: User profile not found in cache, would query DB\n", getpid());// 这里可以添加从数据库加载数据的逻辑,然后更新缓存}
}void process_product_request(SharedCache* cache, int product_id) {char key[50];snprintf(key, sizeof(key), "product:%d:info", product_id);size_t value_size;char* value = cache_get(cache, key, &value_size);if (value != NULL) {printf("Consumer %d: Found product info: %s\n", getpid(), value);free(value);} else {printf("Consumer %d: Product info not found in cache, would query DB\n", getpid());}
}void process_session_request(SharedCache* cache, const char* session_id) {char key[50];snprintf(key, sizeof(key), "session:%s", session_id);size_t value_size;char* value = cache_get(cache, key, &value_size);if (value != NULL) {printf("Consumer %d: Found session: %s\n", getpid(), value);free(value);} else {printf("Consumer %d: Session not found or expired\n", getpid());}
}int main() {printf("Starting consumer process %d...\n", getpid());// 附加到共享内存缓存SharedCache* cache = attach_shared_cache();if (cache == NULL) {fprintf(stderr, "Failed to attach to shared cache\n");return 1;}// 模拟处理请求int request_count = 0;while (request_count < 20) {// 模拟不同类型的请求int request_type = rand() % 3;switch (request_type) {case 0:process_user_request(cache, 1001 + (rand() % 3));break;case 1:process_product_request(cache, 2001 + (rand() % 3));break;case 2:process_session_request(cache, "abc123");break;}request_count++;sleep(1 + (rand() % 2)); // 随机等待1-2秒}// 输出消费者统计uint64_t hits = cache_stats_hits(cache);uint64_t misses = cache_stats_misses(cache);uint64_t total = hits + misses;double hit_rate = total > 0 ? (double)hits / total * 100 : 0;printf("Consumer %d: Final stats - Hits: %lu, Misses: %lu, Hit Rate: %.2f%%\n", getpid(), hits, misses, hit_rate);// 清理资源detach_shared_cache(cache);return 0;
}
编译及运行
# 编译共享缓存库
gcc -c -fPIC shared_cache.c -o shared_cache.o -lpthread
gcc -shared -o libsharedcache.so shared_cache.o -lpthread# 编译生产者
gcc -o producer producer.c -L. -lsharedcache -lpthread# 编译消费者
gcc -o consumer consumer.c -L. -lsharedcache -lpthread