
Getting Started with Kafka (Part 2)

Accessing Kafka from a Java Client

Add the Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
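The examples below assume a topic named my-replicated-topic already exists on the cluster. If it does not, it can be created programmatically with the AdminClient that ships with kafka-clients. A minimal sketch, not part of the original article; the class name TopicCreator and the partition/replication values are assumptions, so adjust them to match your cluster:

package com.tuling.kafka.kafkaDemo;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCreator {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        try (AdminClient admin = AdminClient.create(props)) {
            // 2 partitions, replication factor 3: assumed values for a 3-broker cluster
            NewTopic topic = new NewTopic("my-replicated-topic", 2, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}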

Producer code

package com.tuling.kafka.kafkaDemo;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class MsgProducer {

    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        // props.put(ProducerConfig.ACKS_CONFIG, "1");
        // props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
        // props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(props);

        int msgNum = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgNum);
        for (int i = 1; i <= msgNum; i++) {
            Order order = new Order(i, 100 + i, 1, 1000.00);

            // Send to a specific partition
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0, order.getOrderId().toString(), JSON.toJSONString(order));

            // No partition specified; the target partition is computed as hash(key) % partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME,
                    order.getOrderId().toString(), JSON.toJSONString(order));

            // Synchronous send: block until the broker acknowledges the message
            // RecordMetadata metadata = producer.send(producerRecord).get();
            // System.out.println("Sync send result: " + "topic-" + metadata.topic() + "|partition-"
            //         + metadata.partition() + "|offset-" + metadata.offset());

            // Asynchronous send with a callback
            producer.send(producerRecord, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message:");
                        // getStackTrace() alone would only print an array reference
                        exception.printStackTrace();
                    }
                    if (metadata != null) {
                        System.out.println("Async send result: " + "topic-" + metadata.topic() + "|partition-"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });

            // Award bonus points TODO
        }

        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();
    }
}
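The producer serializes an Order object with fastjson, but the Order class itself is not shown in the original listing. A minimal sketch that matches the constructor and getter used above; every field name except orderId is an assumption made for illustration:

package com.tuling.kafka.kafkaDemo;

// Minimal POJO matching new Order(i, 100 + i, 1, 1000.00) and order.getOrderId();
// field names other than orderId are assumed for illustration.
public class Order {
    private Integer orderId;
    private Integer productId;
    private Integer productNum;
    private Double orderAmount;

    public Order(Integer orderId, Integer productId, Integer productNum, Double orderAmount) {
        this.orderId = orderId;
        this.productId = productId;
        this.productNum = productNum;
        this.orderAmount = orderAmount;
    }

    // Getters are required for fastjson's JSON.toJSONString to serialize the fields
    public Integer getOrderId() { return orderId; }
    public Integer getProductId() { return productId; }
    public Integer getProductNum() { return productNum; }
    public Double getOrderAmount() { return orderAmount; }
}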

Consumer code

package com.tuling.kafka.kafkaDemo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MsgConsumer {

    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // Consume a specific partition only
        // consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

        // Rewind and re-consume from the beginning of a partition
        /*
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        */

        // Consume starting from a specific offset
        /*
        consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
        consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
        */

        // Consume starting from a specific point in time
        /*
        List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
        // Consume from one hour ago
        long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
        Map<TopicPartition, Long> map = new HashMap<>();
        for (PartitionInfo par : topicPartitions) {
            map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
        }
        Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp value = entry.getValue();
            if (key == null || value == null) continue;
            Long offset = value.offset();
            System.out.println("partition-" + key.partition() + "|offset-" + offset);
            // Seek to the offset that corresponds to the timestamp
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
        */

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Received message: partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            }

            /*
            if (records.count() > 0) {
                // Manually commit offsets synchronously; the current thread blocks until the commit succeeds.
                // Synchronous commit is the common choice, since there is usually no further logic after it.
                consumer.commitSync();

                // Manually commit offsets asynchronously; the commit does not block the current thread,
                // which can continue with the rest of the program logic.
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if (exception != null) {
                            System.err.println("Commit failed for " + offsets);
                            System.err.println("Commit failed exception: " + exception.getMessage());
                        }
                    }
                });
            }
            */
        }
    }
}

Integrating Kafka with Spring Boot

Add the Spring Boot Kafka dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml configuration:

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 192.168.65.60:9092,192.168.65.60:9093,192.168.65.60:9094
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate

Producer code:

package com.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        // Send to partition 0 of the topic with key "key"
        kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
    }
}
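kafkaTemplate.send returns a Spring ListenableFuture, so send results can also be handled asynchronously, mirroring the callback style of the plain Java producer shown earlier. A minimal sketch of a second endpoint added inside the KafkaController above; the path /send/callback is made up for illustration:

// Inside KafkaController
@RequestMapping("/send/callback")
public void sendWithCallback() {
    kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg")
            .addCallback(
                    result -> System.out.println("send ok: " + result.getRecordMetadata()),
                    ex -> System.err.println("send failed: " + ex.getMessage()));
}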

Consumer code:

package com.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    /**
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     *     @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     *     @TopicPartition(topic = "topic2", partitions = "0",
     *         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * }, concurrency = "6")
     * // concurrency is the number of consumers in the same group, i.e. the degree of
     * // concurrent consumption; it must be less than or equal to the total partition count
     * @param record
     */
    @KafkaListener(topics = "my-replicated-topic", groupId = "zhugeGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // Manually commit the offset
        ack.acknowledge();
    }

    /* // Configure an additional consumer group
    @KafkaListener(topics = "my-replicated-topic", groupId = "tulingGroup")
    public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    } */
}
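The partition-specific listener described in the Javadoc comment above compiles as-is once the annotations are imported. A sketch, using the placeholder topic names topic1/topic2 and the hypothetical method name listenSpecificPartitions; it would live inside MyConsumer:

import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;

// Inside MyConsumer: consume partitions 0 and 1 of topic1, partition 0 of topic2,
// and partition 1 of topic2 starting from offset 100, with 6 concurrent consumers.
@KafkaListener(groupId = "testGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "6")
public void listenSpecificPartitions(ConsumerRecord<String, String> record, Acknowledgment ack) {
    System.out.println(record);
    ack.acknowledge();
}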
