kafka的部署
目录
一、kafka简介
1.1、概述
1.2、消息系统介绍
1.3、点对点消息传递模式
1.4、发布-订阅消息传递模式
二、kafka术语解释
2.1、结构概述
2.2、broker
2.3、topic
2.4、producer
2.5、consumer
2.6、consumer group
2.7、leader
2.8、follower
2.9、partition
2.10、offset
2.11、replica
2.12、message
2.13、zookeeper
三、kafka架构
四、kafka的部署
4.1、软件下载
4.1.1、jdk的安装
4.1.2、zookeeper安装
4.1.3、kafka的安装
4.2、单机模式
4.3、集群部署
4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息
4.3.2、时间同步
4.3.3、zookeeper配置
4.3.4、创建对应的服务id
4.3.5、zoo.cfg参数解析
4.3.6、集群kafka配置
一、kafka简介
1.1、概述
kafka是由linkedin公司开发,是一个分布式、分区、多副本、多生产者、多消费者,基于zookeeper的分布式 日志系统(也可以作为MQ系统),常见可以用于web/nginx日志、访问日志、消息服务等,Linkedin2010年将项目贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息详细。
设计目标如下:
1. 一时间复杂度为O(1)的方式提供消息持久能力,即使对TB级以上的数据也能保证常数时间的访问性能。
2. 高吞吐率:即使在非常廉价的商用机器上也能做到单机支持每秒100k条消息的传输。
3. 支持Kafka Server间的消息分布,以及分布式消费,同时保证每个partition内的消息顺序传输。
4. 同时支持离线数据和实时数据处理。
5. Scale out:支持在线水平扩展。
1.2、消息系统介绍
一个消息系统负责将数据从一个应用传递到另外一个应用,应用只需要关注数据,无需要关系数据再两个或者 多个应用间是如何传递的。分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消 息,有两种主要的消息传递模式:点对点传递模式、发布-订阅模式。大部分的消息系统选用发布-订阅模式。
kafka无疑也是一种消息订阅模式的系统。
1.3、点对点消息传递模式
在点对点消息系统中,消息持久化到一个队列中。此时,将有一个或多个消费队列中的数据。但是一条消息只 能被消费一次,当一个消费者消费了队列中的某条数据之后,该条数据则从消息队列中删除。该模式及时有多 个消费者同时消费数据,也能保证数据处理的顺序,架构示意图如下

1.4、发布-订阅消息传递模式
在该模式中,消息呗持久化到一个topic中。与点对点消息系统不同的是,消费者可以订阅一个或者多个 topic,消费者可以消费topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在该模式下,消息的生产者称为发布者,消费者称为订阅这,架构示意图如下:

二、kafka术语解释
2.1、结构概述
上图中一个topic配置了3个partition。Partition1有两个offset:0和1。Partition2有4个offset。Partition3有1个offset。副本的id和副本所在的机器的id恰好相同。
如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producer和consumer可同时生产和消费数据。
2.2、broker
一台Kafka服务器就是一个Broker,一个集群由多个Broker组成,一个Broker可以容纳多个Topic,Broker和Broker之间没有Master和Standy的概念,他们之间的地位基本是平等的。
Kafka集群包含一个或者多个服务器,服务器节点成为broker。
broker存储topic的数据,如果某topic有N个partion,集群有N个broker。
broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
2.3、topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名。
2.4、producer
topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个 segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
2.5、consumer
消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
2.6、consumer group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定groupname则属于默认的group)。
2.7、leader
每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
2.8、follower
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
2.9、partition
为了实现可扩展性,一个非常大的Topic可以被分为多个Partion,从而分布到多台Broker上。Partion中的每条消息都会被分配一个自增Id(Offset)。Kafka只保证按一个Partion中的顺序将消息发送给消费者,但是不保证单个Topic中的多个Partion之间的顺序。
2.10、offset
消息在Topic的Partion中的位置,同一个Partion中的消息随着消息的写入,其对应的Offset也自增,结构图如下:

