当前位置: 首页 > ops >正文

Spring Boot 与 Kafka 的深度集成实践(三)

4.2 接收消息

配置好消费者后,就可以使用@KafkaListener注解来监听 Kafka 主题并接收消息了 。@KafkaListener注解可以标记在方法上,用于指定要监听的主题、消费者组等信息 。

以下是使用@KafkaListener注解接收消息的代码示例:

 

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

@Component

public class KafkaConsumer {

@KafkaListener(topics = "my-topic", groupId = "my-group")

public void receive(String message) {

System.out.println("接收到消息: " + message);

}

}

在上述代码中:

  • @KafkaListener注解标记在receive方法上,topics属性指定了要监听的主题为my-topic,groupId属性指定了消费者组为my-group 。当有消息发送到my-topic主题时,并且属于my-group消费者组的消费者就会监听并接收该消息 。
  • receive方法的参数message就是接收到的消息内容,在方法内部可以对消息进行处理,这里只是简单地将消息打印输出 。

4.3 消息处理

在接收到 Kafka 消息后,通常需要对消息进行处理,常见的处理逻辑包括数据持久化、业务逻辑处理等 。同时,为了保证系统的稳定性和可靠性,还需要对消息处理过程中可能出现的异常进行处理 。

数据持久化

假设接收到的消息是一个用户订单信息,需要将其保存到数据库中 。可以使用 Spring Data JPA 来实现数据持久化 。首先,定义一个订单实体类Order:

 

import javax.persistence.Entity;

import javax.persistence.GeneratedValue;

import javax.persistence.GenerationType;

import javax.persistence.Id;

@Entity

public class Order {

@Id

@GeneratedValue(strategy = GenerationType.IDENTITY)

private Long id;

private String orderNumber;

private double amount;

// 省略getter和setter方法

}

然后,定义一个订单仓库接口OrderRepository:

 

import org.springframework.data.jpa.repository.JpaRepository;

public interface OrderRepository extends JpaRepository<Order, Long> {

}

最后,在 Kafka 消费者中注入OrderRepository,并将接收到的消息保存到数据库中:

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

@Component

public class KafkaConsumer {

@Autowired

private OrderRepository orderRepository;

@KafkaListener(topics = "order-topic", groupId = "order-group")

public void receive(String message) {

// 假设消息格式为 "订单号,金额",例如 "1001,100.0"

String[] parts = message.split(",");

String orderNumber = parts[0];

double amount = Double.parseDouble(parts[1]);

Order order = new Order();

order.setOrderNumber(orderNumber);

order.setAmount(amount);

orderRepository.save(order);

System.out.println("订单已保存: " + order);

}

}

在上述代码中,receive方法接收到消息后,将消息按照逗号分隔,解析出订单号和金额,然后创建一个Order对象,并使用orderRepository.save(order)方法将订单保存到数据库中 。

业务逻辑处理

如果接收到的消息需要进行复杂的业务逻辑处理,比如对用户订单进行折扣计算、库存更新等操作 。可以在 Kafka 消费者中调用相应的业务服务来完成这些处理 。

假设存在一个订单服务OrderService,其中包含计算折扣和更新库存的方法:

 

import org.springframework.stereotype.Service;

@Service

public class OrderService {

public double calculateDiscount(double amount) {

// 简单示例:如果订单金额大于100,打9折

if (amount > 100) {

return amount * 0.9;

}

return amount;

}

public void updateStock(String orderNumber, int quantity) {

// 这里省略实际的库存更新逻辑,比如更新数据库中的库存数量

System.out.println("更新库存: 订单号 " + orderNumber + ",数量 " + quantity);

}

}

在 Kafka 消费者中注入OrderService,并调用其方法进行业务逻辑处理:

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

@Component

public class KafkaConsumer {

@Autowired

private OrderService orderService;

@KafkaListener(topics = "order-topic", groupId = "order-group")

public void receive(String message) {

// 假设消息格式为 "订单号,金额,数量",例如 "1001,100.0,5"

String[] parts = message.split(",");

String orderNumber = parts[0];

double amount = Double.parseDouble(parts[1]);

int quantity = Integer.parseInt(parts[2]);

double discountedAmount = orderService.calculateDiscount(amount);

orderService.updateStock(orderNumber, quantity);

System.out.println("订单处理完成: 订单号 " + orderNumber + ",折扣后金额 " + discountedAmount);

}

}

在上述代码中,receive方法接收到消息后,解析出订单号、金额和数量,然后调用orderService.calculateDiscount(amount)方法计算折扣后的金额,调用orderService.updateStock(orderNumber, quantity)方法更新库存 。

异常处理

在消息处理过程中,可能会出现各种异常,如消息格式错误、数据库操作失败、业务逻辑异常等 。为了保证系统的稳定性,需要对这些异常进行妥善处理 。

