
How to Integrate Kafka with the Spring Framework

Integrating Kafka with Spring Boot

Add the Maven dependencies

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: ip:port
      topics: topics
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      topics: topics
      bootstrap-servers: ip:port
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
      ssl.truststore.location: client.truststore.jks
      ssl.truststore.password: trus_password
      ssl.endpoint.identification.algorithm:

Create the Kafka producer and consumer

In a Spring Boot application, once application.properties / application.yml is configured correctly, Spring Boot's Kafka auto-configuration (KafkaAutoConfiguration) automatically creates and wires KafkaTemplate, KafkaConsumer, and the other related beans.

  • KafkaTemplate: used to send messages to Kafka

  • ConsumerFactory: a factory that creates Kafka consumers

  • KafkaListenerContainerFactory: creates message listener containers for @KafkaListener methods

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaMessageService {

    @Value("${spring.kafka.producer.topics}")
    private String outputTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Listen for messages on the input topic.
     * @param message the received message
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topics}")
    public void listen(String message) {
        log.info("Received message: message = {}", message);
        // TODO: process the message
        String processedMessage = message;
        // Send the result to the output topic
        kafkaTemplate.send(outputTopic, processedMessage);
        log.info("Sent processed message: {}", processedMessage);
    }
}

Manually configuring the Kafka producer and consumer

If more complex configuration is needed, you can also define your own Kafka configuration classes.

Kafka consumer configuration class:

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;
    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;
    @Value("${spring.kafka.properties.security.protocol}")
    private String securityProtocol;
    @Value("${spring.kafka.properties.sasl.mechanism}")
    private String saslMechanism;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String saslJaasConfig;
    @Value("${spring.kafka.properties.ssl.truststore.location}")
    private String truststoreLocation;
    @Value("${spring.kafka.properties.ssl.truststore.password}")
    private String truststorePassword;
    @Value("${spring.kafka.properties.ssl.endpoint.identification.algorithm}")
    private String endpointIdentificationAlgorithm;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put("ssl.truststore.location", truststoreLocation);
        props.put("ssl.truststore.password", truststorePassword);
        props.put("ssl.endpoint.identification.algorithm", endpointIdentificationAlgorithm);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Concurrency: the number of independent consumers processing messages in parallel
        factory.setConcurrency(3);
        // Manual offset commit
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}

Kafka producer configuration class:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;
    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>(4);
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

Listening for and processing messages:

@Component
@Slf4j
public class KafkaMessageProcess {

    @Value("${spring.kafka.producer.topics}")
    private String outTopic;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'.split(',')}")
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Acknowledgment acknowledgment) {
        log.info("Received message: topic = {}, message = {}", topic, message);
        // Manually acknowledge: commits this message's offset to Kafka. Kafka records the
        // offset, marking this message (and all earlier ones) as successfully consumed.
        acknowledgment.acknowledge();
    }

    private void process(String message) {
        // TODO: process the message
    }
}

@KafkaListener source code analysis

How @KafkaListener is registered

  1. Scan for the annotation: during bean initialization, KafkaListenerAnnotationBeanPostProcessor, which implements BeanPostProcessor, scans every bean looking for the @KafkaListener annotation.

KafkaListenerAnnotationBeanPostProcessor

// Note: some code omitted.
// postProcessAfterInitialization comes from BeanPostProcessor, one of Spring's core
// extension mechanisms; it allows custom processing after a bean has been initialized.
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    // Find classes annotated with @KafkaListener
    Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
    // Find methods annotated with @KafkaListener
    Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
            (methodx) -> {
                Set<KafkaListener> listenerMethods = this.findListenerAnnotations(methodx);
                return !listenerMethods.isEmpty() ? listenerMethods : null;
            });
    // Iterate over the annotated methods and parse their signatures
    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
        Method method = entry.getKey();
        for (KafkaListener listener : entry.getValue()) {
            // Once found, hand off to the subsequent parsing and registration logic
            this.processKafkaListener(listener, method, bean, beanName);
        }
    }
    return bean;
}
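The scanning step can be mimicked in plain Java without Spring: a hypothetical `@MyListener` annotation and a scanner that collects annotated methods via reflection, which is essentially what MethodIntrospector.selectMethods does for @KafkaListener (all names here are made up for illustration):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class AnnotationScanner {

    // Hypothetical stand-in for @KafkaListener
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface MyListener {
        String topic();
    }

    // Example bean with one annotated method and one plain method
    public static class OrderHandler {
        @MyListener(topic = "orders")
        public void onOrder(String msg) { }

        public void notAListener() { }
    }

    // Scan the class's declared methods for @MyListener, as the post-processor
    // scans each bean's methods for @KafkaListener
    public static List<String> findListenerTopics(Class<?> targetClass) {
        List<String> topics = new ArrayList<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            MyListener ann = method.getAnnotation(MyListener.class);
            if (ann != null) {
                topics.add(ann.topic());
            }
        }
        return topics;
    }
}
```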
  2. Parse the annotation: extract topics, groupId, containerFactory, and the rest of the metadata.

KafkaListenerAnnotationBeanPostProcessor

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    // Parse the annotation, wrapping the annotation metadata, method, bean, and other
    // static configuration into the endpoint
    this.processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
    String containerFactory = this.resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = this.resolveContainerFactory(kafkaListener, containerFactory, beanName);
    // Register the endpoint built from the listener and method found in the previous step
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
  3. Register the listener endpoint: KafkaListenerEndpointRegistrar.registerEndpoint() registers the listener.

KafkaListenerEndpointRegistrar

public void registerEndpoint(KafkaListenerEndpoint endpoint, @Nullable KafkaListenerContainerFactory<?> factory) {
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        // Start immediately?
        // true:  create and start the corresponding MessageListenerContainer
        //        (the Kafka consumer container) right away
        // false: only save the endpoint info into the endpointDescriptors collection;
        //        containers are created and started later in one batch
        if (this.startImmediately) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor), true);
        } else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

// Create all KafkaMessageListenerContainers in one batch and start them
protected void registerAllEndpoints() {
    synchronized (this.endpointDescriptors) {
        for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
        }
        this.startImmediately = true;  // trigger immediate startup
    }
}

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    synchronized (this.listenerContainers) {
        // Create the MessageListenerContainer; container creation itself is
        // analyzed in the next section
        MessageListenerContainer container = createListenerContainer(endpoint, factory);
        // Store the created container in a thread-safe map
        this.listenerContainers.put(id, container);
        if (startImmediately) {
            // Start it
            startIfNecessary(container);
        }
    }
}

How KafkaListenerContainerFactory creates the listener container

KafkaMessageListenerContainer is one of Spring Kafka's core components. It manages and executes the Kafka consumer's message-listening logic, wrapping the native KafkaConsumer and providing thread management, message polling, listener invocation, error handling, and more.

Returning for a moment to the configuration at the start: the container factory we configured is ConcurrentKafkaListenerContainerFactory, and concurrency = 3 means three threads process messages concurrently. In this case, ConcurrentKafkaListenerContainerFactory creates a ConcurrentMessageListenerContainer.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // Number of concurrent consumers
    factory.setConcurrency(3);
    // Manual offset commit
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

ConcurrentMessageListenerContainer holds a collection into which, at startup, KafkaMessageListenerContainer child containers are created, one per unit of concurrency.

private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
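Each child container runs its own KafkaConsumer in the same consumer group, so the topic's partitions end up spread across the three containers by Kafka's group coordinator. As a rough intuition pump only (the real assignment is done by the broker-side coordinator and the configured partition assignor, not by this code), a round-robin spread of 6 partitions over concurrency = 3 looks like this:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: spread partition numbers over child containers round-robin.
// Real assignment is performed by Kafka's group coordinator / partition assignor.
public class PartitionSpread {

    public static List<List<Integer>> distribute(int partitions, int concurrency) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < concurrency; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // child container (p % concurrency) gets partition p
            assignment.get(p % concurrency).add(p);
        }
        return assignment;
    }
}
```

Note the corollary: if concurrency exceeds the partition count, the surplus child containers receive no partitions and sit idle.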

Container creation code:

AbstractKafkaListenerContainerFactory

public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
    }
    endpoint.setupListenerContainer(instance, this.messageConverter);
    // Initialize the container's configuration. The endpoint carries static configuration,
    // such as the topic info, the @KafkaListener-annotated method, and the bean, which is
    // copied into the container here.
    initializeContainer(instance, endpoint);
    customizeContainer(instance);
    return instance;
}

protected abstract C createContainerInstance(KafkaListenerEndpoint endpoint);

ConcurrentKafkaListenerContainerFactory

// The subclass method is invoked here: the template-method pattern, where the abstract
// class defines the overall flow and the subclasses implement the specific steps.
protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
    TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
    if (topicPartitions != null && topicPartitions.length > 0) {
        ContainerProperties properties = new ContainerProperties(topicPartitions);
        return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
    }
    else {
        Collection<String> topics = endpoint.getTopics();
        if (!topics.isEmpty()) { // NOSONAR
            ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
        else {
            ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern()); // NOSONAR
            return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
        }
    }
}

Starting the container and consuming messages

As mentioned above, after the container is created there is a startup step: the line startIfNecessary(container); actually starts the container, which in turn triggers initialization of the consumer thread (ListenerConsumer) and kicks off the message consumption flow.

KafkaListenerEndpointRegistrar

private void startIfNecessary(MessageListenerContainer listenerContainer) {
    if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
        listenerContainer.start();
    }
}

// This reaches AbstractMessageListenerContainer's start method
public final void start() {
    checkGroupId();
    synchronized (this.lifecycleMonitor) {
        if (!isRunning()) {
            doStart();
        }
    }
}

// ConcurrentMessageListenerContainer's doStart() performs the real startup logic
protected void doStart() {
    if (!isRunning()) {
        // Create child containers according to the configured concurrency
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> container =
                    constructContainer(containerProperties, topicPartitions, i);
            configureChildContainer(i, container);
            if (isPaused()) {
                container.pause();
            }
            // Start the child container
            container.start();
            // Save it to the child-container list
            this.containers.add(container);
        }
    }
}

// KafkaMessageListenerContainer's doStart starts the child container
protected void doStart() {
    // Create the consumer thread
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    setRunning(true);
    // Used to block until the consumer thread has actually finished starting
    this.startLatch = new CountDownLatch(1);
    // Submit to the executor: the consumer thread is started asynchronously
    this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

The consumption logic lives in ListenerConsumer, which implements Runnable. Its run() method polls for messages and, via reflection, invokes our own business method to carry out the custom message-handling logic.

ListenerConsumer

public void run() {
    while (isRunning()) {
        try {
            // Poll messages from Kafka and invoke the business method via reflection
            pollAndInvoke();
        }
        catch (Exception e) {
            handleConsumerException(e);
        }
        finally {
            clearThreadState();
        }
    }
}

protected void pollAndInvoke() {
    // Poll messages
    ConsumerRecords<K, V> records = doPoll();
    // Invoke our own handler method via reflection to process the messages
    invokeIfHaveRecords(records);
}