当前位置: 首页 > java >正文

从原理到实践:一文掌握Kafka的消息生产与消费

什么是kafka

一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

kafka中基本术语

消息:kafka中的数据单元,也称为记录

批次:为了提高效率,消息分批次被消费,这一组消息就叫批次

主题:消息的种类叫主题,一个主题代表了一类消息

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序。

生产者:生产者用于持续不断的向某个主题发送消息

消费者:消费者用于处理生产者产生的消息

消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体

偏移量:它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据

Kafka Broker:在 Kafka 中,Broker(代理)是 Kafka 集群的基础工作单元,负责消息的存储、传输和处理。简单来说,Broker 就是运行 Kafka 服务的服务器节点。

Kafka 的特性

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性:Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 的数据能够持久存储。
  • 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发:支持数千个客户端同时读写

kafka常用参数配置

Produce关键配置

bootstrap.servers

Kafka集群地址(逗号分隔)

acks

acks是kafka生产者中最核心的可靠性配置,配置决定了生产者认为消息是否"成功写入",该配置有三种级别,分别是0、1(默认)、all

acks=0 无确认

acks=1 Leader确认

acks=all 全副本确认

配置值可靠性延迟吞吐量适用场景数据丢失风险
acks=0最低最低最高监控日志、实时指标极高:发送即视为成功
acks=1中等普通日志、非关键数据中等:Leader 写入后崩溃可能丢失
acks=all最高中等金融交易、订单数据极低:需配合 min.insync.replicas

key.serializer、value.serializer

键序列化、值序列化类

interceptor.classes

允许为 Kafka 生产者或消费者插入自定义逻辑,在消息发送/消费的关键节点进行拦截处理

Consumer 关键配置

bootstrap.servers

Kafka集群地址

enable.auto.commit

消息的自动和手动提交,取决于该配置的设置

自动提交:

  • 开发者无需关心位移管理逻辑
  • 减少代码复杂度和出错可能
  • 批量提交减少网络请求次数
  • 无提交等待时间,连续消费不间断

手动提交:

  • 确保业务操作与位移提交的原子性
  • 避免"部分成功"导致的数据不一致
  • 支持事务性操作
  • 异常恢复能力

维度自动提交手动提交
开发复杂度极简复杂
吞吐量 最高中等
可靠性可能丢失精确一次
资源消耗较低较高
适用场景通知/日志交易/订单

🌐 生活场景类比:报纸配送系统

自动提交模式

送报员每天将报纸投入信箱即视为送达(自动提交),不等待住户确认。
优势:高效率覆盖整个社区,每天能送1000户。
风险:可能有人没收到报纸(消息丢失)。

手动提交模式

快递员必须当面签收包裹(手动确认)。
优势:确保每个包裹送达(消息可靠)。
代价:每天只能送100户,效率低下。

key.serializer、value.serializer

键序列化、值序列化类

保持消费者活跃关键配置

参数作用域默认值介绍风险
session.timeout.ms消费者45秒消费者需要定期发送心跳给协调器证明自己存活,如果超时消费者被标记为死亡,触发再均衡分区重新分配给其他消费者误判死亡
heartbeat.interval.ms消费者3秒控制消费者发送心跳的频率心跳风暴
max.poll.interval.ms消费者5分钟控制两次poll()调用之间的最大允许间隔,也就是控制业务处理时长,如果处理时长超时,直接踢出消费组被意外踢出
offsets.retention.minutesBroker7天

当消费者停止工作后,生产者依旧在生产数据,位移数据依旧在增多,但是到了位移保留时长后,位移数据将会被删除

位移丢失
auto.offset.reset消费者latest当没有初始offset或offset已被删除时,消费者如何处理。可选值:latest(从最新消息开始),earliest(从头开始),none(报错)数据丢失或重复消费

位移数据删除后处理:

策略消费起始位置是否"归零"数据影响风险
latest分区最新位移
(如12500)
跳过11000-12500的所有订单消息丢失
earliest分区当前起始位移
(如0或5000)
物理归零重放所有可用历史订单重复消费
none不启动N/A服务中断抛异常

总结:

kafka发送消息

//示例:
private final KafkaTemplate<String, String> kafkaTemplate;
//参数很多可参考官网文档
kafkaTemplate.send(topic, message);

方法参数详解

1. 基本发送:指定主题和消息内容

ListenableFuture<SendResult<K, V>> send(String topic, V data);
  • 参数

    • topic:消息发送到的 Kafka 主题名称。

    • data:消息内容(Value)。

kafkaTemplate.send("user-events", "{\"userId\": 1001, \"action\": \"login\"}");

 2. 指定分区:发送到特定分区

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, V data);
  • 参数

    • partition:目标分区的编号(从 0 开始)。

  • 示例

// 发送到主题 "order-events" 的分区 2
kafkaTemplate.send("order-events", 2, "{\"orderId\": \"O20231001\"}");
  • 作用:明确将消息发送到指定分区,适用于需要控制消息物理存储位置的场景(如日志顺序性要求)。

3. 指定 Key 和 Value:控制分区策略

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
  • 参数

    • key:消息的键(Key),用于计算分区(默认 Hash 策略)。

  • 示例

// 使用用户ID作为Key,确保同一用户的消息进入同一分区
kafkaTemplate.send("user-actions", "user-1001", "{\"action\": \"purchase\"}");
  • 作用:通过 Key 控制分区分配,保证相同 Key 的消息总是进入同一分区,实现顺序性消费。

4. 指定分区、Key 和 Value

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data
);
  • 示例
