当前位置: 首页 > news >正文

引入 Kafka 消息队列解耦热点操作

在在线视频平台中引入 Kafka 消息队列解耦热点操作的实践

📌 背景简介

在我们构建 NovaTube 在线视频分享平台过程中,点赞、评论、弹幕、播放记录等用户操作频繁,尤其在用户量上升后,出现了以下问题:

  • 点赞请求峰值达到上千 QPS,数据库写入压力骤增
  • 弹幕写入请求频繁,偶发出现主从同步延迟
  • 播放记录写入直接影响视频播放体验
  • 用户评论操作影响接口整体响应时间

这些操作共同的特点是:

  • 写操作频繁
  • 实时性要求不高
  • 可异步处理

于是,我们决定引入 Kafka 消息队列对这些热点写操作进行解耦和异步化处理,优化系统性能、增强可扩展性与容错能力。


🎯 技术目标

  • 降低数据库压力,防止写操作击穿
  • 提高接口响应速度,提升用户体验
  • 实现异步解耦、模块职责清晰
  • 为后续功能扩展预留处理链路(如点赞反作弊、弹幕审核)

🏗️ 系统架构设计

以下是 NovaTube 在点赞场景下的 Kafka 解耦架构图:

[Client]↓ 点赞请求
[网关层] + [Sentinel限流]↓
[主站服务 Controller]↓
KafkaTemplate.send() 发送到 topic: like-events↓
[Kafka Broker]↓
[点赞消费者服务]↓
MySQL/Redis 数据持久化 + 缓存更新

说明:

  • 点赞请求不再直接写库,而是快速封装为事件消息,发送至 Kafka
  • 消费者服务异步消费,集中处理点赞写入逻辑
  • 可通过 Redis 实时展示点赞数,定时同步至数据库

🔧 实现过程详解

1. 点赞事件模型定义

@Data
public class LikeEvent {private Long userId;private String videoId;private Long timestamp;
}

2. 生产者:发送点赞消息

@RestController
public class LikeController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/like")public ResponseEntity<?> like(@RequestParam String videoId, HttpServletRequest request) {Long userId = getUserIdFromRequest(request);LikeEvent event = new LikeEvent(userId, videoId, System.currentTimeMillis());kafkaTemplate.send("like-events", JSON.toJSONString(event));return ResponseEntity.ok("点赞成功");}
}

特点:

  • 非阻塞、快速响应
  • 即使 Kafka 临时阻塞也可降级处理(本地缓存/失败重试)

3. 消费者:异步写入数据库

@Component
public class LikeConsumer {@Autowiredprivate LikeService likeService;@KafkaListener(topics = "like-events", groupId = "nova-like-group")public void consumeLikeEvent(String message) {LikeEvent event = JSON.parseObject(message, LikeEvent.class);likeService.saveUserLike(event.getUserId(), event.getVideoId());}
}

4. 点赞写入逻辑

@Service
public class LikeService {@Autowiredprivate LikeRepository likeRepository;public void saveUserLike(Long userId, String videoId) {if (!likeRepository.existsByUserIdAndVideoId(userId, videoId)) {likeRepository.save(new UserLike(userId, videoId));}}
}

可进一步优化:

  • 增加幂等校验
  • Redis缓存点赞状态
  • 批量消费与批量入库

💡 异步解耦的优势

优势说明
⚡ 提升响应速度请求快速返回,主链路无阻塞
🔧 解耦架构点赞、弹幕、评论等模块独立演化
🔁 削峰填谷高并发场景下缓冲请求,防止写库瓶颈
🔐 容错增强Kafka 持久化保障消息不丢失,可重试
📈 可扩展性好易于接入日志分析、反作弊、通知等模块

🧠 可扩展的场景

除了点赞,我们还将 Kafka 运用到以下场景中:

功能模块Kafka Topic说明
弹幕发送danmu-events异步写入 + 敏感词审核
评论发布comment-events异步发布 + 通知推送
播放记录play-record-events异步记录观看历史,减少实时写库压力

📊 Kafka 运维与监控建议

  • 使用 Kafka Manager / Confluent Control Center 管理 Topic、分区、副本
  • 接入 Prometheus + Grafana 实时监控消费速率、堆积量、延迟等指标
  • 设置 消费者组分区均衡策略,保证高可用
  • 引入 死信队列(DLQ) 处理消费异常数据

🔚 总结

引入 Kafka 解耦点赞等热点操作,是 NovaTube 性能优化的重要一环。

