当前位置: 首页 > ds >正文

kafka初步介绍

Kafka角色介绍

Topic

        Topic主题的意思,消费者必须指定主题用于的消息发送,生产者也必须指定主题用于消息的接收。topic只是逻辑上的划分。

partition

        partition是分区的意思,他的主要作用是将发送到一个topic的数据做一个划分。如果有4个partition那么消费者就可以去这四个分区中获取消息,理想情况下提高了4倍效率。(降低Topic处理消息的压力)其中的offset是用来记录消息在分区当中的物理位置,可以用来保证在同一分区下消息的顺序性。

        partition是将消息以物理的形式进行隔离,就是在一个目录下由不同文件存储。

broker

        broker即kafka服务器中的一个节点。用于接收生产者发来的消息、将消息写入磁盘(分区对应的日志文件)、向消费者提供消息、参与分区副本的同步与 Leader 选举

consumer-group

        消费者组是让一组消费者消费一个或者多个主题的分区,一个消费者组中一个分区只会被其中一个消费者消费。

        分组的好处:组相当于调度中心,如果组内有人丢失消息了,组内维护有offset可以帮忙你送。谁没活都去配置中心领。

        为什么不用一个分组消费一个分区。每个组offset不共享,当组内无法处理时外部就会从最开始的消息开始消费出现重复消费。扩展也麻烦想要扩展只能加消费者组。

熟悉Kafka配置

kafka的配置可以通过配置类的形式进行设置,但是我们使用SpringBoot可以通过properties/yaml文件的形式加载配置(值得注意的是properties文件都是以扁平键值对,用 . 分割;yaml文件是通过树形结构)然后就可以通过注解的形式使用Kafka

#表示配置Kafaka服务地址,通过 ,可以配置多台服务
spring.kafka.bootstrap-servers = 127.0.01
#表示Kafka消息失败重试次数
spring.kafka.producer.retries=3
#设置批量发送消息大小的阀值,达到16kb就会批量发送。
#批次发送的意义是为了减少网络开支成本,多条消息建立一次网络通道
#但是这里没有设置消息等待发送时间,也就是每一条消息都会立即发送,这条消息更像是一个保险策略
spring.kafka.producer.batch-size=16384
#设置缓冲区大小,消息都会放到缓冲区里面等待
spring.kafka.producer.buffer-memory=33554432
#消息确认机制 0表示无需ack机制 1表示需要leader节点确认(ACK机制) -1 表示需要所有节点都确认
spring.kafka.producer.acks=1
#设置键值对的序列化方式。kafka对于生产者和消费者都必须设置序列化类型,
#因为Kafka将生产者消息将对象转为字符数组,消费者需要将字符数组转为需要的类型
#所以为了让Kafka能够接收消费消息都需要设置序列化类型
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
----------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------
#消费者组的意义是为了记录一个组中消息消费到的位置也就是offset
#这样新加入的消费者就知道从哪里开始执行任务
spring.kafka.consumer.group-id=default-group-td-geek
#兜底策略,避免因消息正在消费时,偏移量提交时宕机导致该条消息不消费。
#手动提交,只有在消息被消费完毕之后才会去提交偏移量
spring.kafka.consumer.enable-auto-commit=false
#兜底策略,当消费者启动时,判断偏移量是否可靠,如果不可靠 配置lateset让消费者从最新消息开始消费 配置earliest让消费者从最早消息开始消费
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消息生产者

只需要注入Kafka客户端,调用客服端对象的send方法就可以发送消息,send方法需要指定消息发送到的topic,还有具体的数据。同时我们可以设置key值用于,分区运算,保证消息顺序(在同一个分区下消息可以保证顺序性)

消息消费者

通过注解的方式绑定监听器,监听器可以接收指定的topic用来消费消息。

@KafkaListener(topics = {"alphaess_"})
//ConsumerRecord<String, String> 是Kafka中消息记录对象,第一个String指的是Key 第二个String指的是Value
public void onMessage1(ConsumerRecord<String, String> record){// 消费的哪个topic、partition的消息,打印出消息内容System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
}

http://www.xdnf.cn/news/17790.html

相关文章:

  • 不废话,UE5极速云渲染操作方法
  • STM32_bug总结(TIM定时中断进不去和只进1次)
  • MyBatis持久层实现
  • 全面解析MySQL(5)——“索引、事务、JDBC”三大核心
  • PostgreSQL——数据查询
  • 【K8s】部署安装K8s为什么要关闭swap分区?
  • Day50--图论--98. 所有可达路径(卡码网),797. 所有可能的路径
  • 元宇宙虚拟金融服务全景解析:技术创新、场景重构与未来趋势
  • 一体化步进伺服电机在无人机舱门应用中的应用案例
  • 使用Gradle手搓一个Kotlin/Native项目
  • CMU-15445(9)——PROJECT#3-Query Execution-Task#2Task#3
  • 机器学习-决策树(上)
  • TDengine 可观测性最佳实践
  • Nginx反向代理功能
  • 微前端架构:原理、场景与实践案例
  • 扫雷 (minesweeper)
  • 从0-1搭建webpack的前端工程化项目
  • 【前端基础】15、列表元素、表格元素、表单元素(注:极其粗略的记载。)
  • (3万字详解)Linux系统学习:深入了解Linux系统开发工具
  • js异步操作 Promise :fetch API 带来的网络请求变革—仙盟创梦IDE
  • Java Web项目后台管理系统之内容管理仿写:内容、搜索、页码加载
  • Zabbix携手Grafana打造炫酷监控大屏
  • 【Linux文件操作】文件操作系统调用
  • 19.Linux DHCP服务
  • 2025.8.6 图论(1)Solution
  • MySQL 基本语法
  • 对自己的 app 进行分析, 诊断,审视
  • 多路转接 select
  • 常见鱼饵制作方式
  • FPGA学习笔记——DS18B20(数字温度传感器)