引入 Kafka 消息队列解耦热点操作
✨ 在在线视频平台中引入 Kafka 消息队列解耦热点操作的实践
📌 背景简介
在我们构建 NovaTube 在线视频分享平台过程中,点赞、评论、弹幕、播放记录等用户操作频繁,尤其在用户量上升后,出现了以下问题:
- 点赞请求峰值达到上千 QPS,数据库写入压力骤增
- 弹幕写入请求频繁,偶发出现主从同步延迟
- 播放记录写入直接影响视频播放体验
- 用户评论操作影响接口整体响应时间
这些操作共同的特点是:
- 写操作频繁
- 实时性要求不高
- 可异步处理
于是,我们决定引入 Kafka 消息队列对这些热点写操作进行解耦和异步化处理,优化系统性能、增强可扩展性与容错能力。
🎯 技术目标
- 降低数据库压力,防止写操作击穿
- 提高接口响应速度,提升用户体验
- 实现异步解耦、模块职责清晰
- 为后续功能扩展预留处理链路(如点赞反作弊、弹幕审核)
🏗️ 系统架构设计
以下是 NovaTube 在点赞场景下的 Kafka 解耦架构图:
[Client]↓ 点赞请求
[网关层] + [Sentinel限流]↓
[主站服务 Controller]↓
KafkaTemplate.send() 发送到 topic: like-events↓
[Kafka Broker]↓
[点赞消费者服务]↓
MySQL/Redis 数据持久化 + 缓存更新
说明:
- 点赞请求不再直接写库,而是快速封装为事件消息,发送至 Kafka
- 消费者服务异步消费,集中处理点赞写入逻辑
- 可通过 Redis 实时展示点赞数,定时同步至数据库
🔧 实现过程详解
1. 点赞事件模型定义
@Data
public class LikeEvent {private Long userId;private String videoId;private Long timestamp;
}
2. 生产者:发送点赞消息
@RestController
public class LikeController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/like")public ResponseEntity<?> like(@RequestParam String videoId, HttpServletRequest request) {Long userId = getUserIdFromRequest(request);LikeEvent event = new LikeEvent(userId, videoId, System.currentTimeMillis());kafkaTemplate.send("like-events", JSON.toJSONString(event));return ResponseEntity.ok("点赞成功");}
}
特点:
- 非阻塞、快速响应
- 即使 Kafka 临时阻塞也可降级处理(本地缓存/失败重试)
3. 消费者:异步写入数据库
@Component
public class LikeConsumer {@Autowiredprivate LikeService likeService;@KafkaListener(topics = "like-events", groupId = "nova-like-group")public void consumeLikeEvent(String message) {LikeEvent event = JSON.parseObject(message, LikeEvent.class);likeService.saveUserLike(event.getUserId(), event.getVideoId());}
}
4. 点赞写入逻辑
@Service
public class LikeService {@Autowiredprivate LikeRepository likeRepository;public void saveUserLike(Long userId, String videoId) {if (!likeRepository.existsByUserIdAndVideoId(userId, videoId)) {likeRepository.save(new UserLike(userId, videoId));}}
}
可进一步优化:
- 增加幂等校验
- Redis缓存点赞状态
- 批量消费与批量入库
💡 异步解耦的优势
优势 | 说明 |
---|---|
⚡ 提升响应速度 | 请求快速返回,主链路无阻塞 |
🔧 解耦架构 | 点赞、弹幕、评论等模块独立演化 |
🔁 削峰填谷 | 高并发场景下缓冲请求,防止写库瓶颈 |
🔐 容错增强 | Kafka 持久化保障消息不丢失,可重试 |
📈 可扩展性好 | 易于接入日志分析、反作弊、通知等模块 |
🧠 可扩展的场景
除了点赞,我们还将 Kafka 运用到以下场景中:
功能模块 | Kafka Topic | 说明 |
---|---|---|
弹幕发送 | danmu-events | 异步写入 + 敏感词审核 |
评论发布 | comment-events | 异步发布 + 通知推送 |
播放记录 | play-record-events | 异步记录观看历史,减少实时写库压力 |
📊 Kafka 运维与监控建议
- 使用 Kafka Manager / Confluent Control Center 管理 Topic、分区、副本
- 接入 Prometheus + Grafana 实时监控消费速率、堆积量、延迟等指标
- 设置 消费者组分区均衡策略,保证高可用
- 引入 死信队列(DLQ) 处理消费异常数据
🔚 总结
引入 Kafka 解耦点赞等热点操作,是 NovaTube 性能优化的重要一环。
它实现了:
- 架构上的解耦与职责划分
- 用户体验上的提升
- 系统性能上的增强
- 业务扩展上的灵活性
在现代高并发系统中,这是极具实用价值的一种架构模式,适用于所有频繁、可延迟、可容错的非核心路径操作。
🧾 附:简历中的一句话
基于 Kafka 构建点赞、评论、弹幕等热点操作的异步处理链路,实现系统解耦与削峰填谷,提升响应性能与高并发下的稳定性。
🚀 Kafka 在视频平台的多场景应用实践:弹幕、评论与播放记录异步解耦
作者:NovaTube 后端开发者
时间:2025 年 6 月
标签:Kafka、异步架构、弹幕、评论、播放记录、解耦设计
🧭 引言
在 NovaTube 视频平台中,我们不仅为视频提供播放服务,还支持用户实时发送弹幕、评论互动、记录观看历史。这些功能虽然核心链路简单,但在高并发场景下却会对系统产生明显负载:
- 弹幕发送:高并发、高实时性、海量数据涌入
- 评论发布:有写库操作、需要审核、还可能触发通知等
- 播放记录:请求频繁、通常伴随每次播放/跳转、写入延迟可接受
为了应对这些压力,我们选择将这三类操作使用 Kafka 进行异步解耦处理,目标是:主流程快响应、后台异步处理、系统更加稳定与可扩展。
📦 核心设计原则
场景 | 是否频繁 | 实时性要求 | 可异步化 | 备注 |
---|---|---|---|---|
弹幕发送 | 非常频繁 | 中等 | ✅ | 需要实时展示但写入可延迟 |
评论发布 | 频繁 | 一般 | ✅ | 可异步入库+后台审核+通知 |
播放记录 | 高频 | 低 | ✅ | 无需强一致性,延迟可接受 |
✅ 这些操作天然适合用 Kafka 异步处理,具有良好的“削峰填谷”特性。
🎯 架构概览图
┌────────────┐│ 客户端 │└────┬───────┘│┌──────────▼──────────┐│ SpringBoot 控制层 │└──────────┬──────────┘▼┌─────────────────────┐│ KafkaTemplate 发送 │└─────────────────────┘▼┌────────────┐│ Kafka Broker│└────┬───────┘▼┌──────────────────────────────┐│ 弹幕/评论/播放记录消费者服务 │└────┬──────────────┬──────────┘▼ ▼持久化入库 审核/通知等
1️⃣ 弹幕发送异步化处理
💡 背景:
- 用户每秒可发起大量弹幕请求
- 若每次弹幕都同步写库,可能对主库造成写入压力
- 弹幕在页面实时展示,但持久化延迟可接受
🔧 实现步骤
Controller 层快速响应
@PostMapping("/danmu/send")
public ResponseEntity<?> sendDanmu(@RequestBody DanmuRequest req) {DanmuEvent event = new DanmuEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send("danmu-events", JSON.toJSONString(event));return ResponseEntity.ok("发送成功");
}
消费者异步写入 + 敏感词处理
@KafkaListener(topics = "danmu-events", groupId = "danmu-consumers")
public void consumeDanmu(String msg) {DanmuEvent event = JSON.parseObject(msg, DanmuEvent.class);if (!containsSensitiveWord(event.getContent())) {danmuRepository.save(event.toEntity());} else {log.warn("屏蔽敏感弹幕:{}", event);}
}
2️⃣ 评论发布异步处理链路
💡 背景:
- 用户发布评论时,既要写数据库,还可能通知作者、审核内容
- 多个任务耦合在主流程,影响响应速度
🔧 技术设计
请求快速入队
@PostMapping("/comment/publish")
public ResponseEntity<?> publish(@RequestBody CommentRequest req) {CommentEvent event = new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send("comment-events", JSON.toJSONString(event));return ResponseEntity.ok("评论成功");
}
消费者链路拆分:
@KafkaListener(topics = "comment-events", groupId = "comment-group")
public void consumeComment(String msg) {CommentEvent event = JSON.parseObject(msg, CommentEvent.class);commentRepository.save(event.toEntity()); // 持久化auditService.asyncAudit(event); // 审核notifyService.notifyAuthor(event); // 通知作者
}
🧠 好处:写入、审核、通知等子任务可独立扩展,互不影响。
3️⃣ 播放记录异步收集
💡 背景:
- 用户每播放/拖动一次,客户端都会上报播放信息
- 高并发环境下,若同步写库会带来极大压力
- 播放记录用于推荐和用户历史回顾,非强实时
🔧 方案设计
@PostMapping("/video/record")
public ResponseEntity<?> recordPlay(@RequestBody PlayLogRequest req) {PlayRecordEvent event = new PlayRecordEvent(req.getUserId(), req.getVideoId(), req.getTimestamp());kafkaTemplate.send("play-record-events", JSON.toJSONString(event));return ResponseEntity.ok().build();
}
异步消费者持久化或写入 Elasticsearch
@KafkaListener(topics = "play-record-events", groupId = "record-group")
public void consumePlayRecord(String msg) {PlayRecordEvent event = JSON.parseObject(msg, PlayRecordEvent.class);playRecordService.saveRecord(event);
}
可根据需要定向入库至 MySQL、Elasticsearch、ClickHouse 等。
✅ 技术优势总结
优势 | 说明 |
---|---|
⚡ 响应更快 | 控制层快速返回,异步处理耗时任务 |
💔 系统解耦 | 各模块职责清晰,便于扩展、测试、部署 |
📉 削峰填谷 | 高并发写操作统一缓冲,防止系统过载 |
🧩 易于链路拓展 | 后续可接入 AI 审核、内容推荐、数据分析等服务 |
🔁 提高容错性 | 消息存储于 Kafka,失败可重试 |
📘 简历中的一句话描述
利用 Kafka 构建弹幕发送、评论发布、播放记录等高频操作的异步处理链路,实现写操作解耦、响应提速与高并发场景下系统稳定性提升。
🧩 可选优化方向
- 支持批量消费入库
- Kafka Topic 按用户/视频维度分区,提升吞吐
- 接入死信队列(DLQ)容错异常事件
- 消息格式标准化 + schema 管理(如 Avro)
📌 总结
本篇博客分享了如何利用 Kafka 在一个中大型视频平台中实现多类高频操作的异步解耦,帮助系统在高并发下保持高可用与高性能。这一架构思想同样适用于电商(下单、支付)、社交(私信、点赞)、游戏(日志、成就)等场景。
非常好,以下是关于 Kafka 在 Spring Boot 项目中的结构位置、如何调用和使用 的详细讲解,包括 组件职责划分、调用流程、配置方法以及关键代码解析。本讲解既适合入门者了解项目结构,也适合用于博客/面试准备。
📦 Kafka 在 Spring Boot 项目中的结构与使用详解(含代码)
一、Kafka 在项目中的整体结构位置
在一个典型的 Spring Boot 项目中,Kafka 主要扮演 消息解耦中间件 的角色,负责“接收请求 -> 异步发送消息 -> 消费处理任务”的链路。结构如下:
📂 src└── 📂 main├── 📂 controller // 控制层:接收请求,调用KafkaProducer├── 📂 service // 业务层:处理逻辑或封装消费者回调├── 📂 kafka│ ├── KafkaProducer // 封装 KafkaTemplate 发送逻辑│ ├── KafkaConsumer // 监听 Kafka 消费消息│ └── KafkaConfig // Kafka 连接配置├── 📂 dto // 数据传输对象(Event/Request)└── 📂 repository // 数据持久化
二、调用流程图(以评论发布为例)
用户请求↓
@Controller -> /comment/publish↓
KafkaProducer.send("comment-events", msg)↓
Kafka Broker 消息队列↓
@KafkaListener(topic="comment-events") -> KafkaConsumer↓
处理逻辑(如持久化、通知等)
三、Kafka 的使用方式 & 关键代码
1️⃣ Kafka 配置(KafkaConfig.java)
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(config);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
application.yml 配置
spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-consumer-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2️⃣ Kafka 生产者封装(KafkaProducer.java)
@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void send(String topic, String message) {kafkaTemplate.send(topic, message);}
}
3️⃣ 控制器调用 Kafka(CommentController.java)
@RestController
@RequestMapping("/comment")
public class CommentController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/publish")public ResponseEntity<?> publishComment(@RequestBody CommentRequest req) {CommentEvent event = new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());String json = JSON.toJSONString(event);kafkaProducer.send("comment-events", json); // 👈 调用 Kafkareturn ResponseEntity.ok("评论成功,正在后台处理");}
}
4️⃣ 消费者监听处理(KafkaCommentConsumer.java)
@Component
public class KafkaCommentConsumer {@Autowiredprivate CommentService commentService;@KafkaListener(topics = "comment-events", groupId = "comment-group")public void consumeComment(String message) {CommentEvent event = JSON.parseObject(message, CommentEvent.class);commentService.processComment(event); // 处理评论逻辑(写库、审核、通知)}
}
5️⃣ 业务服务层(CommentService.java)
@Service
public class CommentService {@Autowiredprivate CommentRepository commentRepository;public void processComment(CommentEvent event) {Comment comment = new Comment();comment.setUserId(event.getUserId());comment.setVideoId(event.getVideoId());comment.setContent(event.getContent());comment.setCreateTime(LocalDateTime.now());commentRepository.save(comment);}
}
四、完整流程回顾(图解)
POST /comment/publish↓
[Controller] 收到请求↓
KafkaProducer.send("comment-events", json)↓
Kafka 中转存储(解耦主流程)↓
KafkaConsumer 监听到消息↓
调用 CommentService.processComment()↓
MySQL 写入评论表
五、总结:Kafka 在 Spring Boot 中的定位
模块 | 作用 |
---|---|
KafkaTemplate | 生产者 API,负责消息发送 |
@KafkaListener | 消费者注解,监听 Kafka Topic |
KafkaConfig | 统一配置 Kafka 客户端参数 |
Producer 封装类 | 隔离业务与发送逻辑 |
Consumer 封装类 | 解耦消费逻辑,便于扩展与重试 |
DTO/Event | 标准化传输格式(如 JSON) |
🧠 面试/简历描述模板
在 Spring Boot 项目中集成 Kafka,用于实现评论、弹幕、播放记录等模块的异步消息处理。封装 Kafka 生产者与消费者逻辑,通过
KafkaTemplate
发送消息、@KafkaListener
异步消费,解耦主流程、提升系统吞吐能力。
Kafka Broker 是什么、消息队列是怎么体现的, Kafka 调用的整体逻辑流程
🧩 一、Kafka Broker 是什么?
Kafka Broker 是 Kafka 的核心组件之一,它是 Kafka 服务端的实例,负责:
- 接收消息(Producer 发来)
- 存储消息(分布式日志持久化)
- 投递消息给消费者(Consumer)
一个 Kafka 集群由多个 Broker 组成,每个 Broker 负责维护一部分数据(即部分 Topic 的分区)。
👉 可以简单理解:一个 Broker 就是一个 Kafka 节点,它是 Kafka 消息的存储与转发中心。
例如:
Producer -----> Kafka Broker(1) ----> Consumer↑存储的是某个 Topic 的某个分区
📦 二、Kafka 的“消息队列”是如何体现的?
Kafka 并不是传统意义上“链表结构”的消息队列,而是基于 “Topic + Partition + Offset” 组成的 持久化分布式消息队列。
关键概念:
概念 | 含义 |
---|---|
Topic | 类似一个消息主题,比如 "comment-events" |
Partition | 每个 Topic 可以被划分成多个分区,每个分区是一个有序队列 |
Offset | 消息在分区中的位置编号(偏移量) |
🔁 队列特性的体现方式:
- Kafka 的每个 分区本质上就是一个追加写的消息队列,消费时是按照 offset 顺序读取的。
- 每个消费者可以通过 offset 控制从哪个位置消费。
- 消息存储在磁盘上,可以 重复消费,具备 高可用、持久化、可回溯 的能力。
🔄 三、Kafka 的完整调用流程是怎样的?
以“用户发表评论”场景为例,Kafka 在系统中经历以下流程:
1️⃣ Producer 发送消息
kafkaTemplate.send("comment-events", messageJson);
- 将业务消息(如评论)发送给 Kafka 的 某个 Topic。
- Kafka 客户端选择 Topic 的某个 Partition,将消息写入。
2️⃣ Kafka Broker 接收并存储
- Kafka Broker 收到消息,写入对应的 Topic 分区文件中(顺序追加)。
- 数据以高效二进制日志文件形式存储(支持高并发写入)。
3️⃣ Consumer 拉取消息
@KafkaListener(topics = "comment-events")
public void consume(String message) { ... }
- 消费者通过轮询方式,从指定 Topic 的分区中拉取消息。
- Kafka 保证每个消费者组可以独立消费,互不影响。
- 消费者提交 offset,Kafka 不会自动删除消息(支持回放)。
🧠 图解整个调用过程
[业务代码]↓ 调用 KafkaProducer.send()
Producer─────────────→ Kafka Broker↑分区日志队列(消息队列本质)↓
@KafkaListener 拉取消息
Consumer↓
持久化 or 业务处理
🧠 总结:通俗理解 Kafka 的机制
概念 | 解释 |
---|---|
Kafka Broker | Kafka 的核心服务节点,负责接收/存储/分发消息 |
消息队列特性 | Topic → Partition → Offset 结构形成有序、可回溯、高并发的消息通道 |
调用流程 | 发送(Producer)→ 存储(Broker)→ 消费(Consumer) |
技术优势 | 高性能、可持久化、异步解耦、支持海量并发与分布式部署 |
✅ Kafka Broker 在代码中的体现位置
虽然 Kafka Broker 是运行在服务器端的后台服务,不是直接在代码中“写出来”的一个类或对象,但在代码中配置连接 Kafka Broker 的地址,就等于是告诉应用程序:
“Kafka Broker 在这里,所有消息收发请连接这个地址。”
🧩 所以,Broker 在代码中的体现,主要是它的地址体现在配置文件里。
🧾 1. 在 application.yml
或 application.properties
中体现 Broker 地址
spring:kafka:bootstrap-servers: localhost:9092
这句配置就告诉了 Kafka 客户端(Producer、Consumer):
Kafka Broker 正在
localhost:9092
上运行。
所有消息都要发送/接收给这个地址的 Kafka 服务。
如果是连接一个集群,可能会是:
spring:kafka:bootstrap-servers: broker1:9092,broker2:9092,broker3:9092
💡 2. 它在 KafkaTemplate
和 @KafkaListener
背后被使用
当用如下代码发送消息:
kafkaTemplate.send("comment-events", "your message");
或者监听消费:
@KafkaListener(topics = "comment-events")
public void consume(String msg) {...
}
这些 Kafka 客户端组件背后,Spring Boot 会使用在配置文件中定义的 bootstrap-servers
,自动构建连接到 Kafka Broker 的连接客户端。
🔍 示例代码对应关系总结
Kafka 功能 | 代码体现 | 实际作用 |
---|---|---|
Broker 地址配置 | bootstrap-servers: localhost:9092 | Kafka 客户端通过这个地址连接 Broker |
Producer 发送消息 | kafkaTemplate.send(...) | 使用配置好的 Kafka 客户端发送消息到 Broker |
Consumer 消费消息 | @KafkaListener(...) | Spring Boot 通过配置的地址监听 Kafka Broker 的消息 |
🧠 总结一句话
Kafka Broker 本身不在业务代码中显式出现,它通过配置文件中的 bootstrap-servers
地址体现,Producer 和 Consumer 都基于这个地址与 Kafka Broker 建立连接、收发消息。