当前位置: 首页 > news >正文

Redis实现数据传输简介

Redis 可以通过其内置的数据结构和消息机制实现高效的数据传输,尤其适合跨进程、跨服务的实时通信。以下是详细的使用方法和示例,涵盖 队列、发布/订阅、流(Stream) 三种主要模式,并附上 C++ 实现代码


1. 使用 Redis List 实现队列传输

原理

  • 生产者:通过 LPUSH 或 RPUSH 将数据插入队列。
  • 消费者:通过 RPOP 或 LPOP 从队列取出数据。
  • 阻塞操作BLPOP/BRPOP 可避免轮询,节省 CPU 资源。

适用场景

  • 任务队列(如异步任务处理)。
  • 日志收集。
  • 跨服务数据同步。

示例代码(C++)

(1) 生产者(发送数据)
#include <hiredis/hiredis.h>
#include <iostream>
void producer() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 插入数据到队列头部(LPUSH)
redisReply *reply = (redisReply *)redisCommand(c, "LPUSH my_queue data1");
freeReplyObject(reply);
reply = (redisReply *)redisCommand(c, "LPUSH my_queue data2");
freeReplyObject(reply);
std::cout << "Producer: Sent data to queue." << std::endl;
redisFree(c);
}
(2) 消费者(接收数据)
void consumer() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 阻塞式获取数据(BRPOP,超时时间 0 表示无限等待)
redisReply *reply = (redisReply *)redisCommand(c, "BRPOP my_queue 0");
if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 2) {
std::cout << "Consumer: Received -> " << reply->element[1]->str << std::endl;
} else {
std::cout << "Consumer: Queue is empty or error occurred." << std::endl;
}
freeReplyObject(reply);
redisFree(c);
}
(3) 完整流程
int main() {
// 启动生产者(终端1)
// producer();
// 启动消费者(终端2)
consumer();
return 0;
}

2. 使用 Redis Pub/Sub 实现发布/订阅传输

原理

  • 发布者:通过 PUBLISH 向频道(channel)发送消息。
  • 订阅者:通过 SUBSCRIBE 监听频道并接收消息。
  • 特点
    • 实时性强,但消息不持久化(离线订阅者会丢失消息)。
    • 支持多对多通信(一个频道可被多个订阅者监听)。

适用场景

  • 实时通知(如用户上线提醒)。
  • 事件广播(如系统状态变更)。

示例代码(C++)

(1) 发布者
#include <hiredis/hiredis.h>
#include <iostream>
void publisher() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 发布消息到频道
redisReply *reply = (redisReply *)redisCommand(c, "PUBLISH my_channel Hello, Redis!");
if (reply && reply->type == REDIS_REPLY_INTEGER) {
std::cout << "Publisher: Sent message to " << reply->integer << " subscribers." << std::endl;
}
freeReplyObject(reply);
redisFree(c);
}
(2) 订阅者
#include <hiredis/hiredis.h>
#include <iostream>
#include <thread>
#include <atomic>
std::atomic<bool> running(true);
void subscriber() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 订阅频道(hiredis 需要轮询获取消息)
while (running) {
redisReply *reply = (redisReply *)redisCommand(c, "SUBSCRIBE my_channel");
if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 3) {
std::string type(reply->element[0]->str);
if (type == "message") {
std::cout << "Subscriber: Received -> " << reply->element[2]->str << std::endl;
}
}
freeReplyObject(reply);
// 实际项目中建议用异步库(如 cpp_redis)或事件驱动模型
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
redisFree(c);
}
(3) 完整流程(需多线程)
#include <thread>
int main() {
// 启动订阅者(终端1)
std::thread sub_thread(subscriber);
// 启动发布者(终端2)
publisher();
// 停止订阅者(实际项目中可通过信号或条件变量控制)
std::this_thread::sleep_for(std::chrono::seconds(1));
running = false;
sub_thread.join();
return 0;
}

注意hiredis 的 SUBSCRIBE 需要轮询,实际项目中建议使用异步库(如 cpp_redis)或事件驱动模型。


3. 使用 Redis Stream 实现消息队列(高级)

原理

  • 生产者:通过 XADD 向流(stream)添加消息。
  • 消费者:通过 XREAD 或消费者组(XGROUP CREATE + XREADGROUP)读取消息。
  • 特点
    • 消息持久化(可回溯)。
    • 支持消费者组(避免重复消费)。
    • 类似 Kafka 的轻量级实现。

适用场景

  • 需要消息确认和重试的场景(如订单处理)。
  • 高可靠性要求的日志流。

示例代码(C++)

