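Consumer API

Contents

  • Standalone consumer example (subscribing to a topic)
  • Standalone consumer example (subscribing to a partition)
  • Consumer group example

Standalone consumer example (subscribing to a topic)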

package com.tsg.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        // Consumer configuration: brokers, deserializers, and group ID
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic "first"
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // Poll in a loop and print every record received
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
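
The `ConsumerConfig` constants used above resolve to plain string keys, which is handy when the settings are loaded from a properties file instead of being hard-coded. The following is a minimal sketch that needs only the JDK; the class name `ConsumerPropsSketch` and the helper `consumerProps` are hypothetical and not part of the original example, while the keys are the standard Kafka consumer configuration names.

```java
import java.util.Properties;

// Sketch: the consumer settings from the example, written with plain string keys.
class ConsumerPropsSketch {

    // Hypothetical helper: builds the four settings the example configures.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Same key as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Same keys as KEY_DESERIALIZER_CLASS_CONFIG / VALUE_DESERIALIZER_CLASS_CONFIG
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Same key as ConsumerConfig.GROUP_ID_CONFIG
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("master:9092,slave1:9092", "test");
        // Print each setting; iteration order of Properties is not guaranteed
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A `Properties` object built this way can be passed to the `KafkaConsumer` constructor in the same way as the one in the example.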

Standalone consumer example (subscribing to a partition)

package com.tsg.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {

    public static void main(String[] args) {
        // Consumer configuration: brokers, deserializers, and group ID
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // Assign partition 0 of the topic "first" to this consumer directly
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first", 0));
        kafkaConsumer.assign(topicPartitions);

        // Poll in a loop and print every record received
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

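Consumer group example

This test shows that the data in one partition of a topic can be consumed by only one consumer within a consumer group.

Consumer 1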

package com.tsg.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // All three consumers use the same group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic "four"
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("four");
        kafkaConsumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

Consumer 2

package com.tsg.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer1 {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // All three consumers use the same group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic "four"
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("four");
        kafkaConsumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

Consumer 3

package com.tsg.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer2 {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // All three consumers use the same group ID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // Subscribe to the topic "four"
        ArrayList<String> topics = new ArrayList<String>();
        topics.add("four");
        kafkaConsumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

The three consumers share the same group ID, so they form a consumer group, and each consumer consumes the data of one partition.
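
To see why three consumers over three partitions end up with one partition each, the split can be modelled in plain Java. This is a simplified sketch of a range-style assignment for a single topic, not the Kafka client's actual code: the class `RangeAssignmentSketch`, the method `assign`, and the consumer names are hypothetical, and the strategy a real group uses depends on its `partition.assignment.strategy` setting.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of range-style partition assignment for one topic.
class RangeAssignmentSketch {

    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per consumer.
    // Consumers are assumed to be listed in their sorted member order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = numPartitions / consumers.size();   // partitions every consumer gets
        int extra = numPartitions % consumers.size();  // first `extra` consumers get one more
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // Three group members and three partitions, as in the test above
        List<String> group = List.of("consumer-1", "consumer-2", "consumer-3");
        assign(group, 3).forEach((consumer, partitions) ->
                System.out.println(consumer + " -> " + partitions));
    }
}
```

In this model a fourth consumer added to the same group would receive an empty list, which matches the rule that a partition is consumed by only one member of a group.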

Producer sending data

package com.tsg.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");

        // Serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // Number of retries; the default is the int maximum, 2^31 - 1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tsg.kafka.producer.MyPartitioner");

        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // Send five records to partition 2 of the topic "four"
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("four", 2, "", "partition 2"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic: " + metadata.topic() + " partition: " + metadata.partition());
                    }
                }
            });
        }

        // Release resources
        kafkaProducer.close();
    }
}

package com.tsg.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");

        // Serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // Number of retries; the default is the int maximum, 2^31 - 1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tsg.kafka.producer.MyPartitioner");

        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // Same producer, now sending five records to partition 1 of the topic "four"
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("four", 1, "", "partition 1"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic: " + metadata.topic() + " partition: " + metadata.partition());
                    }
                }
            });
        }

        // Release resources
        kafkaProducer.close();
    }
}

```java
package com.tsg.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {

    public static void main(String[] args) {
        Properties properties = new Properties();

        // Connect to the cluster
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");

        // Serializer classes for the key and the value
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Set acks
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        // Number of retries; the default is the int maximum, 2^31 - 1
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.tsg.kafka.producer.MyPartitioner");

        // Create the Kafka producer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // Same producer, now sending five records to partition 0 of the topic "four"
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("four", 0, "", "partition 0"), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic: " + metadata.topic() + " partition: " + metadata.partition());
                    }
                }
            });
        }

        // Release resources
        kafkaProducer.close();
    }
}
```