当前位置: 首页 > ds >正文

Spring Boot集成Kafka并使用多个死信队列的完整示例

以下是Spring Boot集成Kafka并使用多个死信队列的完整示例,包含代码和配置说明。


1. 添加依赖 (pom.xml)

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
</dependencies>

2. 配置文件 (application.yml)

spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliest

3. 自定义异常类

public class BusinessException extends RuntimeException {public BusinessException(String message) {super(message);}
}

4. Kafka配置类

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;@Configuration
@EnableKafka
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;// Kafka生产者配置@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(config);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}// Kafka消费者配置@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> config = new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");return new DefaultKafkaConsumerFactory<>(config);}// 自定义错误处理器(支持多个死信队列)@Beanpublic CommonErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {// 重试策略:3次重试,间隔1秒FixedBackOff backOff = new FixedBackOff(1000L, 3);DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, exception) -> {String dlqTopic = determineDlqTopic(exception);kafkaTemplate.send(dlqTopic, record.key(), record.value());System.out.println("消息发送到死信队列: " + dlqTopic);}, backOff);// 配置需要重试的异常类型errorHandler.addRetryableExceptions(BusinessException.class);errorHandler.addNotRetryableExceptions(SerializationException.class);return errorHandler;}// 根据异常类型选择死信队列private String determineDlqTopic(Throwable exception) {if (exception.getCause() instanceof SerializationException) {return "serialization-error-dlq";} else if (exception.getCause() instanceof BusinessException) {return "business-error-dlq";} else {return "general-error-dlq";}}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setCommonErrorHandler(errorHandler(kafkaTemplate()));return factory;}
}

5. Kafka消费者服务

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {@KafkaListener(topics = "main-topic")public void consume(String message) {try {if (message.contains("invalid-format")) {throw new SerializationException("消息格式错误");} else if (message.contains("business-error")) {throw new BusinessException("业务处理失败");}System.out.println("成功处理消息: " + message);} catch (Exception e) {throw new RuntimeException(e);}}
}

6. 启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(KafkaApplication.class, args);}
}

7. 测试步骤

  1. 创建Kafka主题

    kafka-topics --create --bootstrap-server localhost:9092 --topic main-topic
    kafka-topics --create --bootstrap-server localhost:9092 --topic serialization-error-dlq
    kafka-topics --create --bootstrap-server localhost:9092 --topic business-error-dlq
    kafka-topics --create --bootstrap-server localhost:9092 --topic general-error-dlq
    
  2. 发送测试消息

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;public void sendTestMessages() {kafkaTemplate.send("main-topic", "valid-message");kafkaTemplate.send("main-topic", "invalid-format");kafkaTemplate.send("main-topic", "business-error");
    }
    
  3. 观察死信队列

    • 格式错误的消息会进入 serialization-error-dlq
    • 业务异常的消息会进入 business-error-dlq
    • 其他异常进入 general-error-dlq

关键点说明

  1. 错误路由逻辑:通过determineDlqTopic方法根据异常类型选择不同的死信队列。
  2. 重试机制:通过FixedBackOff配置重试策略(最多重试3次,间隔1秒)。
  3. 异常分类
    • SerializationException(序列化问题)直接进入死信队列,不重试。
    • BusinessException(业务异常)会触发重试,最终失败后进入死信队列。
http://www.xdnf.cn/news/3275.html

相关文章:

  • 【MySQL】增删改查(CRUD)
  • Microsoft Entra ID 免费版管理云资源详解
  • mysql-5.7.24-linux-glibc2.12-x86_64.tar.gz的下载安装和使用
  • 上海地区IDC机房服务器托管选型报告(2025年4月30日)
  • (51单片机)LCD显示红外遥控相关数据(Delay延时函数)(LCD1602教程)(Int0和Timer0外部中断教程)(IR红外遥控模块教程)
  • LeRobot 项目部署运行逻辑(三)——机器人及舵机配置
  • 【STM32实物】基于STM32的RFID多卡识别语音播报系统设计
  • 左右分屏电商带货视频批量混剪自动剪辑生产技术软件:智能剪辑与合规化方案解析
  • 【优选算法 | 前缀和】前缀和算法:高效解决区间求和问题的关键
  • 无侵入式的解决 ViewPager2 跟横向滑动子 View 手势冲突的一种思路
  • 人工智能数学基础(五):概率论
  • Kafka Producer的acks参数对消息可靠性有何影响?
  • 阿里云服务器技术纵览:从底层架构到行业赋能​
  • PostgreSQL数据库操作基本命令
  • JAVA SE 反射,枚举与lambda表达式
  • 制作一款打飞机游戏36:调度编辑器
  • K8S - 命名空间实战 - 从资源隔离到多环境管理
  • 系统升级姿势解锁:绞杀、并行与隐藏开关
  • 拥抱 Kotlin Flow
  • 虚幻商城 Quixel 免费资产自动化入库(2025年版)
  • ArcGIS Pro几个小知识点分享
  • WebRtc09:网络基础P2P/STUN/TURN/ICE
  • 「动态规划::背包」01背包 / AcWing 2(C++)
  • OpenCV 图形API(75)图像与通道拼接函数-----将 4 个单通道图像矩阵 (GMat) 合并为一个 4 通道的多通道图像矩阵函数merge4()
  • 章越科技赋能消防训练体征监测与安全保障,从传统模式到智能跃迁的实践探索
  • Hbuilder 开发鸿蒙应用,打包成 hap 格式(并没有上架应用商店,只安装调试用)
  • 【Vue2】4-开发者工具安装
  • HOW - 经典详情页表单内容数据填充(基于 Antd 组件库)
  • 数据库服务器备份,数据库服备份到另一台服务器的方法有哪些?
  • 普通IT的股票交易成长史--20250430晚