当前位置: 首页 > news >正文

Redis 发布订阅模式详解:实现高效实时消息通信

概述

Redis 发布订阅(Pub/Sub)是一种消息通信模式,发送者(发布者)将消息发送到频道,而接收者(订阅者)可以接收它们。这种模式实现了消息的生产者和消费者之间的完全解耦,是构建实时消息系统的理想选择。

核心概念

1. 频道(Channel)

消息传输的通道,发布者向频道发送消息,订阅者从频道接收消息。

2. 发布者(Publisher)

向频道发送消息的客户端。

3. 订阅者(Subscriber)

订阅频道并接收消息的客户端。

4. 模式订阅(Pattern Subscription)

使用通配符订阅多个匹配的频道。

基本命令

发布消息

PUBLISH channel message
  • 将消息发送到指定频道

  • 返回接收到消息的订阅者数量

订阅频道

SUBSCRIBE channel1 channel2 ...
  • 订阅一个或多个频道

  • 进入订阅状态,阻塞等待消息

取消订阅

UNSUBSCRIBE [channel1 channel2 ...]
  • 取消订阅指定频道

  • 不指定参数则取消所有订阅

模式订阅

PSUBSCRIBE pattern1 pattern2 ...
  • 使用通配符订阅多个频道

  • 支持 *(匹配任意字符)和 ?(匹配单个字符)

查看订阅信息

PUBSUB CHANNELS [pattern]  # 查看活跃频道
PUBSUB NUMSUB [channel...] # 查看指定频道的订阅数
PUBSUB NUMPAT              # 查看模式订阅的数量

Spring Boot 中的实现

1. 配置 Redis 连接

# application.yml
spring:redis:host: localhostport: 6379password: database: 0

2. Redis 配置类

@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}@Beanpublic ChannelTopic smsTopic() {return new ChannelTopic("sms:user:accept:query");}
}

3. 消息发布者

@Component
public class MessagePublisher {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ChannelTopic smsTopic;/*** 向指定频道发布消息*/public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);}/*** 向预设频道发布消息*/public void publishSmsMessage(String userId) {redisTemplate.convertAndSend(smsTopic.getTopic(), userId);log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);}
}

要理解 MessagePublisher 中预设频道指定频道的区别,核心是从「频道的定义时机」「使用场景」「灵活性与维护性」三个维度切入 —— 两者本质都是基于 Redis 发布订阅(Pub/Sub)机制传递消息,但在「频道值如何确定」和「适用场景」上完全不同。以下结合代码和 Redis Pub/Sub 原理详细讲解:

一、先明确基础:Redis 发布订阅(Pub/Sub)的核心逻辑

在分析两者区别前,需先回顾 Redis Pub/Sub 的基本概念:

  • 频道(Channel):消息的 “通道”,类似广播电台的 “频率”,发布者(Publisher)向某个频道发消息,所有订阅该频道的消费者(Subscriber)都会收到消息。
  • 发布者:调用 PUBLISH channel message 命令向频道发消息(对应代码中 redisTemplate.convertAndSend(...))。
  • 消费者:调用 SUBSCRIBE channel 命令订阅频道,实时接收该频道的消息。

MessagePublisher 的两个方法,本质都是封装了 Redis 的 PUBLISH 操作,区别仅在于「频道(channel)的值从哪来」。

二、指定频道(publish 方法):动态传入频道,灵活度高

/*** 向指定频道发布消息*/
public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);
}
 频道的定义方式:调用时动态传入
  • 「指定频道」的核心是:频道的名称(如 channel 参数的值)不是预先固定的,而是在调用 publish 方法时,由调用者根据业务需求动态传入
  • 示例:若业务需要向 “用户 123 的短信通知频道” 和 “订单 456 的状态通知频道” 发消息,调用方式如下
    // 向“sms_notify_user_123”频道发消息(用户123的短信通知)
    messagePublisher.publish("sms_notify_user_123", "您有一条新短信");
    // 向“order_status_456”频道发消息(订单456的状态更新)
    messagePublisher.publish("order_status_456", "订单已发货");
  • 这里的 sms_notify_user_123 和 order_status_456 都是 “指定频道”—— 调用时才确定具体值。
2. 核心特点
特点说明
灵活性极高支持任意频道名称,可根据业务场景动态生成(如拼接用户 ID、订单 ID)
无预设依赖不需要提前定义频道,调用时直接传入即可,无配置或注入依赖
通用性强可用于所有需要 Pub/Sub 的场景(不仅限于短信,还能用于订单、通知等)
调用者需感知频道调用者必须明确知道 “要向哪个频道发消息”,需自己管理频道名称的正确性
3. 适用场景
  • 业务场景不固定,需要动态生成频道的场景(如 “按用户 ID 拆分频道”“按订单 ID 拆分频道”);
  • 跨业务复用(同一方法可用于短信、订单、日志等不同模块的消息发布);
  • 临时或一次性的消息发布(如调试时向特定测试频道发消息)。

三、预设频道(publishSmsMessage 方法):预先定义频道,专注特定业务

再看 publishSmsMessage 方法的代码:

@Autowired
private ChannelTopic smsTopic; // 注入预设的频道对象/*** 向预设频道发布消息*/
public void publishSmsMessage(String userId) {redisTemplate.convertAndSend(smsTopic.getTopic(), userId);log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);
}
1. 频道的定义方式:启动时预先注入 / 配置
  • 「预设频道」的核心是:频道的名称在项目启动前就已固定(通过配置或代码定义),并通过 ChannelTopic 类封装后注入到 MessagePublisher 中,调用时无需传入频道,直接使用预设值
  • 关键依赖 ChannelTopic smsTopic 的定义逻辑(通常在配置类中):
@Configuration
public class RedisPubSubConfig {// 1. 从配置文件读取预设的短信频道名称(如 application.yml 中配置)@Value("${redis.pubsub.sms.topic}")private String smsTopicName;// 2. 定义预设频道的 ChannelTopic 对象,注入Spring容器@Beanpublic ChannelTopic smsTopic() {// 预设频道名称固定为配置文件中的值(如 "sms_user_accept_query")return new ChannelTopic(smsTopicName);}
}

此时 smsTopic.getTopic() 的值是固定的(如 sms_user_accept_query),调用 publishSmsMessage 时,始终向这个预设频道发消息:

// 调用时只需传入 userId,频道固定为 smsTopic 对应的预设值
messagePublisher.publishSmsMessage("123"); 
// 实际效果:向 "sms_user_accept_query" 频道发布消息 "123"
2. 核心特点
特点说明
频道固定频道名称在启动时就已确定,调用时无法修改,专注于特定业务(如短信)
依赖预设配置需要提前在配置文件或代码中定义频道,通过 Spring 注入 ChannelTopic
业务关联性强方法名(publishSmsMessage)和频道(smsTopic)都绑定 “短信” 业务,不可跨业务复用
调用者无需感知频道调用者只需传入业务参数(如 userId),无需关心具体频道名称,降低使用成本
3. 适用场景
  • 业务场景固定,频道无需动态变化的场景(如你之前代码中的 “短信发送请求通知”—— 所有短信请求的消费通知,都通过同一个预设频道触发);
  • 专注单一业务模块(如仅用于短信、仅用于订单),避免频道名称混乱;
  • 团队协作场景:预先定义频道名称,所有开发者统一使用,避免因频道名称拼写错误导致消息无法接收(如统一用 sms_user_accept_query,而非有人写 sms_accept_user_query)。

四、预设频道 vs 指定频道:核心区别对比

为了更清晰区分,整理成表格:

