当前位置: 首页 > news >正文

kafak


一、普通使用

生产者
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerExample {
public static void main(String[] args) {
// 1. 配置生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 确保消息持久化[6](@ref)
props.put("retries", 3); // 重试次数

        // 2. 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);

        // 3. 发送消息(异步回调)
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key1", "Hello Kafka!");
producer.send(record, (metadata, e) -> {
if (e == null) {
System.out.printf("✅ 消息发送成功: topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
e.printStackTrace();
}
});

        // 4. 关闭连接
producer.close();
}
}

消费者
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
public static void main(String[] args) {
// 1. 配置消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 消费者组ID[6](@ref)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费

        // 2. 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题

        // 3. 持续拉取消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("📩 收到消息: key=%s, value=%s (partition=%d, offset=%d)%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} finally {
consumer.close(); // 确保资源释放
}
}
}


2、SpringBoot环境下使用

生产者的配置与使用
@Configuration
public class KafkaConfig {
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
}

@Service
public class MessageService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message); // 发送消息[4](@ref)
}
}


消费者的配置与使用

@Configuration
@EnableKafka
public class ConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}

    @Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

@Service
public class MessageListener {
@KafkaListener(topics = "test-topic", groupId = "spring-group")
public void listen(String message) {
System.out.println("🔔 Spring消费消息: " + message); // 自动监听主题[4,6](@ref)
}
}

http://www.xdnf.cn/news/1262431.html

相关文章:

  • windows自动获取wsl IP,并开启端口转发。
  • 【代码随想录day 14】 力扣 111.二叉树的最小深度
  • Axure基于中继器实现的组件库(导航菜单、动态表格)
  • Array Description(Dynamic programming)
  • 在发布应用程序内测时如何选择合适的分发上架方式?
  • Git 基础操作笔记(速查)
  • 视频遥测终端机是什么,其工作原理和应用领域
  • 高校合作 | 世冠科技联合普华、北邮项目入选教育部第二批工程案例
  • 01数据结构-图的概念和图的存储结构
  • 数据结构---二叉树(概念、特点、分类、特性、读取顺序、例题)、gdb调试指令、时间复杂度(概念、大O符号法、分类)
  • 【世纪龙科技】数智重构车身实训-汽车车身测量虚拟实训软件
  • 二叉树实现
  • Docker 创建镜像错误记录
  • Redis缓存击穿、穿透雪崩
  • 【NFTurbo】基于DockerCompose一键部署
  • gmssl私钥文件格式
  • 用户组权限及高级权限管理:从基础到企业级 sudo 提权实战
  • 《从零构建大语言模型》学习笔记2,文本数据处理1(以及tiktoken库无法下载gpt2参数,调用get_encoding时SSL超时的解决方法)
  • Redis是单线程性能还高的原因
  • SaaS 版 MES 系统业务文档
  • 【SpringBoot】SpringBoot配置
  • GPT OSS 双模型上线,百度百舸全面支持快速部署
  • 华为USG防火墙双机,但ISP只给了1个IP, 怎么办?
  • 医防融合中心-智慧化慢病全程管理医疗AI系统开发(上)
  • C++信息学奥赛一本通-第一部分-基础一-第2章-第5节
  • 单层 PDF 与双层 PDF:一字之差,功能大不同
  • 修复C++14兼容性问题 逻辑检查
  • 力扣-238.除自身以外数组的乘积
  • FileLink:企业数据传输的革新者​
  • Node.js Turbo 包入门教程