当前位置: 首页 > news >正文

Kafka消息中间件安装配置

Kafka消息中间件安装配置

  • 1 docker安装zookeeper集群(可忽略)
    • 1.1 创建相关目录
    • 1.2 zk-docker-compose.yaml
    • 1.3 验证集群状态
    • 1.4 启动服务命令
  • ※2 docker安装kafka集群(基于KRaft模式)
    • 2.1 参数讲解
    • 2.2 创建相关目录
    • 2.3 docker-compose.yaml
    • 2.4 验证集群状态
    • 2.5 开放相关端口

1 docker安装zookeeper集群(可忽略)

zookeeper 集群中的每个节点都需要一个唯一的标识(myid 文件)和统一的集群服务器列表配置(zoo.cfg 中的 server.x 列表)。

  • ZOO_MY_ID:表示当前 zookeeper 实例在集群中的编号,范围为1-255,所以一个 zookeeper 集群最多有 255 个节点
  • ZOO_SERVERS:表示当前 zookeeper 实例所在集群中的所有节点的编号、主机名(或IP地址)、端口

zookeeper 集群需要的端口:

  • 2181:客户端连接端口。
  • 2888:zookeeper 集群中的节点,Follower 与 Leader 之间进行数据同步和消息传递的端口。
  • 3888:用于 Leader 选举的端口。

1.1 创建相关目录

mkdir -p /opt/soft/kafka_cluster/zk1/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk2/{data,logs}
mkdir -p /opt/soft/kafka_cluster/zk3/{data,logs}sudo chmod -R 777 /opt/soft/kafka_cluster/zk1
sudo chmod -R 777 /opt/soft/kafka_cluster/zk2
sudo chmod -R 777 /opt/soft/kafka_cluster/zk3

1.2 zk-docker-compose.yaml

version: '3.2'services:zk1:image: zookeeper:3.9.3container_name: zk1restart: alwaysports:- "2181:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 1ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk1/data:/data- ./zk1/logs:/datalognetworks:- zk-netzk2:image: zookeeper:3.9.3container_name: zk2restart: alwaysports:- "2182:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 2ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zk3:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk2/data:/data- ./zk2/logs:/datalognetworks:- zk-netzk3:image: zookeeper:3.9.3container_name: zk3restart: alwaysports:- "2183:2181"environment:ALLOW_ANONYMOUS_LOGIN: yes # 允许匿名连接,生产环境应配置认证ZOO_MY_ID: 3ZOO_SERVERS: server.1=zk1:2888:3888;2181 server.2=zk2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181volumes:- /etc/localtime:/etc/localtime- ./zk3/data:/data- ./zk3/logs:/datalognetworks:- zk-net# 给集群创建一个网络,名称自己随便定义,这里取名为 zk-net
networks:zk-net:driver: bridge

1.3 验证集群状态

集群启动后(大约需要等待一分钟进行选举),可以通过以下命令检查每个节点的状态:

# 进入节点1容器
docker exec -it zk1 /bin/bash# 在容器内执行状态检查命令
[root@node05 kafka_cluster]# docker exec -it zk1 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk2 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
# exit[root@node05 kafka_cluster]# docker exec -it zk3 /bin/sh
# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

注意:对每个节点执行此操作,输出会显示该节点的模式是 Mode: leader 还是 Mode: follower。一个健康的集群应该有一个 Leader 和两个 Follower。

1.4 启动服务命令

# -f表示指定某个配置文件名   -d:表示后台启动
docker-compose up -d 
# 查看当前服务
docker ps

※2 docker安装kafka集群(基于KRaft模式)

Kafka 4.0 引入了 KRaft 模式(Kafka Raft Metadata Mode),它使 Kafka 集群不再依赖 ZooKeeper 进行元数据管理。KRaft 模式简化了 Kafka 部署和管理,不需要额外配置 ZooKeeper 服务,使得集群的配置和运维更加高效。

2.1 参数讲解

通用KRaft配置

  • KAFKA_ENABLE_KRAFT=yes(允许使用kraft,使用kraft模式)
  • KAFKA_CFG_PROCESS_ROLES=broker,controller(指定Kafka进程的角色,在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。)
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER(指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。)
  • KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093(定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。)
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT(将监听器名称映射到安全协议。)
  • KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID} (使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可)
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093(定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。)
  • ALLOW_PLAINTEXT_LISTENER=yes(允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用)

