Deploying Kafka with Docker + Accessing It from Java
1. compose.yaml structure
docker-compose.yml
```yaml
version: '3.8'
services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    volumes:
      - zookeeper_data:/bitnami/zookeeper
  kafka:
    image: bitnami/kafka:3.4
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST_IP}:9092
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
    volumes:
      - kafka_data:/bitnami/kafka
    depends_on:
      - zookeeper
volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
```
Start the services (set `HOST_IP` to your own host's IP):

```shell
HOST_IP=192.168.17.17 docker compose up -d
```
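Rather than passing `HOST_IP` inline on every command, Docker Compose also interpolates variables from an `.env` file placed next to the compose file; a minimal sketch (the IP shown matches the example above and should be replaced with your own):

```ini
# .env (same directory as the compose file)
# interpolated into KAFKA_CFG_ADVERTISED_LISTENERS
HOST_IP=192.168.17.17
```

With this file in place, a plain `docker compose up -d` is enough.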
After a successful start there will be two running services (zookeeper and kafka):
2. Basic Kafka operations
1. Create a topic:
```shell
docker exec -it kafka kafka-topics.sh --create \
  --bootstrap-server localhost:9092 \
  --replication-factor 1 \
  --partitions 3 \
  --topic aaa-topic
```
Screenshot of the successful run:
2. Produce messages
```shell
docker exec -it kafka kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic aaa-topic
```
Test screenshot:
3. Consume messages
```shell
docker exec -it kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic aaa-topic \
  --from-beginning
```
Test screenshot:
4. List topics
```shell
docker exec -it kafka kafka-topics.sh --list \
  --bootstrap-server localhost:9092
```
Test screenshot:
3. Accessing Kafka from Spring Boot
Dependency:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>
```
Configuration file:
```yaml
# Kafka settings
kafka:
  servers: 192.168.17.17:9092
  topicName: test-topic
```
Java configuration:
```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaProducerConfig {

    @Value("${kafka.servers}")
    private String kafkaServers;

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        Properties properties = new Properties();
        // Kafka broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        // Key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Optional settings
        properties.put(ProducerConfig.ACKS_CONFIG, "all");       // durability: wait for all in-sync replicas
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);        // retry count
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch size in bytes
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);      // ms to wait before sending a batch
        return new KafkaProducer<>(properties);
    }
}
```
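The `ProducerConfig` constants are just string keys. A self-contained sketch (plain Java, no Spring, hypothetical `ProducerPropsSketch` class) showing the literal keys the bean sets, which is handy when comparing against properties files or broker logs:

```java
import java.util.Properties;

public class ProducerPropsSketch {
    // Builds the same settings as the kafkaProducer() bean, using the
    // literal string keys behind the ProducerConfig constants.
    static Properties build(String servers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", servers); // ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "all");        // ProducerConfig.ACKS_CONFIG
        p.put("retries", 3);         // ProducerConfig.RETRIES_CONFIG
        p.put("batch.size", 16384);  // ProducerConfig.BATCH_SIZE_CONFIG
        p.put("linger.ms", 1);       // ProducerConfig.LINGER_MS_CONFIG
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("192.168.17.17:9092");
        System.out.println(p.get("bootstrap.servers"));
        System.out.println(p.get("acks"));
    }
}
```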
Test controller:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class TestKafkaController {

    @Autowired
    private KafkaMessageService kafkaMessageService;

    /**
     * HTTP endpoint for testing message sending
     */
    @GetMapping("/send-kafka")
    public String sendToKafka(String message) {
        kafkaMessageService.sendMessage(message);
        return "Message sent: " + message;
    }
}
```
Test service:
```java
public interface KafkaMessageService {
    void sendMessage(String message);
}
```

```java
import javax.annotation.Resource; // use jakarta.annotation.Resource on Spring Boot 3+

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageServiceImpl implements KafkaMessageService {

    @Resource
    private KafkaProducer<String, String> kafkaProducer;

    @Value("${kafka.topicName:}")
    private String topicName;

    @Override
    public void sendMessage(String message) {
        try {
            // Build the record
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, message);
            // Send asynchronously with a completion callback
            kafkaProducer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.println("Send succeeded: topic=" + metadata.topic()
                            + ", partition=" + metadata.partition()
                            + ", offset=" + metadata.offset());
                } else {
                    System.err.println("Send failed: " + exception.getMessage());
                }
            });
            // Flush so the message goes out immediately
            kafkaProducer.flush();
        } catch (Exception e) {
            System.err.println("Error while sending message: " + e.getMessage());
        }
    }
}
```
Requesting http://127.0.0.1:8080/send-kafka?message=hello sends the message successfully.
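Messages containing spaces or non-ASCII characters must be URL-encoded before going into the query string; a quick stdlib sketch (the hypothetical `buildUrl` helper targets the `/send-kafka` endpoint mapped above):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class SendUrlSketch {
    // Builds a safe request URL for the /send-kafka test endpoint.
    static String buildUrl(String message) {
        return "http://127.0.0.1:8080/send-kafka?message="
                + URLEncoder.encode(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Spaces become '+', so the parameter reaches the controller intact
        System.out.println(buildUrl("hello kafka"));
    }
}
```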