(1) 生产者
#include <hiredis/hiredis.h>
#include <iostream>
void stream_producer() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 添加消息到流(* 表示自动生成 ID)
redisReply *reply = (redisReply *)redisCommand(c, "XADD mystream * field1 value1 field2 value2");
if (reply && reply->type == REDIS_REPLY_STRING) {
std::cout << "Stream Producer: Added message with ID -> " << reply->str << std::endl;
}
freeReplyObject(reply);
redisFree(c);
}
(2) 消费者(简单读取)
void stream_consumer_simple() {
redisContext *c = redisConnect("127.0.0.1", 6379);
if (!c || c->err) {
std::cerr << "Connection error: " << (c ? c->errstr : "unknown") << std::endl;
return;
}
// 读取流中的最新消息(COUNT 1 表示最多 1 条)
redisReply *reply = (redisReply *)redisCommand(c, "XREAD COUNT 1 STREAMS mystream 0");
if (reply && reply->type == REDIS_REPLY_ARRAY && reply->elements == 1) {
redisReply *stream_array = reply->element[0];
if (stream_array->type == REDIS_REPLY_ARRAY && stream_array->elements == 2) {
std::string stream_name(stream_array->element[0]->str);
redisReply *messages = stream_array->element[1];
if (messages->type == REDIS_REPLY_ARRAY && messages->elements > 0) {
std::cout << "Stream Consumer: Received message ID -> "
<< messages->element[0]->element[0]->str << std::endl;
}
}
}
freeReplyObject(reply);
redisFree(c);
}
(3) 消费者组(高级用法)
// 创建消费者组(只需执行一次)
void create_consumer_group() {
redisContext *c = redisConnect("127.0.0.1", 6379);
redisCommand(c, "XGROUP CREATE mystream mygroup 0 MKSTREAM");
redisFree(c);
}
// 消费者组读取消息
void stream_consumer_group() {
redisContext *c = redisConnect("127.0.1", 6379);
if (!c || c->err) return;
// 从消费者组读取消息(BLOCK 0 表示无限阻塞)
redisReply *reply = (redisReply *)redisCommand(
c, "XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >"
);
// 处理消息...
freeReplyObject(reply);
redisFree(c);
}

4. 三种传输方式对比

方式持久化消费者组延迟适用场景
List简单队列、任务调度
Pub/Sub极低实时通知、事件广播
Stream高可靠性消息队列

5. 最佳实践建议

  1. 简单队列:优先用 List + BRPOP
  2. 实时通知:用 Pub/Sub,但需容忍消息丢失。
  3. 高可靠性队列:用 Stream + 消费者组。
  4. 性能优化
    • 批量操作(MGET/MSET)。
    • 使用 Pipeline 减少网络往返。
  5. 错误处理
    • 检查 Redis 命令返回值(如 NULL 或错误类型)。
    • 重试机制(如指数退避)。

总结

  • Redis List:适合简单队列,实现容易。
  • Redis Pub/Sub:适合实时通知,但消息不持久化。
  • Redis Stream:适合高可靠性消息队列,功能最强大。
  • C++ 客户端:推荐 cpp_redis 或 redis-plus-plus 简化异步操作。

根据业务需求选择合适的模式,并结合错误处理和性能优化,即可高效使用 Redis 进行数据传输!

http://www.xdnf.cn/news/1214695.html

相关文章:

  • jmeter读取上游接口并遍历数组数据并进行压测
  • 【Qt】QTime::toString(“hh:mm:ss.zzz“) 显示乱码的原因与解决方案
  • 学习游戏制作记录(冻结敌人时间与黑洞技能)7.30
  • 基于C-MTEB/CMedQAv2-rerankingv的Qwen3-1.7b模型微调-demo
  • 深度学习与图像处理案例 │ 图像分类(智能垃圾分拣器)
  • 通达OA服务器无公网IP网络,如何通过内网穿透实现外网远程办公访问OA系统
  • 三十二、【Linux网站服务器】搭建httpd服务器演示虚拟主机配置、网页重定向功能
  • [25-cv-08377]Hublot手表商标带着14把“死神镰刀“来收割权!卖家速逃!
  • Dify 从入门到精通(第 4/100 篇):快速上手 Dify 云端:5 分钟创建第一个应用
  • Python爬虫04_Requests豆瓣电影爬取
  • 下拉加载问题
  • 电商项目_核心业务_分布式事务
  • 【AI论文】单一领域能否助力其他领域?一项基于数据的、通过强化学习实现多领域推理的研究
  • 少林寺用什么数据库?
  • web:html表单提交数据
  • 亚马逊广告进阶指南:如何合理调配预算
  • 网络的学习 2 Socket
  • 深入剖析 RocketMQ 分布式事务:原理、流程与实践
  • GitPython02-Git使用方式
  • 大模型对比评测:Qwen2.5 VS Gemini 2.0谁更能打?
  • 《C++二叉搜索树原理剖析:从原理到高效实现教学》
  • 基于 Amazon Bedrock 与 Anthropic Claude 3 智能文档处理方案:从扫描件提取到数据入库全流程实践
  • 智能Agent场景实战指南 Day 26:Agent评估与性能优化
  • Python正则表达式精准匹配独立单词技巧
  • 【Dolphinscheduler】docker搭建dolphinscheduler集群并与安全的CDH集成
  • python | numpy小记(八):理解 NumPy 中的 `np.meshgrid`
  • 嵌入式linux驱动开发:什么是Linux驱动?深度解析与实战入门
  • 如何通过IT-Tools与CPolar构建无缝开发通道?
  • OriGene:一种可自进化的虚拟疾病生物学家,实现治疗靶点发现自动化
  • 【ESP32设备通信】-LAN8720与ESP32集成