Broker特定配置

  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9194(定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。)
  • KAFKA_BROKER_ID=1(broker.id,必须唯一)

可选:资源与性能

  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true(允许自动创建主题,生产环境建议关闭)
  • KAFKA_CFG_NODE_ID=1(node id 唯一并且和broker一致)

2.2 创建相关目录

mkdir -p /opt/soft/kafka_cluster/kafka1
mkdir -p /opt/soft/kafka_cluster/kafka2
mkdir -p /opt/soft/kafka_cluster/kafka3sudo chmod -R 777 /opt/soft/kafka_cluster/kafka1
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka2
sudo chmod -R 777 /opt/soft/kafka_cluster/kafka3

2.3 docker-compose.yaml

  1. 部署前准备
# 1. 编辑.env 文件 注意HOST替换为主机ip
CLUSTER_ID=y2OnVDLzRYyxh3V156gnxw
HOST=192.168.1.151
  1. 编写docker-compose.yaml文件
version: "3.2"
services:kafka1:image: "bitnami/kafka:3.8.0"container_name: kafka11restart: alwaysuser: rootports:- 9192:9092 # 外部客户端访问端口- 9193:9093 # Controller监听端口,内部通信environment:### 通用KRaft配置 #### 允许使用kraft,使用kraft模式- KAFKA_ENABLE_KRAFT=yes# kafka角色,同时做为broker和controller[指定Kafka进程的角色。在KRaft模式下,节点可以同时是broker和controller,或者是其中一种。]- KAFKA_CFG_PROCESS_ROLES=broker,controller# 指定供外部使用的控制类请求信息[指定用于控制器通信的监听器名称。]- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER# 定义Kafka监听的协议、接口和端口。内部通信和外部客户端访问的端口可以分开。- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093# 将监听器名称映射到安全协议。- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT# 使用Kafka时的集群id,集群内的Kafka都要用这个id做初始化,生成一个UUID即可- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}      # 定义参与控制器选举的所有投票者地址。格式为id@host:port,其中id是每个节点的BROKER_ID。- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093# 允许使用PLAINTEXT监听器,默认false,不建议在生产环境使用- ALLOW_PLAINTEXT_LISTENER=yes### Broker特定配置 #### 定义客户端如何连接到Kafka。通常设置为宿主机的IP或域名和映射后的端口。- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9192# broker.id,必须唯一- KAFKA_BROKER_ID=1### 可选:资源与性能 #### 允许自动创建主题,建议生产环境关闭- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true# node id 唯一并且和broker一致- KAFKA_CFG_NODE_ID=1volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka1:/bitnami/kafkanetworks:- kafka-netkafka2:image: "bitnami/kafka:3.8.0"container_name: kafka22restart: alwaysuser: rootports:- 9292:9092- 9293:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9292- KAFKA_BROKER_ID=2- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=2volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka2:/bitnami/kafkanetworks:- kafka-netkafka3:image: "bitnami/kafka:3.8.0"container_name: kafka33restart: alwaysuser: rootports:- 9392:9092- 9393:9093environment:- KAFKA_ENABLE_KRAFT=yes- KAFKA_CFG_PROCESS_ROLES=broker,controller- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT- KAFKA_KRAFT_CLUSTER_ID=${CLUSTER_ID}- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093- ALLOW_PLAINTEXT_LISTENER=yes- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://${HOST}:9392- KAFKA_BROKER_ID=3- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true- KAFKA_CFG_NODE_ID=3volumes:- /etc/localtime:/etc/localtime:ro # 将外边时间直接挂载到容器内部,权限只读- /opt/soft/kafka_cluster/kafka3:/bitnami/kafkanetworks:- kafka-net# 可选:Kafka UI 管理工具kafka-ui:image: provectuslabs/kafka-ui:latestcontainer_name: kafka-uirestart: alwaysports:- "9080:8080"environment:- KAFKA_CLUSTERS_0_NAME=local-kraft-cluster- KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS=kafka1:9092,kafka2:9092,kafka3:9092# 如果Kafka配置了认证,需在此设置depends_on:- kafka1- kafka2- kafka3networks:- kafka-netnetworks:kafka-net:driver: bridgeipam:config:- subnet: 172.30.0.0/16

2.4 验证集群状态

  1. 进入其中一个容器,使用Kafka命令行工具验证集群是否正常运行。