2.11、replica
副本。Topic的Partion含有N个replica,N为副本因子。其中一个Replica为Leader,其他都为Follower,Leader处理Partition的所有读写请求,与此同时,Follower会定期去同步Leader上的数据。
2.12、message
通讯的基本单位,消息
2.13、zookeeper
存放Kafka集群相关元数据的组件。在ZK集群中会保存Topic的状态消息,例如分区的个数,分区的组成,分区的分布情况等;保存Broker的状态消息;报错消费者的消息等。通过这些消息,Kafka很好的将消息生产,消息存储,消息消费的过程结合起来。
三、kafka架构
在Kafka集群中生产者将消息发送给以Topic命名的消息队列Queue中,消费者订阅发往以某个Topic命名的消息队列Queue中的消息。其中Kafka集群由若干个Broker组成,Topic由若干个Partition组成,每个Partition里面的消息通过Offset来获取。

一个典型的Kafka集群中包含若干个Producer(可以是某个模块下发的Command,或者是Web前端产生的 PageView,或者是服务器日志,系统CPU,Memor等),若干个Broker(Kafka集群支持水平扩展,一般Broker数量越多,整个Kafka集群的吞吐率也就越高),若干个ConsumerGroup, 以及一个Zookeeper集群。Kafka通过zookeeper管理集群配置。Producer使用Push模式将消息发不到Broker上,consumer使用Pull模式从Broker上订阅并消费消息。