它实现了:

  • 架构上的解耦与职责划分
  • 用户体验上的提升
  • 系统性能上的增强
  • 业务扩展上的灵活性

在现代高并发系统中,这是极具实用价值的一种架构模式,适用于所有频繁、可延迟、可容错的非核心路径操作。


🧾 附:简历中的一句话

基于 Kafka 构建点赞、评论、弹幕等热点操作的异步处理链路,实现系统解耦与削峰填谷,提升响应性能与高并发下的稳定性。


🚀 Kafka 在视频平台的多场景应用实践:弹幕、评论与播放记录异步解耦

作者:NovaTube 后端开发者
时间:2025 年 6 月
标签:Kafka、异步架构、弹幕、评论、播放记录、解耦设计


🧭 引言

在 NovaTube 视频平台中,我们不仅为视频提供播放服务,还支持用户实时发送弹幕、评论互动、记录观看历史。这些功能虽然核心链路简单,但在高并发场景下却会对系统产生明显负载:

  • 弹幕发送:高并发、高实时性、海量数据涌入
  • 评论发布:有写库操作、需要审核、还可能触发通知等
  • 播放记录:请求频繁、通常伴随每次播放/跳转、写入延迟可接受

为了应对这些压力,我们选择将这三类操作使用 Kafka 进行异步解耦处理,目标是:主流程快响应、后台异步处理、系统更加稳定与可扩展


📦 核心设计原则

场景是否频繁实时性要求可异步化备注
弹幕发送非常频繁中等需要实时展示但写入可延迟
评论发布频繁一般可异步入库+后台审核+通知
播放记录高频无需强一致性,延迟可接受

✅ 这些操作天然适合用 Kafka 异步处理,具有良好的“削峰填谷”特性。


🎯 架构概览图

               ┌────────────┐│   客户端   │└────┬───────┘│┌──────────▼──────────┐│ SpringBoot 控制层   │└──────────┬──────────┘▼┌─────────────────────┐│ KafkaTemplate 发送   │└─────────────────────┘▼┌────────────┐│ Kafka Broker│└────┬───────┘▼┌──────────────────────────────┐│ 弹幕/评论/播放记录消费者服务 │└────┬──────────────┬──────────┘▼              ▼持久化入库       审核/通知等

1️⃣ 弹幕发送异步化处理

💡 背景:

  • 用户每秒可发起大量弹幕请求
  • 若每次弹幕都同步写库,可能对主库造成写入压力
  • 弹幕在页面实时展示,但持久化延迟可接受

🔧 实现步骤

Controller 层快速响应
@PostMapping("/danmu/send")
public ResponseEntity<?> sendDanmu(@RequestBody DanmuRequest req) {DanmuEvent event = new DanmuEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send("danmu-events", JSON.toJSONString(event));return ResponseEntity.ok("发送成功");
}
消费者异步写入 + 敏感词处理
@KafkaListener(topics = "danmu-events", groupId = "danmu-consumers")
public void consumeDanmu(String msg) {DanmuEvent event = JSON.parseObject(msg, DanmuEvent.class);if (!containsSensitiveWord(event.getContent())) {danmuRepository.save(event.toEntity());} else {log.warn("屏蔽敏感弹幕:{}", event);}
}

2️⃣ 评论发布异步处理链路

💡 背景:

  • 用户发布评论时,既要写数据库,还可能通知作者、审核内容
  • 多个任务耦合在主流程,影响响应速度

🔧 技术设计

请求快速入队
@PostMapping("/comment/publish")
public ResponseEntity<?> publish(@RequestBody CommentRequest req) {CommentEvent event = new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());kafkaTemplate.send("comment-events", JSON.toJSONString(event));return ResponseEntity.ok("评论成功");
}
消费者链路拆分:
@KafkaListener(topics = "comment-events", groupId = "comment-group")
public void consumeComment(String msg) {CommentEvent event = JSON.parseObject(msg, CommentEvent.class);commentRepository.save(event.toEntity());  // 持久化auditService.asyncAudit(event);            // 审核notifyService.notifyAuthor(event);         // 通知作者
}

🧠 好处:写入、审核、通知等子任务可独立扩展,互不影响。


3️⃣ 播放记录异步收集

💡 背景:

  • 用户每播放/拖动一次,客户端都会上报播放信息
  • 高并发环境下,若同步写库会带来极大压力
  • 播放记录用于推荐和用户历史回顾,非强实时

🔧 方案设计

