kafka集群的安装与部署
kafka集群的安装与部署
- 1、在所有节点上安装kafka
- 2、配置 kafka
- 3、创建Systemd服务文件
- 4、启动kafka集群
- 5、验证集群状态
- 6、用守护进程启动
1、在所有节点上安装kafka
创建目录
mkdir -p /usr/local/packages
进入目录
cd /usr/local/packages
下载文件
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
解压
tar -zxvf kafka_2.13-3.8.0.tgz -C /usr/local
mv /usr/local/kafka_2.13-3.8.0 /usr/local/kafka
创建数据目录
mkdir -p /data/kafka/datalogs
2、配置 kafka
在每台节点上创建配置文件
vi /usr/local/kafka/config/server.properties
每个节点的ip不同
# 本配置文件适用于基于ZooKeeper的模式,需要Apache ZooKeeper支持############################# 服务器基础配置 ############################## 每个broker的唯一ID,必须设置为唯一的整数
broker.id=0############################# 套接字服务器设置 ############################## 套接字服务器监听的地址和端口
listeners=PLAINTEXT://192.168.3.18:9092# 向客户端广告的监听器名称、主机名和端口
advertised.listeners=PLAINTEXT://dict-gsszhjtysxxpt-018:9092# 服务器用于接收和发送网络请求的线程数
num.network.threads=3# 服务器用于处理请求(可能包括磁盘I/O)的线程数
num.io.threads=8# 套接字服务器使用的发送缓冲区大小(字节)
socket.send.buffer.bytes=102400# 套接字服务器使用的接收缓冲区大小(字节)
socket.receive.buffer.bytes=102400# 套接字服务器接受的最大请求大小(防止内存溢出)
socket.request.max.bytes=104857600############################# 日志基础配置 ############################## 存储日志文件的目录列表(逗号分隔)
log.dirs=/data/kafka/datalogs# 每个主题的默认分区数,更多分区可提高消费并行度
num.partitions=3# 每个数据目录用于日志恢复和刷新的线程数
num.recovery.threads.per.data.dir=1############################# 内部主题设置 ############################## 内部主题(如__consumer_offsets)的副本因子
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1############################# 日志保留策略 ############################## 日志文件的最小保留时间(小时)
log.retention.hours=168# 单个日志段文件的最大大小(字节)
log.segment.bytes=1073741824# 检查日志是否可删除的时间间隔(毫秒)
log.retention.check.interval.ms=300000############################# ZooKeeper配置 ############################## ZooKeeper连接字符串(主机:端口列表,可指定根目录)
zookeeper.connect=dict-gsszhjtysxxpt-018:2181,dict-gsszhjtysxxpt-019:2181,dict-gsszhjtysxxpt-020:2181/kafka# 连接ZooKeeper的超时时间(毫秒)
zookeeper.connection.timeout.ms=18000############################# 组协调器设置 ############################## 初始消费者重平衡的延迟时间(毫秒)
group.initial.rebalance.delay.ms=0# 默认副本因子(生产环境建议设置为3)
default.replication.factor=3# 最小同步副本数(确保高可用性)
min.insync.replicas=2
3、创建Systemd服务文件
vi /etc/systemd/system/kafka.service
所有节点相同
[Unit]
Description=kafka Service
After=network.target zookeeper.service[Service]
Type=forking
User=root
Group=rootEnvironment=JAVA_HOME=/etc/java/jdk1.8.0_421
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=always
RestartSec=10s[Install]
WantedBy=multi-user.target
4、启动kafka集群
启动kafka集群之前要先启动zookeeper集群
在所有节点上执行:
#重新加载systemd配置
systemctl daemon-reload
#启用kafka服务
systemctl enable kafka
#停用kafka服务
systemctl stop kafka
#启动kafka服务
systemctl start kafka
#重启服务
systemctl restart kafka
#检查服务状态
systemctl status kafka
5、验证集群状态
创建测试Topic
创建复制因子为3的topic
/usr/local/kafka/bin/kafka-topics.sh --create \--topic test-topic \--bootstrap-server 192.168.3.18:9092,192.168.3.19:9092,192.168.3.20:9092 \--partitions 3 \--replication-factor 3
查看topic详情
/usr/local/kafka/bin/kafka-topics.sh --describe \--topic test-topic \--bootstrap-server 192.168.3.18:9092,192.168.3.19:9092,192.168.3.20:9092
6、用守护进程启动
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties