Redis 发布订阅模式详解:实现高效实时消息通信
概述
Redis 发布订阅(Pub/Sub)是一种消息通信模式,发送者(发布者)将消息发送到频道,而接收者(订阅者)可以接收它们。这种模式实现了消息的生产者和消费者之间的完全解耦,是构建实时消息系统的理想选择。
核心概念
1. 频道(Channel)
消息传输的通道,发布者向频道发送消息,订阅者从频道接收消息。
2. 发布者(Publisher)
向频道发送消息的客户端。
3. 订阅者(Subscriber)
订阅频道并接收消息的客户端。
4. 模式订阅(Pattern Subscription)
使用通配符订阅多个匹配的频道。
基本命令
发布消息
PUBLISH channel message
将消息发送到指定频道
返回接收到消息的订阅者数量
订阅频道
SUBSCRIBE channel1 channel2 ...
订阅一个或多个频道
进入订阅状态,阻塞等待消息
取消订阅
UNSUBSCRIBE [channel1 channel2 ...]
取消订阅指定频道
不指定参数则取消所有订阅
模式订阅
PSUBSCRIBE pattern1 pattern2 ...
使用通配符订阅多个频道
支持
*
(匹配任意字符)和?
(匹配单个字符)
查看订阅信息
PUBSUB CHANNELS [pattern] # 查看活跃频道 PUBSUB NUMSUB [channel...] # 查看指定频道的订阅数 PUBSUB NUMPAT # 查看模式订阅的数量
Spring Boot 中的实现
1. 配置 Redis 连接
# application.yml
spring:redis:host: localhostport: 6379password: database: 0
2. Redis 配置类
@Configuration
public class RedisConfig {@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {RedisTemplate<String, Object> template = new RedisTemplate<>();template.setConnectionFactory(factory);template.setKeySerializer(new StringRedisSerializer());template.setValueSerializer(new GenericJackson2JsonRedisSerializer());return template;}@Beanpublic ChannelTopic smsTopic() {return new ChannelTopic("sms:user:accept:query");}
}
3. 消息发布者
@Component
public class MessagePublisher {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ChannelTopic smsTopic;/*** 向指定频道发布消息*/public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);}/*** 向预设频道发布消息*/public void publishSmsMessage(String userId) {redisTemplate.convertAndSend(smsTopic.getTopic(), userId);log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);}
}
要理解 MessagePublisher
中预设频道和指定频道的区别,核心是从「频道的定义时机」「使用场景」「灵活性与维护性」三个维度切入 —— 两者本质都是基于 Redis 发布订阅(Pub/Sub)机制传递消息,但在「频道值如何确定」和「适用场景」上完全不同。以下结合代码和 Redis Pub/Sub 原理详细讲解:
一、先明确基础:Redis 发布订阅(Pub/Sub)的核心逻辑
在分析两者区别前,需先回顾 Redis Pub/Sub 的基本概念:
- 频道(Channel):消息的 “通道”,类似广播电台的 “频率”,发布者(Publisher)向某个频道发消息,所有订阅该频道的消费者(Subscriber)都会收到消息。
- 发布者:调用
PUBLISH channel message
命令向频道发消息(对应代码中redisTemplate.convertAndSend(...)
)。 - 消费者:调用
SUBSCRIBE channel
命令订阅频道,实时接收该频道的消息。
MessagePublisher
的两个方法,本质都是封装了 Redis 的 PUBLISH
操作,区别仅在于「频道(channel)的值从哪来」。
二、指定频道(publish
方法):动态传入频道,灵活度高
/*** 向指定频道发布消息*/
public void publish(String channel, Object message) {redisTemplate.convertAndSend(channel, message);
}
频道的定义方式:调用时动态传入
- 「指定频道」的核心是:频道的名称(如
channel
参数的值)不是预先固定的,而是在调用publish
方法时,由调用者根据业务需求动态传入。 - 示例:若业务需要向 “用户 123 的短信通知频道” 和 “订单 456 的状态通知频道” 发消息,调用方式如下
// 向“sms_notify_user_123”频道发消息(用户123的短信通知) messagePublisher.publish("sms_notify_user_123", "您有一条新短信"); // 向“order_status_456”频道发消息(订单456的状态更新) messagePublisher.publish("order_status_456", "订单已发货");
- 这里的
sms_notify_user_123
和order_status_456
都是 “指定频道”—— 调用时才确定具体值。
2. 核心特点
特点 | 说明 |
---|---|
灵活性极高 | 支持任意频道名称,可根据业务场景动态生成(如拼接用户 ID、订单 ID) |
无预设依赖 | 不需要提前定义频道,调用时直接传入即可,无配置或注入依赖 |
通用性强 | 可用于所有需要 Pub/Sub 的场景(不仅限于短信,还能用于订单、通知等) |
调用者需感知频道 | 调用者必须明确知道 “要向哪个频道发消息”,需自己管理频道名称的正确性 |
3. 适用场景
- 业务场景不固定,需要动态生成频道的场景(如 “按用户 ID 拆分频道”“按订单 ID 拆分频道”);
- 跨业务复用(同一方法可用于短信、订单、日志等不同模块的消息发布);
- 临时或一次性的消息发布(如调试时向特定测试频道发消息)。
三、预设频道(publishSmsMessage
方法):预先定义频道,专注特定业务
再看 publishSmsMessage
方法的代码:
@Autowired
private ChannelTopic smsTopic; // 注入预设的频道对象/*** 向预设频道发布消息*/
public void publishSmsMessage(String userId) {redisTemplate.convertAndSend(smsTopic.getTopic(), userId);log.info("向频道 {} 发布消息: {}", smsTopic.getTopic(), userId);
}
1. 频道的定义方式:启动时预先注入 / 配置
- 「预设频道」的核心是:频道的名称在项目启动前就已固定(通过配置或代码定义),并通过
ChannelTopic
类封装后注入到MessagePublisher
中,调用时无需传入频道,直接使用预设值。 - 关键依赖
ChannelTopic smsTopic
的定义逻辑(通常在配置类中):
@Configuration
public class RedisPubSubConfig {// 1. 从配置文件读取预设的短信频道名称(如 application.yml 中配置)@Value("${redis.pubsub.sms.topic}")private String smsTopicName;// 2. 定义预设频道的 ChannelTopic 对象,注入Spring容器@Beanpublic ChannelTopic smsTopic() {// 预设频道名称固定为配置文件中的值(如 "sms_user_accept_query")return new ChannelTopic(smsTopicName);}
}
此时 smsTopic.getTopic()
的值是固定的(如 sms_user_accept_query
),调用 publishSmsMessage
时,始终向这个预设频道发消息:
// 调用时只需传入 userId,频道固定为 smsTopic 对应的预设值
messagePublisher.publishSmsMessage("123");
// 实际效果:向 "sms_user_accept_query" 频道发布消息 "123"
2. 核心特点
特点 | 说明 |
---|---|
频道固定 | 频道名称在启动时就已确定,调用时无法修改,专注于特定业务(如短信) |
依赖预设配置 | 需要提前在配置文件或代码中定义频道,通过 Spring 注入 ChannelTopic |
业务关联性强 | 方法名(publishSmsMessage )和频道(smsTopic )都绑定 “短信” 业务,不可跨业务复用 |
调用者无需感知频道 | 调用者只需传入业务参数(如 userId ),无需关心具体频道名称,降低使用成本 |
3. 适用场景
- 业务场景固定,频道无需动态变化的场景(如你之前代码中的 “短信发送请求通知”—— 所有短信请求的消费通知,都通过同一个预设频道触发);
- 专注单一业务模块(如仅用于短信、仅用于订单),避免频道名称混乱;
- 团队协作场景:预先定义频道名称,所有开发者统一使用,避免因频道名称拼写错误导致消息无法接收(如统一用
sms_user_accept_query
,而非有人写sms_accept_user_query
)。
四、预设频道 vs 指定频道:核心区别对比
为了更清晰区分,整理成表格:
对比维度 | 预设频道(publishSmsMessage ) | 指定频道(publish ) |
---|---|---|
频道来源 | 启动时通过配置 / 代码预设,注入 ChannelTopic | 调用时由调用者动态传入 String 类型的频道名称 |
频道灵活性 | 低(固定不变,仅服务特定业务) | 高(可动态生成,支持任意场景) |
业务关联性 | 强(绑定单一业务,如短信) | 弱(通用,可跨业务复用) |
调用复杂度 | 低(只需传业务参数,如 userId ) | 高(需传 “频道 + 业务参数”,需确保频道名称正确) |
维护成本 | 低(频道统一管理,避免拼写错误) | 高(需手动管理频道名称,易因拼写错误导致消息丢失) |
典型使用场景 | 短信发送通知、订单状态变更(固定频道) | 按用户 / 订单拆分的动态通知、临时调试消息 |
4. 消息订阅者
@Component
public class MessageSubscriber extends MessageListenerAdapter {private static final Logger log = LoggerFactory.getLogger(MessageSubscriber.class);/*** 处理接收到的消息*/@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(message.getChannel());String body = new String(message.getBody());log.info("收到消息 - 频道: {}, 内容: {}", channel, body);// 根据不同的频道进行不同的处理handleMessage(channel, body);}/*** 消息处理逻辑*/private void handleMessage(String channel, String message) {switch (channel) {case "sms:user:accept:query":processSmsMessage(message);break;case "order:create:notice":processOrderMessage(message);break;default:log.warn("未知频道: {}", channel);}}private void processSmsMessage(String userId) {log.info("处理用户 {} 的短信消息", userId);// 具体的业务逻辑}private void processOrderMessage(String orderId) {log.info("处理订单 {} 的创建消息", orderId);// 具体的业务逻辑}
}
5. 消息监听容器配置
@Configuration
public class RedisPubSubConfig {@Autowiredprivate RedisConnectionFactory redisConnectionFactory;@Autowiredprivate MessageSubscriber messageSubscriber;@Beanpublic RedisMessageListenerContainer redisContainer() {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(redisConnectionFactory);// 添加频道订阅container.addMessageListener(messageSubscriber, Arrays.asList(new ChannelTopic("sms:user:accept:query"),new ChannelTopic("order:create:notice"),new ChannelTopic("user:status:update")));// 添加模式订阅container.addMessageListener(messageSubscriber, new PatternTopic("logs:*"));return container;}
}
高级特性
1. 模式匹配订阅
// 订阅所有以 sms: 开头的频道
PSUBSCRIBE sms:*// 订阅所有以 :notice 结尾的频道
PSUBSCRIBE *:notice// 订阅符合特定模式的频道
PSUBSCRIBE user:?:status
2. 消息格式设计
建议使用 JSON 格式的消息体:
{"eventType": "USER_SMS_RECEIVED","timestamp": 1621234567890,"data": {"userId": "12345","deviceId": "device_001","messageContent": "Hello World"},"version": "1.0"
}
3. 错误处理和重试机制
@Component
public class ReliableMessageSubscriber {@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 1000))public void handleMessageWithRetry(String message) {try {processMessage(message);} catch (Exception e) {log.error("消息处理失败,将进行重试", e);throw e; // 触发重试}}@Recoverpublic void recover(Exception e, String message) {log.error("消息处理最终失败,存入死信队列: {}", message, e);// 将消息存入死信队列deadLetterQueue.add(message);}
}
实战应用场景
1. 实时消息通知系统
// 发布消息
public void notifyUser(String userId, String message) {String channel = "user:notify:" + userId;redisTemplate.convertAndSend(channel, Map.of("type", "notification", "content", message, "time", System.currentTimeMillis()));
}// 订阅消息
@RedisListener(topic = "user:notify:*")
public void onUserNotification(String channel, Map<String, Object> message) {String userId = channel.substring("user:notify:".length());// 推送消息到用户界面webSocketService.sendToUser(userId, message);
}
2. 分布式系统事件总线
// 系统事件发布
public void publishEvent(SystemEvent event) {redisTemplate.convertAndSend("system:events", Map.of("eventType", event.getType(), "payload", event.getData()));
}// 事件处理
@RedisListener(topic = "system:events")
public void handleSystemEvent(Map<String, Object> event) {String eventType = (String) event.get("eventType");Object payload = event.get("payload");switch (eventType) {case "USER_CREATED":handleUserCreated(payload);break;case "ORDER_PAID":handleOrderPaid(payload);break;// 其他事件处理...}
}
3. 实时数据同步
// 数据变更时发布消息
@Transactional
public void updateUserProfile(User user) {userRepository.save(user);// 发布数据变更消息redisTemplate.convertAndSend("data:sync:user", Map.of("id", user.getId(), "action", "UPDATE", "timestamp", System.currentTimeMillis()));
}// 其他服务监听数据变更
@RedisListener(topic = "data:sync:user")
public void onUserDataChange(Map<String, Object> change) {// 更新本地缓存或索引cacheService.refreshUserCache((Long) change.get("id"));
}
2. 消息压缩
对于大消息,可以先压缩再发送:
public void publishCompressed(String channel, Object message) {String json = objectMapper.writeValueAsString(message);byte[] compressed = compress(json.getBytes());redisTemplate.convertAndSend(channel, compressed);
}
3. 批量处理
@Component
public class BatchMessageProcessor {private final List<String> messageBuffer = new ArrayList<>();private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();@PostConstructpublic void init() {scheduler.scheduleAtFixedRate(this::processBatch, 100, 100, TimeUnit.MILLISECONDS);}@RedisListener(topic = "high:frequency:channel")public void onMessage(String message) {synchronized (messageBuffer) {messageBuffer.add(message);}}private void processBatch() {List<String> batch;synchronized (messageBuffer) {batch = new ArrayList<>(messageBuffer);messageBuffer.clear();}if (!batch.isEmpty()) {businessService.processBatch(batch);}}
}
监控和运维
1. 监控指标
@Component
public class PubSubMonitor {@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Scheduled(fixedRate = 60000) // 每分钟监控一次public void monitorPubSub() {// 监控频道数量Set<String> channels = redisTemplate.execute((RedisCallback<Set<String>>) connection -> connection.pubSubChannels("*".getBytes()).stream().map(bytes -> new String(bytes)).collect(Collectors.toSet()));// 监控消息吞吐量Map<String, Long> messageStats = new HashMap<>();for (String channel : channels) {Long subscribers = redisTemplate.execute((RedisCallback<Long>) connection -> connection.pubSubNumSub(channel.getBytes()).get(channel.getBytes()));messageStats.put(channel, subscribers);}log.info("PubSub 监控 - 频道数: {}, 订阅统计: {}", channels.size(), messageStats);}
}
2. 异常处理
@Configuration
public class RedisErrorConfig {@Beanpublic RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(factory);container.setErrorHandler(new RedisPubSubErrorHandler());return container;}public class RedisPubSubErrorHandler implements ErrorHandler {@Overridepublic void handleError(Throwable t) {log.error("Redis Pub/Sub 错误", t);// 发送告警通知alertService.sendAlert("Redis Pub/Sub 异常", t.getMessage());}}
}
总结
Redis 发布订阅模式提供了强大的实时消息通信能力,具有以下优势:
高性能:基于内存操作,吞吐量高
实时性:消息即时推送,延迟低
解耦性:生产者和消费者完全解耦
灵活性:支持频道和模式订阅
扩展性:易于水平扩展
但在使用时也需要注意:
消息不持久化,重启会丢失
没有消息确认机制
不适合需要严格顺序的场景
通过合理的架构设计和代码实现,Redis Pub/Sub 可以成为构建实时应用系统的强大工具。