当前位置: 首页 > news >正文

《深入剖析Kafka分布式消息队列架构奥秘》之Springboot集成Kafka

🎼个人主页:【Y小夜】

😎作者简介:一位双非学校的大三学生,编程爱好者,

专注于基础和实战分享,欢迎私信咨询!

🎆入门专栏:🎇【MySQL,Javaweb,Rust,python】

🎈热门专栏:🎊【Springboot,Redis,Springsecurity,Docker,AI】 

感谢您的点赞、关注、评论、收藏、是对我最大的认可和支持!❤️

目录

​🎈为什么要使用集群?

​🎈Springboot集成Kafka

🎉引入依赖

🎉配置

🎉编写Kafka的配置类

🎉编写生产者配置类

🎉编写生产者业务

🎉编写消费者配置类

🎉编写消费者监听类

🎉报错: 


​🎈为什么要使用集群?

        其实在单机服务下,Kafka已经具备了非常高的性能,TPS能够达到百万级,但是实际工作中使用,单机搭建的Kafka有很大的局限性。

一方面:消息太多,需要分开放

另一方面:服务不稳定,数据容易丢失

大家集群这部分我的电脑上没有多个虚拟机,所以这部分,要是需要操作的话。可以看其他博主的文章。

​🎈Springboot集成Kafka

🎉引入依赖

这个依赖关系版本和下载的kafka有依赖关系,大家可以关注下。

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.1</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>

🎉配置

spring.kafka.bootstrap-servers=ip地址:9092
kafka.topics=test1

🎉编写Kafka的配置类

package com.yan.kafka;import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;@Configuration
public class KafkaConfig {@Value("${kafka.topics}")private String topic;@Beanpublic NewTopic topic(){return TopicBuilder.name(topic).build();}
}

🎉编写生产者配置类

package com.yan.kafka;import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;//发送配置类
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String service;public Map<String, Object>  config(){Map<String,Object> config=new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,service);//进行序列化config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);return config;}@Beanpublic ProducerFactory<String,String> producerFactory(){return new DefaultKafkaProducerFactory<>(config());}@Beanpublic KafkaTemplate<String,String> kafkaTemplate(){return new KafkaTemplate<>(producerFactory());}
}

🎉编写生产者业务

package com.yan.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {@Autowiredprivate KafkaTemplate kafkaTemplate;@Value("${kafka.topics}")private String topic;@GetMapping("/kafka")public String sendMsg(){for (int i = 0; i < 10; i++) {kafkaTemplate.send(topic,"你好"+i);}return "success";}
}

🎉编写消费者配置类

package com.yan.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String service;public Map<String, Object> config(){Map<String,Object> config=new HashMap<>();config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,service);//进行反序列化config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class);return config;}@Beanpublic ConsumerFactory<String,String> consumerFactory(){return new DefaultKafkaConsumerFactory<>(config());}@Beanpublic ConcurrentKafkaListenerContainerFactory concurrentKafkaListenerContainerFactory(){ConcurrentKafkaListenerContainerFactory ckcf=new ConcurrentKafkaListenerContainerFactory();ckcf.setConsumerFactory(consumerFactory());return ckcf;}}

🎉编写消费者监听类

package com.yan.kafka;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaListeners {@KafkaListener(topics="${kafka.topics}",groupId = "aaa")void listener(String data){System.out.println("收到了"+data);}
}

然后,启动程序,访问一下localhost:8080/kafka,控制台会输出:

🎉报错: 

下面三种图是我做的时候出现的报错。我们可以对程序做一下检测

首先输入jps,看linux中kafka是否启动

如果没有启动,则需要输入命令进行启动。

然后看一下9092端口是否开放:

firewall-cmd --zone=public --list-ports

若没有开放,则使用命令,开放9092端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent
firewall-cmd --reload

接着打开 /config/server.properties 文件,

listeners=PLAINTEXT://localhost:9092

将localhost修改为kafka当前的操作系统的地址即可。

然后打开windows窗口,输入

telnet ip地址 9092

 

能进入的话,就说明可以连接到。

把这些修改完毕后,应该就可以顺利访问了。!!!

http://www.xdnf.cn/news/1408735.html

相关文章:

  • 中级统计师-统计实务-第四章 专业统计
  • 嵌入式ARM程序高级调试技能:20.qemu arm ARM Linux 上 addr2line 的实际应用示例
  • 【重学MySQL】九十五、Linux 下 MySQL 大小写规则设置详解
  • CF每日3题(1500-1600)
  • 阿里云创建自己的博客,部署wordpress
  • 基于Matlab元胞自动机的强场电离过程模拟与ADK模型分析
  • Scikit-learn Python机器学习 - 数据集的划分
  • 网格图--Day03--网格图DFS--2658. 网格图中鱼的最大数目,1034. 边界着色,1020. 飞地的数量
  • Cartographer中的gflag与lua文件
  • 【开题答辩全过程】以 基于Java的城市公交查询系统设计与实现为例,包含答辩的问题和答案
  • 记录测试环境hertzbeat压测cpu高,oom问题排查。jvm,mat,visulavm
  • 浏览器和 node 操作文件的 api 以及区别
  • GEE 实战:Landsat 5 月度 NDVI 数据插值填补(以 8 月为例)_后附完整代码
  • Python:如何批量下载CLMS NDVI V3数据集?
  • PyQt5 K线图实现与性能优化详解
  • 神州数码之FTP/TFTP 升级 篇
  • 深入解析Linux系统中的/etc/hosts文件
  • 在Windows的wsl中如何以root登录Ubuntu
  • OpenStack 02:使用 DevStack 单节点一体化部署
  • Kafka面试精讲 Day 3:Producer生产者原理与配置
  • Java提供高效后端支撑,Vue呈现直观交互界面,共同打造的MES管理系统,含完整可运行源码,实现生产计划、执行、追溯一站式管理,提升制造执行效率
  • isp图像处理--bayer Binning
  • isp 图像处理--DPC坏点矫正
  • 张柏芝亮相林家谦演唱会 再次演绎《任何天气》
  • 秋招笔记-8.31
  • 【ACP】2025-最新-疑难题解析- 练习一汇总
  • 矩阵待办ios app Tech Support
  • 【机器学习】-torch相关知识01
  • IO_hw_8.29
  • 8.31【A】scons,带宽,语义semantic,读论文颜色规范,系统运行命令