docker exec -it kafka11 /bin/bash# 进入容器后,列出主题(此时应为空)
kafka-topics.sh --bootstrap-server localhost:9092 --list
  1. 创建一个测试主题
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 3 --replication-factor 3
  1. 查看主题详情,确认分区和副本分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test-topicTopic: test-topic       TopicId: 2ChsTY_IT1KiuC0AP8-25w PartitionCount: 3       ReplicationFactor: 3    Configs:Topic: test-topic       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3      Elr:    LastKnownElr:Topic: test-topic       Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1      Elr:    LastKnownElr:Topic: test-topic       Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2      Elr:    LastKnownElr:
  1. 测试消息生产与消费
# 在一个终端运行生产者
docker exec -it kafka11 /bin/bashkafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

docker exec -it kafka11 /bin/sh# 在另一个终端运行消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

在生产端输入消息,应该在消费端能看到。
5. 访问Kafka UI(如果部署了):
打开浏览器访问 http://your-host-ip:9080,应该能看到Kafka UI界面,并可以查看集群、主题、消费者组等信息。

2.5 开放相关端口

# kafka1相关端口
firewall-cmd --zone=public --add-port=9192/tcp --permanent
firewall-cmd --zone=public --add-port=9193/tcp --permanent# kafka2相关端口
firewall-cmd --zone=public --add-port=9292/tcp --permanent
firewall-cmd --zone=public --add-port=9293/tcp --permanent## kafka3相关端口
firewall-cmd --zone=public --add-port=9392/tcp --permanent
firewall-cmd --zone=public --add-port=9393/tcp --permanentfirewall-cmd --zone=public --add-port=9080/tcp --permanent# 重启防火墙
firewall-cmd --reload# 查看开放的端口
firewall-cmd --list-port
http://www.xdnf.cn/news/1416871.html

相关文章:

  • 日语学习-日语知识点小记-构建基础-JLPT-N3阶段(23):文法+单词第7回5+考え方3
  • 【DeepSeek】蓝耘元生代 | 蓝耘MaaS平台与DeepSeek-V3.1重构智能应用开发
  • 【数据库】Sql Server数据库中isnull、iif、case when三种方式的使用和空值判断
  • 【重学MySQL】九十七、MySQL目录结构与文件系统解析
  • 2025年06月 Scratch 图形化(四级)真题解析#中国电子学会#全国青少年软件编程等级考试
  • Dify之插件开发之Crawl4ai 爬虫(简单逻辑实现)
  • 【XR技术概念科普】VST(视频透视)vs OST(光学透视):解码MR头显的两种核心技术路径
  • 高并发场景下的热点数据处理:从预热到多级缓存的性能优化实践
  • Java 双链表
  • 云市场周报 (2025.09.01):解读腾讯云向量数据库、阿里云西安节点与平台工程
  • 【Pycharm】Pychram软件工具栏Git和VCS切换
  • 【数据可视化-105】Pyecharts主题组件:让你的图表瞬间高大上
  • 飞牛nas修改crontab计划默认编辑器
  • leetcode-hot-100 (贪心算法)
  • 构建共享新生态的智慧物流开源了
  • TensorFlow 2.10 是最后一个支持在原生Windows上使用GPU的TensorFlow版本
  • TensorFlow深度学习实战(36)——自动机器学习(AutoML)
  • Golang之GoWorld深度解析:基于Go语言的分布式游戏服务器框架
  • 【最新版】Win11 24H2 正式版2025年8月版 Windows11的24H2全系列下载 官方原版光盘系统ISO文件下载
  • .net 微服务jeager链路跟踪
  • Java全栈开发工程师面试实战:从基础到微服务的完整技术演进
  • 嵌入式学习(day37) 数据库 Sqlite相关命令函数
  • Flutter 本地持久化存储:Hive 与 SharedPreferences 实战对比
  • 基于FPGA的多协议视频传输IP方案
  • Kubernetes 中根据 Pod IP 查找 Pod 及关联服务的方法
  • Fiddler抓包原理及教程(附带解决高版本Android抓包无网络问题)
  • 【Android】Span富文本简介
  • Python 爬虫案例:爬取豆瓣电影 Top250 数据
  • 华为云CCE
  • 【Flask】测试平台开发,实现全局邮件发送工具 第十二篇