当前位置: 首页 > ai >正文

Docker部署kafka实操+Java中访问

Docker部署kafka实操+Java中访问

1.compose.yaml脚本结构

docker-compose.yml

version: '3.8'services:zookeeper:image: bitnami/zookeeper:3.8container_name: zookeeperports:- "2181:2181"environment:- ALLOW_ANONYMOUS_LOGIN=yesvolumes:- zookeeper_data:/bitnami/zookeeperkafka:image: bitnami/kafka:3.4container_name: kafkaports:- "9092:9092"environment:- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST_IP}:9092- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=truevolumes:- kafka_data:/bitnami/kafkadepends_on:- zookeepervolumes:zookeeper_data:driver: localkafka_data:driver: local

启用服务:ip需要指定成自己的

HOST_IP=192.168.17.17 docker compose up -d

启动成功之后会有两个服务:

2.kafka基本操作

1.创建主题:

docker exec -it kafka kafka-topics.sh --create \--bootstrap-server localhost:9092 \--replication-factor 1 \--partitions 3 \--topic aaa-topic

成功截图:

2.发消息

docker exec -it kafka kafka-console-producer.sh \--bootstrap-server localhost:9092 \--topic test-topic

测试截图:

3.消费消息

docker exec -it kafka kafka-console-consumer.sh \--bootstrap-server localhost:9092 \--topic aaa-topic \--from-beginning

测试截图:

4.主题列表

docker exec -it kafka kafka-topics.sh --list \--bootstrap-server localhost:9092

测试截图:

3.Springboot中访问

依赖

        <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.4.0</version></dependency>

配置文件:

# kafka配置
kafka:servers: 192.168.17.17:9092topicName: test-topic

Java代码配置:

@Configuration
public class KafkaProducerConfig {@Value("${kafka.servers}")private String kafkaServers;@Beanpublic KafkaProducer<String, String> kafkaProducer() {Properties properties = new Properties();// 配置Kafka服务器地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);// 配置序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 可选配置properties.put(ProducerConfig.ACKS_CONFIG, "all"); // 数据可靠性配置properties.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量发送大小properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 延迟发送时间return new KafkaProducer<>(properties);}
}

测试controller:

@RestController
public class TestKafkaController {@Autowiredprivate KafkaMessageService kafkaMessageService;/*** 提供HTTP接口测试消息发送*/@GetMapping("/send-kafka")public String sendToKafka(String message) {kafkaMessageService.sendMessage(message);return "消息已发送: " + message;}
}

测试service

public interface KafkaMessageService {public void sendMessage(String message);
}@Service
public class KafkaMessageServiceImpl implements KafkaMessageService {@Resourceprivate KafkaProducer<String, String> kafkaProducer;@Value("${kafka.topicName:}")private String topicName;@Overridepublic void sendMessage(String message) {try {// 创建消息记录ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);// 发送消息kafkaProducer.send(record, (metadata, exception) -> {if (exception == null) {System.out.println("消息发送成功: " +"主题=" + metadata.topic() +", 分区=" + metadata.partition() +", 偏移量=" + metadata.offset());} else {System.err.println("消息发送失败: " + exception.getMessage());}});// 刷新生产者,确保消息被发送kafkaProducer.flush();} catch (Exception e) {System.err.println("发送消息时发生错误: " + e.getMessage());}}
}

请求http://127.0.0.1:8080/send-kafka?message=hello 即可发送成功

http://www.xdnf.cn/news/17585.html

相关文章:

  • 42.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--扩展功能--集成网关--网关集成认证(一)
  • 云计算概述
  • 【web站点安全开发】任务2:HTML5核心特性与元素详解
  • GitLab CI + Docker 自动构建前端项目并部署 — 完整流程文档
  • 跨界重构规则方法论
  • TCP服务器网络编程设计流程详解
  • Linux Ansible的安装与基本使用
  • Linux:企业级WEB应用服务器TOMCAT
  • 技术干货|Kafka 如何实现零停机迁移
  • Stereolabs ZED相机 选型指南:双目 / 单目、短距 / 长距,如何为机器人视觉系统匹配最优方案?
  • selenium常见的与浏览器版本不兼容闪退问题
  • 计算机网络2-2:物理层下面的传输媒体
  • 【Node.js从 0 到 1:入门实战与项目驱动】2.2 验证安装(`node -v`、`npm -v`命令使用)
  • 计算机视觉(4)-相机基础知识恶补
  • Flink Redis维表:Broadcast Join与Lookup Join对比及SQL示例
  • 从零部署Nacos:替代Eureka的服务注册与服务发现基础教程
  • 使用Excel制作甘特图
  • 无人机三维路径规划
  • Python科学计算与可视化领域工具TVTK、Mayavi、Mlab、Traits(附视频教程)
  • 【PyTorch学习笔记 - 02】 Datasets DataLoaders
  • 白板功能文档
  • 物联网、大数据与云计算持续发展,楼宇自控系统应用日益广泛
  • 在达梦数据库中使用group by 命令报错问题
  • uniapp常用组件
  • OpenBMC中C++单例模式架构与实现全解析
  • PySpark性能优化与多语言选型讨论
  • 13-docker的轻量级私有仓库之docker-registry
  • golang 基础案例_02
  • 使用Pytest进行接口自动化测试(三)
  • Docker-09.Docker基础-Dockerfile语法