对比维度预设频道(publishSmsMessage指定频道(publish
频道来源启动时通过配置 / 代码预设,注入 ChannelTopic调用时由调用者动态传入 String 类型的频道名称
频道灵活性低(固定不变,仅服务特定业务)高(可动态生成,支持任意场景)
业务关联性强(绑定单一业务,如短信)弱(通用,可跨业务复用)
调用复杂度低(只需传业务参数,如 userId高(需传 “频道 + 业务参数”,需确保频道名称正确)
维护成本低(频道统一管理,避免拼写错误)高(需手动管理频道名称,易因拼写错误导致消息丢失)
典型使用场景短信发送通知、订单状态变更(固定频道)按用户 / 订单拆分的动态通知、临时调试消息

4. 消息订阅者

@Component
public class MessageSubscriber extends MessageListenerAdapter {private static final Logger log = LoggerFactory.getLogger(MessageSubscriber.class);/*** 处理接收到的消息*/@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String body = new String(message.getBody());log.info("收到消息 - 频道: {}, 内容: {}", channel, body);// 根据不同的频道进行不同的处理handleMessage(channel, body);}/*** 消息处理逻辑*/private void handleMessage(String channel, String message) {switch (channel) {case "sms:user:accept:query":processSmsMessage(message);break;case "order:create:notice":processOrderMessage(message);break;default:log.warn("未知频道: {}", channel);}}private void processSmsMessage(String userId) {log.info("处理用户 {} 的短信消息", userId);// 具体的业务逻辑}private void processOrderMessage(String orderId) {log.info("处理订单 {} 的创建消息", orderId);// 具体的业务逻辑}
}

5. 消息监听容器配置

@Configuration
public class RedisPubSubConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Autowiredprivate MessageSubscriber messageSubscriber;@Beanpublic RedisMessageListenerContainer redisContainer() {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 添加频道订阅container.addMessageListener(messageSubscriber, Arrays.asList(new ChannelTopic("sms:user:accept:query"),new ChannelTopic("order:create:notice"),new ChannelTopic("user:status:update")));// 添加模式订阅container.addMessageListener(messageSubscriber, new PatternTopic("logs:*"));return container;}
}

高级特性

1. 模式匹配订阅

// 订阅所有以 sms: 开头的频道
PSUBSCRIBE sms:*

// 订阅所有以 :notice 结尾的频道  
PSUBSCRIBE *:notice

// 订阅符合特定模式的频道
PSUBSCRIBE user:?:status

2. 消息格式设计

建议使用 JSON 格式的消息体:

{"eventType": "USER_SMS_RECEIVED","timestamp": 1621234567890,"data": {"userId": "12345","deviceId": "device_001","messageContent": "Hello World"},"version": "1.0"
}

3. 错误处理和重试机制

@Component
public class ReliableMessageSubscriber {@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))public void handleMessageWithRetry(String message) {try {processMessage(message);} catch (Exception e) {log.error("消息处理失败,将进行重试", e);throw e; // 触发重试}}@Recoverpublic void recover(Exception e, String message) {log.error("消息处理最终失败,存入死信队列: {}", message, e);// 将消息存入死信队列deadLetterQueue.add(message);}
}

实战应用场景

1. 实时消息通知系统

// 发布消息
public void notifyUser(String userId, String message) {String channel = "user:notify:" + userId;redisTemplate.convertAndSend(channel, Map.of("type", "notification", "content", message, "time", System.currentTimeMillis()));
}// 订阅消息
@RedisListener(topic = "user:notify:*")
public void onUserNotification(String channel, Map<String, Object> message) {String userId = channel.substring("user:notify:".length());// 推送消息到用户界面webSocketService.sendToUser(userId, message);
}

2. 分布式系统事件总线

// 系统事件发布
public void publishEvent(SystemEvent event) {redisTemplate.convertAndSend("system:events", Map.of("eventType", event.getType(), "payload", event.getData()));
}// 事件处理
@RedisListener(topic = "system:events")
public void handleSystemEvent(Map<String, Object> event) {String eventType = (String) event.get("eventType");Object payload = event.get("payload");switch (eventType) {case "USER_CREATED":handleUserCreated(payload);break;case "ORDER_PAID":handleOrderPaid(payload);break;// 其他事件处理...}
}

3. 实时数据同步

// 数据变更时发布消息
@Transactional
public void updateUserProfile(User user) {userRepository.save(user);// 发布数据变更消息redisTemplate.convertAndSend("data:sync:user", Map.of("id", user.getId(), "action", "UPDATE", "timestamp", System.currentTimeMillis()));
}// 其他服务监听数据变更
@RedisListener(topic = "data:sync:user")
public void onUserDataChange(Map<String, Object> change) {// 更新本地缓存或索引cacheService.refreshUserCache((Long) change.get("id"));
}

2. 消息压缩

对于大消息,可以先压缩再发送:

public void publishCompressed(String channel, Object message) {String json = objectMapper.writeValueAsString(message);byte[] compressed = compress(json.getBytes());redisTemplate.convertAndSend(channel, compressed);
}

3. 批量处理

@Component
public class BatchMessageProcessor {private final List<String> messageBuffer = new ArrayList<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();@PostConstructpublic void init() {scheduler.scheduleAtFixedRate(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);}@RedisListener(topic = "high:frequency:channel")public void onMessage(String message) {synchronized (messageBuffer) {messageBuffer.add(message);}}private void processBatch() {List<String> batch;synchronized (messageBuffer) {batch = new ArrayList<>(messageBuffer);messageBuffer.clear();}if (!batch.isEmpty()) {businessService.processBatch(batch);}}
}

监控和运维

1. 监控指标

@Component
public class PubSubMonitor {@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Scheduled(fixedRate = 60000) // 每分钟监控一次public void monitorPubSub() {// 监控频道数量Set<String> channels = redisTemplate.execute((RedisCallback<Set<String>>) connection -> connection.pubSubChannels("*".getBytes()).stream().map(bytes -> new String(bytes)).collect(Collectors.toSet()));// 监控消息吞吐量Map<String, Long> messageStats = new HashMap<>();for (String channel : channels) {Long subscribers = redisTemplate.execute((RedisCallback<Long>) connection -> connection.pubSubNumSub(channel.getBytes()).get(channel.getBytes()));messageStats.put(channel, subscribers);}log.info("PubSub 监控 - 频道数: {}, 订阅统计: {}", channels.size(), messageStats);}
}

2. 异常处理

@Configuration
public class RedisErrorConfig {@Beanpublic RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.setErrorHandler(new RedisPubSubErrorHandler());return container;}public class RedisPubSubErrorHandler implements ErrorHandler {@Overridepublic void handleError(Throwable t) {log.error("Redis Pub/Sub 错误", t);// 发送告警通知alertService.sendAlert("Redis Pub/Sub 异常", t.getMessage());}}
}

总结

Redis 发布订阅模式提供了强大的实时消息通信能力,具有以下优势:

  1. 高性能:基于内存操作,吞吐量高

  2. 实时性:消息即时推送,延迟低

  3. 解耦性:生产者和消费者完全解耦

  4. 灵活性:支持频道和模式订阅

  5. 扩展性:易于水平扩展

但在使用时也需要注意:

  • 消息不持久化,重启会丢失

  • 没有消息确认机制

  • 不适合需要严格顺序的场景

通过合理的架构设计和代码实现,Redis Pub/Sub 可以成为构建实时应用系统的强大工具。

http://www.xdnf.cn/news/1449721.html

相关文章:

  • TDengine TIMEDIFF() 函数用户使用手册
  • 66认知诊断模型发展与NeuralCD框架笔记
  • FPGA笔试面试常考问题及答案汇总
  • 无穿戴动捕如何深度结合AI数据分析,实现精准动作评估?
  • DOM常见的操作有哪些?
  • 还在 @AfterEach 里手动 deleteAll()?你早就该试试这个测试数据清理 Starter 了
  • leetcode110. 平衡二叉树
  • mysql常见面试题
  • [光学原理与应用-376]:ZEMAX - 优化 - 概述
  • 代码随想录算法训练营第四天|链表part02
  • SQLint3 模块如何使用
  • PostgreSQL 技术峰会哈尔滨站活动回顾|深度参与 IvorySQL 开源社区建设的实践与思考
  • 农业XR数字融合工作站,赋能农业专业实践学习
  • 刻意练习实践说明使用手册
  • 正则表达式的使用
  • Java jar 如何防止被反编译?代码写的太烂,害怕被人发现
  • TDD测试驱动开发+Python案例解析
  • 在linux下使用MySQL常用的命令集合
  • 通义实验室发布AgentScope 1.0新一代智能体开发框架
  • 嵌入式第四十二天(数据库,网页设计)
  • Spring Boot集成Kafka常见业务场景最佳实践实战指南
  • Java全栈工程师的面试实战:从基础到复杂问题的完整解析
  • 安卓APP备案的三要素包名,公钥,签名md5值详细获取方法-优雅草卓伊凡
  • 鹧鸪云软件:光伏施工管理一目了然,进度尽在掌握
  • 涉私数据安全与可控匿名化利用机制研究(下)
  • Selenium WebUI 自动化“避坑”指南——从常用 API 到 10 大高频问题
  • 本地化AI问答:告别云端依赖,用ChromaDB + HuggingFace Transformers 搭建离线RAG检索系统
  • 科技信息差(9.3)
  • uni app 的app端 写入运行日志到指定文件夹。
  • Linux学习:生产者消费者模型