当前位置: 首页 > java >正文

springboot 整合spring-kafka客户端:SASL_SSL+PLAINTEXT方式

有关kafka的部署可以参考前面的文章:Kafka KRaft + SSL _SASL/PLAIN 部署文档

先确定最新版kafka_2.13-4.0.0的配:server.properties

# 节点角色配置
process.roles=broker,controller
node.id=1# 监听器配置
listeners=PLAINTEXT://:9092,CONTROLLER://:9093,SSL://:9094,SASL_SSL://:9095
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9094,SASL_SSL://localhost:9095
controller.listener.names=CONTROLLER
inter.broker.listener.name=SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL# 控制器配置
controller.quorum.voters=1@localhost:9093# 日志配置
log.dirs=D:/kafka_2.13-4.0.0/logs# 网络配置
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600# 日志保留策略
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000# 分区配置
num.partitions=1
default.replication.factor=1
offsets.topic.replication.factor=1# 元数据配置
metadata.log.dir=D:/kafka-logs
metadata.log.segment.bytes=1073741824
metadata.max.retention.bytes=-1
metadata.max.retention.ms=604800000# 安全配置
# security.inter.broker.protocol=SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true# SSL 配置 D:\kafka_2.13-4.0.0\config\ssl\kafka.server.keystore.jks
ssl.keystore.location=D:/kafka_2.13-4.0.0/config/ssl/kafka.server.keystore.jks
ssl.keystore.password=kafka123
ssl.key.password=kafka123
ssl.truststore.location=D:/kafka_2.13-4.0.0/config/ssl/kafka.server.truststore.jks
ssl.truststore.password=kafka123
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS
ssl.secure.random.implementation=SHA1PRNG

以及:jaas.conf

确保kafka正常启动

可以使用以下命令查看kafka中的topic,向某个topic发送消息,查看kafka中的消息:

kafka-topics.bat --bootstrap-server localhost:9092 --list
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic demo-topic
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo-topic --from-beginning

然后引入spring-kafka相关依赖包:

        <!-- Spring Kafka Starter --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><!-- Apache Kafka Clients (可选) --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version> <!-- 使用你需要的版本 --></dependency>

接着在项目配置文件中加入如下配置:

# Kafka configuration PLAINTEXT
#spring.kafka.bootstrap-servers=localhost:9092
#
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.auto-offset-reset=earliest
#spring.kafka.consumer.group-id=my-group# Kafka configuration SASL_SSL
spring.kafka.bootstrap-servers=localhost:9095spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id=my-group

重要的一步,还需要将kafa证书导入到java 的信任库

首先cd 到kafka目录:D:\kafka_2.13-4.0.0\config\ssl
使用命令查看truststore.jks 文件中的证书别名:

 keytool -list -keystore kafka.server.truststore.jks -storepass kafka123

在从truststore.jks文件中导出证书:

keytool -exportcert -alias kafka -keystore kafka.server.truststore.jks -storepass kafka123 -file kafka.crt

最后使用命令将证书导入信任库:这一步 需要以管理员的身份运行cmd

keytool -importcert -alias kafka-server -file kafka.crt -cacerts -storepass changeit

到这里所有的配置就完成了

下面我们在springboot项目中分别加入一个生产者和消费者测试发送和消费消息:

@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message);System.out.println("Sent message: " + message);}
}
@Service
public class KafkaConsumerService {@KafkaListener(topics = "demo-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}

通过请求发送:

@RestController
public class KafkaController {@AutowiredKafkaProducerService kafkaProducerService;@GetMapping("/kafka")public Map<String, String> kafka(@RequestParam String message) {kafkaProducerService.sendMessage("demo-topic", message != null ? message : "Hello from Spring Boot Kafka!");return Map.of("message", "Sent");}
}

发送消息:

成功接收消费到消息:

http://www.xdnf.cn/news/15591.html

相关文章:

  • LeetCode20
  • 边界路由器
  • Baumer工业相机堡盟工业相机如何通过YoloV8模型实现人物识别(C#)
  • 如何做好DNA-SIP?
  • Redis完全指南:从基础到实战(含缓存问题、布隆过滤器、持久化及Spring Boot集成)
  • 数据结构 栈(2)--栈的实现
  • 4.PCL点云的数据结构
  • 「Chrome 开发环境快速屏蔽 CORS 跨域限制详细教程」*
  • springboot跨域问题 和 401
  • 人工智能基础知识笔记十四:文本转换成向量
  • Android 实现:当后台数据限制开启时,仅限制互联网APN。
  • 什么是“数据闭环”
  • Docker-Beta?ollama的完美替代品
  • MySQL高可用集群架构:主从复制、MGR与读写分离实战
  • TDengine 的可视化数据库操作工具 taosExplorer(安装包自带)
  • VMware Workstation Pro 17下载安装
  • VR全景园区:开启智慧园区新时代
  • 基于C#+SQlite开发(WinForm)个人日程管理系统
  • 【leetcode】852. 山脉数组的封顶索引
  • 树莓派Qt 安装
  • CDSS系统升级“可视化解释-智能反馈-临床语言“三位一体设计架构设计分析
  • nginx代理websocket请求
  • 【华为】交换机vlan互访实验
  • 语雀编辑器内双击回车插入当前时间js脚本
  • 取消office word中的段落箭头标记
  • Java零基础快速入门
  • Vue3入门-组件及组件化
  • Kafka——无消息丢失配置怎么实现?
  • SpringMVC核心注解:@RequestMapping详解
  • java-字符串