Spring Kafka 提供了多种异常处理方式,其中一种是使用SeekToCurrentErrorHandler作为默认的异常处理器 。SeekToCurrentErrorHandler会在消息处理失败时,将偏移量重置为当前消息的偏移量,以便在下次重试时重新处理该消息 。同时,可以设置最大重试次数和重试间隔时间 。

以下是配置异常处理器的代码示例:

 

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

import org.springframework.retry.backoff.ExponentialBackOffPolicy;

import org.springframework.retry.policy.SimpleRetryPolicy;

import org.springframework.retry.support.RetryTemplate;

import java.util.HashMap;

import java.util.Map;

@Configuration

@EnableKafka

public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")

private String bootstrapServers;

@Value("${spring.kafka.consumer.group-id}")

private String groupId;

@Bean

public Map<String, Object> consumerConfigs() {

Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return props;

}

@Bean

public ConsumerFactory<String, String> consumerFactory() {

return new DefaultKafkaConsumerFactory<>(consumerConfigs());

}

@Bean

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory());

factory.setConcurrency(3);

// 配置异常处理器

SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(retryTemplate());

errorHandler.setMaxAttempts(3); // 设置最大重试次数为3

factory.setErrorHandler(errorHandler);

return factory;

}

@Bean

public RetryTemplate retryTemplate() {

RetryTemplate retryTemplate = new RetryTemplate();

// 配置重试策略

SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();

retryPolicy.setMaxAttempts(3); // 设置最大重试次数为3

retryTemplate.setRetryPolicy(retryPolicy);

// 配置重试间隔策略

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

backOffPolicy.setInitialInterval(1000); // 初始重试间隔为1秒

backOffPolicy.setMultiplier(2); // 每次重试间隔翻倍

backOffPolicy.setMaxInterval(10000); // 最大重试间隔为10秒

retryTemplate.setBackOffPolicy(backOffPolicy);

return retryTemplate;

}

}

在上述代码中:

  • kafkaListenerContainerFactory方法中,创建了一个SeekToCurrentErrorHandler实例,并传入retryTemplate()方法返回的重试模板 。设置setMaxAttempts(3)表示最大重试次数为 3 次 。
  • retryTemplate方法配置了重试模板,其中SimpleRetryPolicy设置了最大重试次数为 3 次,ExponentialBackOffPolicy设置了初始重试间隔为 1 秒,每次重试间隔翻倍,最大重试间隔为 10 秒 。

当消息处理过程中出现异常时,SeekToCurrentErrorHandler会按照配置的重试策略进行重试 。如果重试达到最大次数后仍然失败,会根据具体的业务需求进行进一步处理,比如将异常消息记录到日志中,或者发送到专门的异常处理队列中 。

5. 集成中的常见问题与解决方案

5.1 连接问题

在将 Spring Boot 与 Kafka 集成时,连接问题是常见的故障点之一。连接失败可能导致消息无法发送和接收,严重影响系统的正常运行 。以下是一些可能导致 Kafka 连接失败的常见原因及相应的排查和解决方法 。

地址错误

如果在 Spring Boot 的配置文件中指定的 Kafka 服务器地址错误,客户端将无法连接到 Kafka 集群 。例如,在application.properties文件中配置spring.kafka.bootstrap-servers时,可能误写了 IP 地址或端口号。

 

# 错误配置,IP地址错误

spring.kafka.bootstrap-servers=192.168.1.100:9092

排查方法:仔细检查配置文件中的 Kafka 服务器地址,确保 IP 地址和端口号与实际的 Kafka 集群配置一致 。可以通过 ping 命令或 telnet 命令来测试网络连接是否畅通 。例如,使用 telnet 命令测试与 Kafka 服务器的连接:

 

telnet 192.168.1.100 9092

如果无法连接,说明地址或端口可能存在问题 。

端口被占用

Kafka 默认使用 9092 端口进行通信,如果该端口被其他进程占用,Kafka 服务器将无法启动,客户端也无法连接 。

排查方法:在 Linux 系统中,可以使用netstat命令或lsof命令来查看端口占用情况 。例如,使用netstat命令查看 9092 端口的占用情况:

 

netstat -tuln | grep 9092

如果有其他进程占用了 9092 端口,可以通过lsof -i :9092命令查看占用该端口的进程信息,并根据需要终止该进程 。

解决方法:如果确定是端口被占用导致的连接问题,可以修改 Kafka 的配置文件server.properties,将listeners配置项中的端口号修改为其他未被占用的端口,然后重启 Kafka 服务器 。

 

# 修改为其他未被占用的端口,如9093

listeners=PLAINTEXT://:9093