四、kafka的部署
4.1、软件下载
无论单机部署还是集群,这一步都不能省
4.1.1、jdk的安装
由于带GUI界面的安装,是自带jdk版本的,我们可以选择使用默认jdk
自带JDK,这种JDK可以使用java -version检查,如果使用javac就不行了,所以进行安装sudo yum install java-1.8.0-openjdk-devel -y
4.1.2、zookeeper安装
Apache ZooKeeper
选择3.5.7版本
上传服务器,安装
解压
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin zookeeper3.5.7
mv zookeeper3.5.7/ /opt创建软链接
ln -s /opt/zookeeper3.5.7/ /opt/zookeeper配置环境变量
vim /etc/profile添加
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/binsource /etc/profile将Zookeeper提供的配置文件复制一份,复制成Zookeeper默认寻找的文件
cd /opt/zookeeper/conf
ls
cp zoo_sample.cfg zoo.cfg
cd ..创建数据存放目录
mkdir data
chmod 755 /opt/zookeeper/data修改数据存放位置
cd conf/
vim zoo.cfg##修改以下配置
dataDir=/opt/zookeeper/data启动 Zookeeper,Zookeeper的bin目录下
cd ..
./bin/zkServer.sh start zoo.cfg
检测zookeeper是否正常
jps # 看到控制台成功输出 QuorumPeerMain,表示启动成功./bin/zkServer.sh status zoo.cfg ## Mode: standalone表示ok
4.1.3、kafka的安装
https://kafka.apache.org/downloads
选择 kafka_2.12-3.8.0.tgz
进行下载,Scala 2.12 和 Scala 2.13 主要是使用Scala编译的版本不同,两者皆可
上传服务器,安装
解压
tar -zxvf kafka_2.12-2.7.0.tgz
mv kafka_2.12-2.7.0 /opt
cd /opt创建软链接
ln -s /opt/kafka_2.12-2.7.0/ /opt/kafka
ls配置环境变量
vim /etc/profile添加
export KAFKA_HOME=/opt/kafka
export PATH=:$PATH:${KAFKA_HOME}source /etc/profile
4.2、单机模式
在Kafka的config目录下存在相关的配置信息——本次我们只想让Kafka快速启动起来只关注server.properties文件即可cd ${KAFKA_HOME}/config
ls
#connect-console-sink.properties connect-file-source.properties consumer.properties server.properties
#connect-console-source.properties connect-log4j.properties kraft tools-log4j.properties
#connect-distributed.properties connect-mirror-maker.properties log4j.properties trogdor.conf
#connect-file-sink.properties connect-standalone.properties producer.properties zookeeper.properties打开配置文件,并主要注意以下几个配置
vim server.propertiesbroker.id=0 #kafka服务节点的唯一标识,这里是单机不用修改
# listeners = PLAINTEXT://host1:9092 别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #kafka底层监听的服务地址,注意是使用主机名,不是ip。
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 的连接地址 ,别忘了设置成自己的主机名,单机情况下可以使用 localhost
zookeeper.connect=host1:2181
启动kafka
./bin/kafka-server-start.sh -daemon config/server.properties #后台启动kafka使用 jps 查看是否成功启动kafka
jps
34843 QuorumPeerMain
21756 Jps
116076 Kafka
4.3、集群部署
4.3.1、针对每一个节点的hosts文件添加节点的ip映射信息
vim /etc/hosts
192.168.157.80 host1
192.168.157.81 host2
192.168.157.82 host3
4.3.2、时间同步
yum install ntp -y
ntpdate cn.pool.ntp.org | ntp[1-7].aliyun.com #两个时钟同步地址选择一个就行
4.3.3、zookeeper配置
vim /opt/zookeeper/conf/zoo.cfg
##额外添加以下配置
server.1=host1:2888:3888 #数据同步端口:领导选举时服务器监听的端口
server.2=host2:2888:3888
server.3=host3:2888:3888
4.3.4、创建对应的服务id
# host1
echo 1 > /opt/zookeeper/data/myid #在这个文件中写入自己服务的id号
# host2
echo 2 > /opt/zookeeper/data/myid
# host3
echo 3 > /opt/zookeeper/data/myid
4.3.5、zoo.cfg参数解析
tickTime=2000: 通信心跳数,用于设置Zookeeper服务器与客户端之间的心跳时间间隔,单位是毫秒。这个时间间隔是Zookeeper使用的基本时间单位,用于服务器之间或客户端与服务器之间维持心跳的时间间隔。initLimit=10: LF初始通信时限,用于设置集群中的Follower跟随者服务器与Leader领导者服务器之间启动时能容忍的最多心跳数。如果在这个时限内(10个心跳时间)领导和根随者没有发出心跳通信,就视为失效的连接,领导和根随者彻底断开。syncLimit=5: LF同步通信时限,用于设置集群启动后,Leader与Follower之间的最大响应时间单位。假如响应超过这个时间(syncLimit * tick Time -> 10秒),Leader就认为Follower已经死掉,会将Follower从服务器列表中删除。dataDir: 数据文件目录+数据持久化路径,主要用于保存Zookeeper中的数据。dataLogDir: 日志文件目录,用于存储Zookeeper的日志文件。clientPort=2181: 客户端连接端口,用于监听客户端连接的端口
4.3.6、集群kafka配置
server.properties
配置文件
cd ${KAFKA_HOME}/config
vim server.propertiesbroker.id=0 #kafka服务节点的唯一标识
# listeners = PLAINTEXT://your.host.name:9092 别忘了设置成自己的主机名
listeners=PLAINTEXT://host1:9092 #集群中需要设置成每个节点自己的
# log.dirs 指定的目录 kafka启动时可以自动创建,因此不要忘了让kafka可以有读写这个目录的权限。
log.dirs=/opt/kafka/data ##kafka的分区以日志的形式存储在集群中(其实就是broker数据存储的目录)
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 #日志的留存策略,默认168小时也就是一周
# zookeeper 集群的连接地址
zookeeper.connect=host1:2181,host2:2181,host3:2181
其余配置:
##修改差异配置
cd ${KAFKA_HOME}/config
vim server.properties# host2节点
broker.id=1
listeners=PLAINTEXT://host2:9092
# host3节点
broker.id=2
listeners=PLAINTEXT://host3:9092
kafka集群即可正常启动
kafka其余命令./bin/kafka-server-stop.sh #关闭kafka
kafka-console-consumer.sh #消费命令
kafka-console-producer.sh #生产命令
kafka-consumer-groups.sh #查看消费者组,重置消费位点等
kafka-topics.sh #查询topic状态,新建,删除,扩容
kafka-acls.sh #配置,查看kafka集群鉴权信息
kafka-configs.sh #查看,修改kafka配置
kafka-mirror-maker.sh #kafka集群间同步命令
kafka-preferred-replica-election.sh #重新选举topic分区leader
kafka-producer-perf-test.sh #kafka自带生产性能测试命令
kafka-reassign-partitions.sh #kafka数据重平衡命令
kafka-run-class.sh #kafka执行脚本