@PostMapping("/video/record")
public ResponseEntity<?> recordPlay(@RequestBody PlayLogRequest req) {PlayRecordEvent event = new PlayRecordEvent(req.getUserId(), req.getVideoId(), req.getTimestamp());kafkaTemplate.send("play-record-events", JSON.toJSONString(event));return ResponseEntity.ok().build();
}
异步消费者持久化或写入 Elasticsearch
@KafkaListener(topics = "play-record-events", groupId = "record-group")
public void consumePlayRecord(String msg) {PlayRecordEvent event = JSON.parseObject(msg, PlayRecordEvent.class);playRecordService.saveRecord(event);
}

可根据需要定向入库至 MySQL、Elasticsearch、ClickHouse 等。


✅ 技术优势总结

优势说明
⚡ 响应更快控制层快速返回,异步处理耗时任务
💔 系统解耦各模块职责清晰,便于扩展、测试、部署
📉 削峰填谷高并发写操作统一缓冲,防止系统过载
🧩 易于链路拓展后续可接入 AI 审核、内容推荐、数据分析等服务
🔁 提高容错性消息存储于 Kafka,失败可重试

📘 简历中的一句话描述

利用 Kafka 构建弹幕发送、评论发布、播放记录等高频操作的异步处理链路,实现写操作解耦、响应提速与高并发场景下系统稳定性提升。


🧩 可选优化方向

  • 支持批量消费入库
  • Kafka Topic 按用户/视频维度分区,提升吞吐
  • 接入死信队列(DLQ)容错异常事件
  • 消息格式标准化 + schema 管理(如 Avro)

📌 总结

本篇博客分享了如何利用 Kafka 在一个中大型视频平台中实现多类高频操作的异步解耦,帮助系统在高并发下保持高可用与高性能。这一架构思想同样适用于电商(下单、支付)、社交(私信、点赞)、游戏(日志、成就)等场景。


非常好,以下是关于 Kafka 在 Spring Boot 项目中的结构位置、如何调用和使用 的详细讲解,包括 组件职责划分、调用流程、配置方法以及关键代码解析。本讲解既适合入门者了解项目结构,也适合用于博客/面试准备。


📦 Kafka 在 Spring Boot 项目中的结构与使用详解(含代码)

一、Kafka 在项目中的整体结构位置

在一个典型的 Spring Boot 项目中,Kafka 主要扮演 消息解耦中间件 的角色,负责“接收请求 -> 异步发送消息 -> 消费处理任务”的链路。结构如下:

📂 src└── 📂 main├── 📂 controller       // 控制层:接收请求,调用KafkaProducer├── 📂 service          // 业务层:处理逻辑或封装消费者回调├── 📂 kafka│    ├── KafkaProducer  // 封装 KafkaTemplate 发送逻辑│    ├── KafkaConsumer  // 监听 Kafka 消费消息│    └── KafkaConfig    // Kafka 连接配置├── 📂 dto              // 数据传输对象(Event/Request)└── 📂 repository       // 数据持久化

二、调用流程图(以评论发布为例)

用户请求↓
@Controller -> /comment/publish↓
KafkaProducer.send("comment-events", msg)↓
Kafka Broker 消息队列↓
@KafkaListener(topic="comment-events") -> KafkaConsumer↓
处理逻辑(如持久化、通知等)

三、Kafka 的使用方式 & 关键代码

1️⃣ Kafka 配置(KafkaConfig.java)

@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(config);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

application.yml 配置

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-consumer-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2️⃣ Kafka 生产者封装(KafkaProducer.java)

@Component
public class KafkaProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void send(String topic, String message) {kafkaTemplate.send(topic, message);}
}

3️⃣ 控制器调用 Kafka(CommentController.java)