同时,需要在 Spring Boot 的配置文件中更新 Kafka 服务器的端口号:

 

spring.kafka.bootstrap-servers=192.168.1.100:9093

防火墙限制

如果服务器上启用了防火墙,可能会阻止客户端与 Kafka 服务器之间的通信 。防火墙可能会限制对 Kafka 端口(默认为 9092)的访问,导致连接失败 。

排查方法:检查服务器的防火墙配置,查看是否限制了对 Kafka 端口的访问 。在 Linux 系统中,可以使用iptables -L命令查看防火墙规则 。

解决方法:如果是防火墙限制导致的连接问题,可以通过以下几种方式解决:

  • 开放防火墙端口:在 Linux 系统中,可以使用iptables命令开放 9092 端口 。例如:
 

iptables -A INPUT -p tcp --dport 9092 -j ACCEPT

  • 关闭防火墙:在测试环境或安全可控的情况下,可以暂时关闭防火墙 。在 Linux 系统中,可以使用以下命令关闭防火墙:
 

systemctl stop firewalld

systemctl disable firewalld

  • 配置防火墙规则:如果不想关闭防火墙,可以根据实际情况配置防火墙规则,允许特定的 IP 地址或网段访问 Kafka 端口 。例如,允许 192.168.1.0/24 网段的主机访问 9092 端口:
 

iptables -A INPUT -p tcp -s 192.168.1.0/24 --dport 9092 -j ACCEPT

5.2 消息丢失与重复消费

在 Kafka 的使用过程中,消息丢失和重复消费是比较常见且需要重点关注的问题,它们可能会对系统的数据一致性和业务逻辑产生严重影响 。以下是对这两个问题的原因分析及解决方案 。

消息丢失原因及解决方案
  • 生产者未确认:如果生产者在发送消息时,没有正确配置acks参数,或者没有处理好发送结果,可能会导致消息丢失 。当acks=0时,生产者发送消息后不会等待 Kafka 集群的确认,直接继续发送下一条消息,这种情况下如果网络出现问题,消息可能会丢失 。当acks=1时,生产者只会等待 Kafka 集群中 leader 节点的确认,而不等待 follower 节点的同步,如果 leader 节点在同步数据给 follower 节点之前发生故障,消息也可能丢失 。
  • 解决方法:为了确保消息不丢失,建议将acks配置为all(或-1),表示生产者会等待所有同步副本确认消息已收到,这样可以最大程度地保证消息的可靠性 。同时,在生产者发送消息时,要正确处理发送结果,通过异步回调或同步等待的方式,确保消息成功发送 。例如,在异步发送消息时,添加回调函数来处理发送结果:
 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.stereotype.Service;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

@Service

public class KafkaProducerService {

private static final String TOPIC = "my-topic";

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessageAsync(String message) {

ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);

future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

@Override

public void onSuccess(SendResult<String, String> result) {

System.out.println("消息发送成功:" + result.getRecordMetadata());

}

@Override

public void onFailure(Throwable ex) {

System.out.println("消息发送失败:" + ex.getMessage());

// 这里可以添加重试逻辑或其他处理方式

}

});

}

}

  • Broker 故障:Kafka Broker 的临时故障可能导致消息未被持久化,从而丢失 。例如,Broker 所在的服务器磁盘故障、内存溢出等问题,都可能影响消息的存储 。
  • 解决方法:为了提高 Kafka 集群的可靠性,可以增加副本数量,确保每个分区有多个副本 。同时,合理配置min.insync.replicas参数,该参数指定了必须确认写操作成功的最小副本数量 。当acks=all且min.insync.replicas设置合理时,如果大多数副本没有收到写操作,生产者将引发异常,从而避免消息丢失 。例如,创建一个复制因子为 3 的主题,设置min.insync.replicas为 2:
 

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-topic --config min.insync.replicas=2

  • 网络问题:网络中断或不稳定可能导致消息在传输过程中丢失 。例如,生产者与 Kafka 集群之间的网络抖动、延迟过高,可能会使消息无法成功到达 Broker 。
  • 解决方法:可以通过配置生产者的重试机制来应对网络问题 。设置retries参数为一个合理的值,当消息发送失败时,生产者会自动重试发送 。同时,可以适当调整retry.backoff.ms参数,控制重试的间隔时间 。例如:
 

spring.kafka.producer.retries=3

spring.kafka.producer.retry.backoff.ms=1000

