当前位置: 首页 > ds >正文

kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消费线程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 处理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 处理records  }}
}

此方法需要引入一个共享的offsets来参与提交。

http://www.xdnf.cn/news/10360.html

相关文章:

  • AIGC学习笔记(8)——AI大模型开发工程师
  • [SC]SystemC在CPU/GPU验证中的应用(六)
  • 大语言模型值ollama使用(1)
  • 2.1HarmonyOS NEXT开发工具链进阶:DevEco Studio深度实践
  • Win10 doccano pip安装笔记
  • Redis最佳实践——安全与稳定性保障之连接池管理详解
  • python做题日记(11)
  • Go 语言的 GC 垃圾回收
  • AWTK 嵌入式Linux平台实现多点触控缩放旋转以及触点丢点问题解决
  • 使用VSCode在WSL和Docker中开发
  • 【机器学习】支持向量机
  • 手写HashMap
  • 8086 处理器 Flags 标志位全解析:CPU 的 “晴雨表” 与 “遥控器”总结:
  • Qlib量化工具介绍与使用指南
  • org.junit.runners.model.InvalidTestClassError:此类问题的解决
  • 【MySQL】索引下推减少回表次数
  • SpringBoot整合MyBatis完整实践指南
  • 5.31 day33
  • 【LUT技术专题】图像自适应3DLUT
  • 设计模式——适配器设计模式(结构型)
  • SpringBoot3-从环境搭建到异常处理的完整指南
  • C++ 命令模式:设计与实现详解
  • flowable候选人及候选人组(Candidate Users 、Candidate Groups)的应用包含拾取、归还、交接
  • LLm中 float16和 float32 区别,为什么训练不能采用float16--梯度消失
  • LeetCode 算 法 实 战 - - - 移 除 链 表 元 素、反 转 链 表
  • go|context源码解析
  • 【Block总结】Dynamic Tanh (DyT)|即插即用|何凯明和Yann LeCun署名
  • 4.2.5 Spark SQL 分区自动推断
  • 开发体育平台,怎么接入最合适的数据接口
  • 免费高清多功能录屏软件推荐