@RestController
@RequestMapping("/comment")
public class CommentController {@Autowiredprivate KafkaProducer kafkaProducer;@PostMapping("/publish")public ResponseEntity<?> publishComment(@RequestBody CommentRequest req) {CommentEvent event = new CommentEvent(req.getUserId(), req.getVideoId(), req.getContent());String json = JSON.toJSONString(event);kafkaProducer.send("comment-events", json); // 👈 调用 Kafkareturn ResponseEntity.ok("评论成功,正在后台处理");}
}

4️⃣ 消费者监听处理(KafkaCommentConsumer.java)

@Component
public class KafkaCommentConsumer {@Autowiredprivate CommentService commentService;@KafkaListener(topics = "comment-events", groupId = "comment-group")public void consumeComment(String message) {CommentEvent event = JSON.parseObject(message, CommentEvent.class);commentService.processComment(event); // 处理评论逻辑(写库、审核、通知)}
}

5️⃣ 业务服务层(CommentService.java)

@Service
public class CommentService {@Autowiredprivate CommentRepository commentRepository;public void processComment(CommentEvent event) {Comment comment = new Comment();comment.setUserId(event.getUserId());comment.setVideoId(event.getVideoId());comment.setContent(event.getContent());comment.setCreateTime(LocalDateTime.now());commentRepository.save(comment);}
}

四、完整流程回顾(图解)

POST /comment/publish↓
[Controller] 收到请求↓
KafkaProducer.send("comment-events", json)↓
Kafka 中转存储(解耦主流程)↓
KafkaConsumer 监听到消息↓
调用 CommentService.processComment()↓
MySQL 写入评论表

五、总结:Kafka 在 Spring Boot 中的定位

模块作用
KafkaTemplate生产者 API,负责消息发送
@KafkaListener消费者注解,监听 Kafka Topic
KafkaConfig统一配置 Kafka 客户端参数
Producer 封装类隔离业务与发送逻辑
Consumer 封装类解耦消费逻辑,便于扩展与重试
DTO/Event标准化传输格式(如 JSON)

🧠 面试/简历描述模板

在 Spring Boot 项目中集成 Kafka,用于实现评论、弹幕、播放记录等模块的异步消息处理。封装 Kafka 生产者与消费者逻辑,通过 KafkaTemplate 发送消息、@KafkaListener 异步消费,解耦主流程、提升系统吞吐能力。


Kafka Broker 是什么消息队列是怎么体现的Kafka 调用的整体逻辑流程

🧩 一、Kafka Broker 是什么?

Kafka Broker 是 Kafka 的核心组件之一,它是 Kafka 服务端的实例,负责:

  • 接收消息(Producer 发来)
  • 存储消息(分布式日志持久化)
  • 投递消息给消费者(Consumer)

一个 Kafka 集群由多个 Broker 组成,每个 Broker 负责维护一部分数据(即部分 Topic 的分区)。

👉 可以简单理解:一个 Broker 就是一个 Kafka 节点,它是 Kafka 消息的存储与转发中心。

例如:

Producer -----> Kafka Broker(1) ----> Consumer↑存储的是某个 Topic 的某个分区

📦 二、Kafka 的“消息队列”是如何体现的?

Kafka 并不是传统意义上“链表结构”的消息队列,而是基于 “Topic + Partition + Offset” 组成的 持久化分布式消息队列

关键概念:

概念含义
Topic类似一个消息主题,比如 "comment-events"
Partition每个 Topic 可以被划分成多个分区,每个分区是一个有序队列
Offset消息在分区中的位置编号(偏移量)

🔁 队列特性的体现方式:

  • Kafka 的每个 分区本质上就是一个追加写的消息队列,消费时是按照 offset 顺序读取的。
  • 每个消费者可以通过 offset 控制从哪个位置消费。
  • 消息存储在磁盘上,可以 重复消费,具备 高可用、持久化、可回溯 的能力。

🔄 三、Kafka 的完整调用流程是怎样的?

以“用户发表评论”场景为例,Kafka 在系统中经历以下流程:


1️⃣ Producer 发送消息

kafkaTemplate.send("comment-events", messageJson);
  • 将业务消息(如评论)发送给 Kafka 的 某个 Topic
  • Kafka 客户端选择 Topic 的某个 Partition,将消息写入。

2️⃣ Kafka Broker 接收并存储