重复消费原因及解决方案
  • 消费者组重新平衡:当消费者组发生变化时,Kafka 可能会重新分配分区,导致某些消息被多次消费 。例如,新的消费者加入消费者组、已有消费者离开消费者组,或者消费者的心跳超时等情况,都可能触发消费者组的重新平衡 。
  • 解决方法:为了减少消费者组重新平衡对消息消费的影响,可以合理设置session.timeout.ms和max.poll.interval.ms参数 。session.timeout.ms表示消费者与 Kafka 集群之间的心跳超时时间,如果超过这个时间没有收到心跳,Kafka 会认为消费者已死亡,从而触发重新平衡 。max.poll.interval.ms表示消费者在两次调用poll方法之间的最大时间间隔,如果超过这个时间,Kafka 也会认为消费者已死亡,触发重新平衡 。可以适当增大这两个参数的值,减少不必要的重新平衡 。同时,在消费者端进行去重处理,为每条消息生成唯一的 ID,并在消费时检查是否已经消费过该消息 。例如,可以使用数据库或缓存来记录已消费的消息 ID 。
  • 消费者处理失败:如果消费者在处理消息时失败,并且没有正确处理异常,可能会导致消息被重新消费 。例如,消费者在处理消息时发生异常,但没有将偏移量提交,当消费者重启后,会再次消费该消息 。
  • 解决方法:在消费者处理消息时,要正确处理异常,确保消息处理的原子性 。可以使用事务来保证消息处理和偏移量提交的一致性 。同时,实现幂等性消费,设计消费者逻辑时,确保即使消息被重复消费也不会对系统状态产生影响 。例如,在数据库操作中,可以使用唯一约束来避免重复插入数据 。
  • 消息未正确提交:消费者在处理完消息后未能正确提交偏移量,可能会导致消息被重复消费 。例如,消费者在自动提交偏移量时,由于网络问题或其他原因,导致偏移量提交失败,但消息已经被处理,当消费者重启后,会再次从上次未提交的偏移量处开始消费,从而导致消息重复 。
  • 解决方法:可以将消费者的偏移量提交方式设置为手动提交,在消息处理完成后,手动调用commitSync或commitAsync方法提交偏移量 。同时,在提交偏移量时,要处理好异常情况,确保偏移量能够正确提交 。例如:
 

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.springframework.stereotype.Component;

import java.util.Arrays;

import java.util.Properties;

@Component

public class KafkaConsumerManualCommit {

public void consume() {

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");

props.put("group.id", "my-group");

props.put("enable.auto.commit", "false");

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("my-topic"));

try {

while (true) {

ConsumerRecords<String, String> records = consumer.poll(100);

for (ConsumerRecord<String, String> record : records) {

System.out.println("收到消息: " + record.value());

// 处理消息

}

// 手动提交偏移量

consumer.commitSync();

}

} catch (Exception e) {

e.printStackTrace();

} finally {

consumer.close();

}

}

}

http://www.xdnf.cn/news/13201.html

相关文章:

  • 多模态分类案例实现
  • 可视化如何全方位赋能销售工作
  • Rust 学习笔记:通过 Send 和 Sync trait 实现可扩展并发性
  • 常用的OceanBase调优配置参数
  • Vue 的 v-model 指令详解
  • 2023年全国研究生数学建模竞赛华为杯D题区域双碳目标与路径规划研究求解全过程文档及程序
  • C# 中常用的 字符串截取方法
  • 代码解读——ReferenceNet
  • 深入理解Linux DRM显示子系统:架构、实战项目与关键问题全解析
  • 相机camera开发之差异对比核查二:测试机和对比机的差异提交对比
  • 项目又延期?如何用“灵活IT人力外包”快速补位技术缺口
  • Android高性能音频与图形开发:OpenSL ES与OpenGL ES最佳实践
  • NexusTerminal一款视频移动端的webSSH
  • 人工操舵是如何操作的?介绍人工操舵的经验和规律
  • 云原生核心技术 (4/12): Docker 进阶:镜像优化实战与 Docker Compose 揭秘
  • Python----OpenCV(图像处理——图像的多种属性、RGB与BGR色彩空间、HSB、HSV与HSL、ROI区域)
  • Dual-Port MIPI to HDMI 2.0,4k@60Hz
  • Java + Spring Boot项目枚举(Enum)目录建议
  • Couchbase 可观测性最佳实践
  • 二十、【用户管理与权限 - 篇二】前端交互:实现用户管理界面
  • C++17 std::string_view:性能与便捷的完美结合
  • 【习题】应用程序框架基础
  • 蓝桥杯国赛训练 day3
  • C++ 8.1 内联函数
  • 【Nginx系列】Nginx 负载均衡策略之 least_conn
  • shell脚本--查看应用的cpu 和 内存使用率 并把最新告警内容显示出来
  • Huggingface-CLI的使用
  • AIStarter 4.0 苹果版体验评测|轻松部署 ComfyUI 与 DeepSeek 的 AI 工具箱
  • 二刷苍穹外卖 day01
  • 为MySQL社区版实现审计功能:从插件配置到日志监控全解析