当前位置: 首页 > news >正文

Kafka的无消息丢失配置怎么实现

那 Kafka 到底在什么情况下才能保证消息不丢失呢?

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证。

    第一个核心要素是“已提交的消息”。什么是已提交的消息?当 Kafka 的若干个 Broker 成

功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此

时,这条消息在 Kafka 看来就正式变为“已提交”消息了。

    那为什么是若干个 Broker 呢?这取决于你对“已提交”的定义。你可以选择只要有一个

Broker 成功保存该消息就算是已提交,也可以是令所有 Broker 都成功保存该消息才算是

已提交。不论哪种情况,Kafka 只对已提交的消息做持久化保证这件事情是不变的。

    第二个核心要素就是“有限度的持久化保证”,也就是说 Kafka 不可能保证在任何情况下

都做到不丢失消息。举个极端点的例子,如果地球都不存在了,Kafka 还能保存任何消息

吗?显然不能!倘若这种情况下你依然还想要 Kafka 不丢消息,那么只能在别的星球部署

Kafka Broker 服务器了。

    现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说 Kafka 不丢消息是

有前提条件的。假如你的消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N

个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会

丢失。

     总结一下,Kafka 是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要

满足一定的条件。当然,说明这件事并不是要为 Kafka 推卸责任,而是为了在出现该类问

题时我们能够明确责任边界。

最佳实践

  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一

定要使用带有回调通知的 send 方法。

  •  设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。

如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。

这是最高等级的“已提交”定义。

  •  设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到

的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。

  • 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪

些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么

它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,

即不允许这种情况的发生。

  • 设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将

消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

6. 设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入

到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千

万不要使用默认值 1。

  • 确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂

机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要

在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas +

1。

  • 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设

置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程

处理的场景而言是至关重要的。


推荐阅读

  • 事件风暴在DDD中的应用
  • 网关层数据脱敏
  • 建立估算软件开发工作量的方法
http://www.xdnf.cn/news/1098649.html

相关文章:

  • Chromium 引擎启用 Skia Graphite后性能飙升
  • 在徐州网络中服务器租用与托管的优势
  • 机器学习13——支持向量机下
  • 大数据时代UI前端的智能化升级:基于机器学习的用户意图预测
  • Qt开发:QtConcurrent介绍和使用
  • RocksDB 与 ZenFS:原理、特性及在科研与工程中的应用初步探索
  • 配置双网卡Linux主机作为路由器(连接NAT网络和仅主机模式网络)
  • systemd服务脚本详解与管理命令
  • vue3 td 标签优化时间显示
  • LFU 缓存
  • 【笔记分享】集合的基数、群、环、域
  • QT解析文本框数据——概述
  • 实现一个点击输入框可以弹出的数字软键盘控件 qt 5.12
  • 文件系统子系统 · 核心问题问答精要
  • 【性能测试】jmeter+Linux环境部署和分布式压测,一篇打通...
  • 动态规划疑惑总结
  • Ajax之核心语法详解
  • OpenCV探索之旅:多尺度视觉与形状的灵魂--图像金字塔与轮廓分析
  • 安全访问云端内部应用:用frp的stcp功能解决SSH转发的痛点
  • 【Nginx】Nginx 安装与 Sticky 模块配置
  • 使用Docker将Python项目部署到云端的完整指南
  • 网络安全(初级)(1)
  • 显卡GPU的架构和工作原理
  • QT Android 如何打包大文件到目录下?
  • Android ViewBinding 使用与封装教程​​
  • 【数据结构与算法】数据结构初阶:动态顺序表各种方法(接口函数)复盘与整理
  • 模块三:现代C++工程实践(4篇)第二篇《性能调优:Profile驱动优化与汇编级分析》
  • uniapp滚动组件, HuimayunScroll:高性能移动端滚动组件的设计与实现
  • 深入理解oracle ADG和RAC
  • 【大模型推理论文阅读】Enhancing Latent Computation in Transformerswith Latent Tokens