  • Kafka Broker 收到消息,写入对应的 Topic 分区文件中(顺序追加)。
  • 数据以高效二进制日志文件形式存储(支持高并发写入)。

3️⃣ Consumer 拉取消息

@KafkaListener(topics = "comment-events")
public void consume(String message) { ... }
  • 消费者通过轮询方式,从指定 Topic 的分区中拉取消息。
  • Kafka 保证每个消费者组可以独立消费,互不影响。
  • 消费者提交 offset,Kafka 不会自动删除消息(支持回放)。

🧠 图解整个调用过程

[业务代码]↓ 调用 KafkaProducer.send()
Producer─────────────→ Kafka Broker↑分区日志队列(消息队列本质)↓
@KafkaListener 拉取消息
Consumer↓
持久化 or 业务处理

🧠 总结:通俗理解 Kafka 的机制

概念解释
Kafka BrokerKafka 的核心服务节点,负责接收/存储/分发消息
消息队列特性Topic → Partition → Offset 结构形成有序、可回溯、高并发的消息通道
调用流程发送(Producer)→ 存储(Broker)→ 消费(Consumer)
技术优势高性能、可持久化、异步解耦、支持海量并发与分布式部署

Kafka Broker 在代码中的体现位置

虽然 Kafka Broker 是运行在服务器端的后台服务,不是直接在代码中“写出来”的一个类或对象,但在代码中配置连接 Kafka Broker 的地址,就等于是告诉应用程序:

“Kafka Broker 在这里,所有消息收发请连接这个地址。”

🧩 所以,Broker 在代码中的体现,主要是它的地址体现在配置文件里。


🧾 1. 在 application.ymlapplication.properties 中体现 Broker 地址

spring:kafka:bootstrap-servers: localhost:9092

这句配置就告诉了 Kafka 客户端(Producer、Consumer):

Kafka Broker 正在 localhost:9092 上运行。
所有消息都要发送/接收给这个地址的 Kafka 服务。

如果是连接一个集群,可能会是:

spring:kafka:bootstrap-servers: broker1:9092,broker2:9092,broker3:9092

💡 2. 它在 KafkaTemplate@KafkaListener 背后被使用

当用如下代码发送消息:

kafkaTemplate.send("comment-events", "your message");

或者监听消费:

@KafkaListener(topics = "comment-events")
public void consume(String msg) {...
}

这些 Kafka 客户端组件背后,Spring Boot 会使用在配置文件中定义的 bootstrap-servers,自动构建连接到 Kafka Broker 的连接客户端。


🔍 示例代码对应关系总结

Kafka 功能代码体现实际作用
Broker 地址配置bootstrap-servers: localhost:9092Kafka 客户端通过这个地址连接 Broker
Producer 发送消息kafkaTemplate.send(...)使用配置好的 Kafka 客户端发送消息到 Broker
Consumer 消费消息@KafkaListener(...)Spring Boot 通过配置的地址监听 Kafka Broker 的消息

🧠 总结一句话

Kafka Broker 本身不在业务代码中显式出现,它通过配置文件中的 bootstrap-servers 地址体现,Producer 和 Consumer 都基于这个地址与 Kafka Broker 建立连接、收发消息。


http://www.xdnf.cn/news/1018585.html

相关文章:

  • list使用及模拟
  • HarmonyOS 应用模块化设计 - 面试核心知识点
  • WPF--Application.Current.Dispatcher.BeginInvoke
  • 在Jupyter Notebook中使用Conda虚拟环境
  • 使用 PyMuPDF 和 PySide6/PyQt6 编写的 PDF 查看器 (显示树状书签和缩略图列表,没有文字选择功能)
  • Monte Carlo衍生品定价(金融工程)
  • Spring Boot3流式访问Dify聊天助手接口
  • PHP语法基础篇(二):输出函数与字符串操作
  • 《第五章-心法进阶》 C++修炼生涯笔记(基础篇)指针与结构体⭐⭐⭐⭐⭐
  • 6月计算机新书:深度学习、大模型、DeepSeek
  • Blender 3D建模工具的快捷键总结--选择、视图、对象、编辑、UV贴图、模型材质、动画与渲染、工具
  • 238. 除自身以外数组的乘积
  • Linux运维-ansible-python开发-获取inventroy信息
  • 第二十五章 25.Network Architecture(CCNA)
  • 简析MDM在餐饮设备中的部署与应用
  • 快速掌握Django框架设计思想(图解版)
  • java_oss_微信小程序_通过临时签名url访问oss中存储的图像
  • 微信小程序中跨页面调用函数来刷新页面
  • 深入理解JavaScript设计模式之策略模式
  • @Profile, @Conditional, @ConditionalOnMissingBean, @ConditionalOnClass
  • nodejs 语言特性(面试系列2)
  • 【Pandas】pandas DataFrame droplevel
  • java中跨域问题及解决方案
  • Spring XML 常用命名空间配置
  • React Native 项目实战 —— 记账本应用开发指南
  • 【React Native 性能优化:虚拟列表嵌套 ScrollView 问题全解析】
  • Java-数组-异常(基础)
  • 包含40个购物网站UI界面的psd适用于电商项目
  • 在 Linux 系统中通过 yum 安装 Sublime Text
  • 平压印刷机设计原理与关键技术研究