// 发送到分区 1,Key 为 "region-east",Value 为区域数据
kafkaTemplate.send("region-data", 1, "region-east", "{\"sales\": 5000}");
  • 作用:同时指定分区和 Key(以 Key 的分区计算结果优先,若分区已指定则忽略 Key)。

5. 包含时间戳

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data
);
  • 参数

    • timestamp:消息时间戳(毫秒),用于日志留存策略或流处理。

  • 示例

    long eventTime = System.currentTimeMillis();
    kafkaTemplate.send("sensor-data", 0, eventTime, "sensor-001", "{\"temp\": 25.5}"
    );
  • 作用

    • 显式设置消息时间戳,影响 Kafka 日志清理策略(如 LogAppendTime 或 CreateTime)。

6. 使用 ProducerRecord 对象

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
  • 示例

    ProducerRecord<String, String> record = new ProducerRecord<>("audit-logs", 0, "log-20231001", "{\"level\": \"INFO\", \"message\": \"User login\"}"
    );
    kafkaTemplate.send(record);
  • 作用:直接使用 Kafka 原生 ProducerRecord 对象,支持更底层配置(如 Headers)。

kafka接收消息

 //示例:   @KafkaListener(topics = PositionAnalyseMessage.TOPIC, groupId = "wetool-position-analyse")public void consume0(PositionAnalyseMessage message) {this.doConsume(message);}

1.在同一个消费组中,同一条消息只被一个消费者消费

// 同组的消费者A
@KafkaListener(topics = "test-topic", groupId = "same-group")
public void consumeA(String message) { }// 同组的消费者B
@KafkaListener(topics = "test-topic", groupId = "same-group")
public void consumeB(String message) { }

2.在不同消费组中,同一条消息会被不同组的消费者都会消费(类似发布订阅)

// 订单消息处理
@KafkaListener(topics = "order-topic", groupId = "order-process")
public void processOrder(Order order) {// 处理订单逻辑
}// 订单统计
@KafkaListener(topics = "order-topic", groupId = "order-statistics")
public void statisticsOrder(Order order) {// 统计订单数据
}// 订单通知
@KafkaListener(topics = "order-topic", groupId = "order-notification")
public void notifyOrder(Order order) {// 发送订单通知
}

消费场景

消息手动提交

参数设置

enable.auto.commit=false

consumer.pause() 、consumer.resume()方法

consumer.pause():
暂停消费指定分区的消息,暂时不再从这些分区拉取新消息。

consumer.resume():
恢复消费之前暂停的分区的消息,继续从这些分区拉取消息。

acknowledgment.acknowledge()

手动确认消息已被成功处理

分区解释

规则:

从 0 开始编号:所有 Kafka 分区的编号都是从 0 开始的整数
连续递增:分区编号是连续的(0, 1, 2, 3...)

格式:

<topic_name>-<partition_id>

配置:

num.partitions=1  未指定分区数 → 使用 num.partitions 值(默认为 1)

单分区:

停止分区:

停止拉取新消息核心效果:

  • 消费者不再从指定分区获取新消息
  • 但已拉取到内存的消息仍会继续处理
  • 分区积压消息会保留在Kafka broker上

暂停分区的意义:

  • 流量控制:防止消费者过载的终极手段
  • 稳定性保障:避免消费者崩溃和再均衡风暴
  • 精准调控:分区粒度的流量管理
  • 无缝恢复:负载下降后自动恢复消费
  • 资源保护:防止内存溢出和线程耗尽

代码实现逻辑

批量消费消息场景

参数设置

containerFactory.setBatchListener(true);

max.poll.records:单次 poll() 调用返回的最大消息数(默认500)

fetch.max.wait.ms:等待多久拉取消息

http://www.xdnf.cn/news/17671.html

相关文章:

  • 【bug 解决】串口输出字符乱码的问题
  • pdftk - macOS 上安装使用
  • 干货分享|如何从0到1掌握R语言数据分析
  • OpenAI传来捷报,刚刚夺金IOI,实现通用推理模型的跨越式突破
  • 如何实现PostgreSQL的高可用性,包括主流的复制方案、负载均衡方法以及故障转移流程?
  • 【接口自动化】-11-接口加密签名 全局设置封装
  • 容器安全扫描工具在海外云服务器环境的集成方法
  • Element用法---Loading 加载
  • npm、pnpm、yarn区别
  • 一周学会Matplotlib3 Python 数据可视化-绘制饼状图(Pie)
  • 前沿技术借鉴研讨-2025.8.12 (数据不平衡问题)
  • Web项目Excel文件处理:前端 vs. 后端,企业级如何选择?
  • 【3】Transformers快速入门:大语言模型LLM是啥?
  • 11-docker单机版的容器编排工具docker-compose基本使用
  • centos 7 如何安装 ZipArchive 扩展
  • MySQL 数据库表操作与查询实战案例
  • MySQL概述
  • 计算机网络:ovn数据通信都是用了哪些协议?
  • 集成电路学习:什么是URDF Model统一机器人描述格式模型
  • 智能合约执行引擎在Hyperchain中的作用
  • 北京-4年功能测试2年空窗-报培训班学测开-第七十三天-投递简历-[特殊字符][特殊字符]
  • LangChain 框架 Parser 讲解
  • 介绍一下jQuery的AJAX异步请求
  • Nacos-1--什么是Nacos?
  • vue+flask基于规则的求职推荐系统
  • 3Ds Max的魔改利器:RailClone - 程序化建模的革命者
  • Linux Web服务器与WordPress部署笔记
  • DNS 服务详解与 BIND 部署
  • Django ORM查询技巧全解析
  • 基于Springboot+UniApp+Ai实现模拟面试小工具九:移动端框架搭建