当前位置: 首页 > news >正文

centos7.9使用docker-compose安装kafka

docker-compose配置文件

services:zookeeper:image: confluentinc/cp-zookeeper:7.0.1hostname: zookeepercontainer_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.0.1hostname: kafkacontainer_name: kafkadepends_on:- zookeeperports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXTKAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_INTERNAL://:29092KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.85:9092,PLAINTEXT_INTERNAL://kafka:29092KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNALKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1kafka-manager:image: hlebalbau/kafka-manager:stablecontainer_name: kafka-managerdepends_on:- zookeeperports:- "9002:9000"environment:ZK_HOSTS: "zookeeper:2181"kAFKA_BROKERS: 192.168.1.85:9092KAFKA_MANAGER_AUTH_ENABLED: "false"

application.properties文件

spring.application.name=kafka_demo
# application.properties
spring.kafka.bootstrap-servers=192.168.1.85:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

生产者controller

import com.yykj.kafka_demo.service.KafkaProducerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaTestController {private final KafkaProducerService kafkaProducerService;public KafkaTestController(KafkaProducerService kafkaProducerService) {this.kafkaProducerService = kafkaProducerService;}@GetMapping("/send")public String sendMessageToKafka(@RequestParam(value = "topic", defaultValue = "test-topic") String topic,@RequestParam(value = "message", defaultValue = "Hello Kafka!") String message) {kafkaProducerService.sendMessage(topic, message);return "消息已发送: " + message + " 到主题: " + topic;}
}

生产者service

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private final KafkaTemplate<String, String> kafkaTemplate;public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}/*** 发送消息到指定主题* @param topic 主题名称* @param message 消息内容*/public void sendMessage(String topic, String message) {kafkaTemplate.send(topic, message).whenComplete((result, ex) -> {if (ex == null) {System.out.println("消息发送成功: " + message +", 分区: " + result.getRecordMetadata().partition() +", 偏移量: " + result.getRecordMetadata().offset());} else {System.err.println("消息发送失败: " + ex.getMessage());}});}
}

消费者监听

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaConsumerService {// 监听指定的主题,groupId用于区分不同的消费者组@KafkaListener(topics = "${kafka.topic:test-topic}", groupId = "${kafka.group-id:test-group}")public void consumeMessage(ConsumerRecord<String, String> record) {System.out.printf("收到消息 -> 主题: %s, 分区: %d, 偏移量: %d, 键: %s, 值: %s%n",record.topic(),record.partition(),record.offset(),record.key(),record.value());// 这里可以添加你的业务逻辑处理}
}

pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.5.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.yykj</groupId><artifactId>kafka_demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka_demo</name><description>kafka_demo</description><url/><licenses><license/></licenses><developers><developer/></developers><scm><connection/><developerConnection/><tag/><url/></scm><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!-- Spring Boot Starter for Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>3.3.6</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><annotationProcessorPaths><path><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></path></annotationProcessorPaths></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

http://www.xdnf.cn/news/649189.html

相关文章:

  • 2025LitCTF--Crypto--WriteUp
  • MathQ-Verify:数学问题验证的五步流水线,为大模型推理筑牢数据基石
  • 【深度学习】6. 卷积神经网络,CNN反向传播,感受野,池化变种,局部连接机制,可视化实例
  • Kafka|基础入门
  • LLM outputs.loss 返回什么
  • 零基础设计模式——结构型模式 - 桥接模式
  • 如何做好一份网络安全技术文档?
  • 在SpringBoot项目中策略模式的使用
  • Spring 核心配置文件(spring.xml)构建指南
  • Vue 核心技术与实战day04
  • anaconda环境变量+vscode汉化配置
  • Unity 3D AssetBundle加密解密教程
  • 【后端高阶面经:Elasticsearch篇】38、Elasticsearch 高可用架构解析:分片容灾、Translog 调优与双集群
  • UDP和TCP特征的详解
  • 鸿蒙OSUniApp 制作自定义的进度条组件#三方框架 #Uniapp
  • 上海市计算机学会竞赛平台2025年5月月赛丙组手机充电
  • TCP协议原理与Java编程实战:从连接建立到断开的完整解析
  • 计算机网络】深入解析 TCP 协议:从三次握手到拥塞控制
  • java高级 -动态代理
  • 华为云Flexus+DeepSeek征文 | DeepSeek-V3/R1商用服务开通体验全流程及使用评测
  • 项目部署一次记录
  • 第7章:Zephyr 的低功耗机制
  • 在 ElementUI 中实现 Table 单元格合并
  • 【Android】SharePreference原理
  • 【ARTS】【LeetCode-59】螺旋矩阵
  • 【HarmonyOS 5应用架构详解】深入理解应用程序包与多Module设计机制
  • 深度解析 8086 处理器:x86 架构的奠基者
  • 【后端高阶面经:架构篇】46、分布式架构:如何应对高并发的用户请求
  • 2025社区团购系统开发:未来趋势、核心技术与落地解决方案
  • Python - 文件部分