本节课课堂总结
1. Kafka基本概念
Kafka的介绍
Apache Kafka是分布式发布-订阅消息系统(消息中间件)。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
就像是一个放鸡蛋的篮子。生产者生产鸡蛋放到篮子中,消费者从篮子中消费鸡蛋。
消息队列的两种模式
1、 点对点模式(一对一,消费者主动拉取数据,消息收到后清除)。
2、 发布订阅模式(一对多,消费者消费数据之后不会删除,数据可以被多个消费者使用)。有两种消费方式,一种是消费者主动拉取操纵,好处是速度可以自己控制,坏处是要维护一个常轮询,不断询问队列是否有新数据产生;另一种是消息队列推送数据,消费者的消费能力不一样,没法根据不同的消费者提供不同的推送速度。
Kafka的角色
术语 | 解释 |
Broker | 安装了kafka的节点 |
Topic
| 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处),发送消息必须有主题。 |
Partition | Partition是物理上的概念,每个Topic包含一个或多个Partition. |
Producer | 负责发布消息到Kafka broker |
Consumer | 消息消费者,向Kafka broker读取消息的客户端 |
Consumer Group | 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group) 当consumer去消费数据的时候,会有一个偏移量(offset),一个分区的数据,一个consumer只能消费一次。 |
replica | partition 的副本,保障 partition 的高可用 |
leader | replica 中的一个角色, producer 和 consumer 只跟 leader 交互 |
follower | replica 中的一个角色,从 leader 中复制数据,数据备份,如果leader挂掉,顶替leader的工作 |
controller | Kafka 集群中的其中一个服务器,用来进行 leader election 以及各种 failover |
Kafka中存储的消息,被消费后不会被删除,可以被重复消费,消息会保留多长,由kafka自己去配置。默认7天删除。背后的管理工作由zookeeper来管理。
Kafka的特性
(1) 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
(2)可扩展性:kafka集群支持热扩展。
(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
(5)高并发:支持数千个客户端同时读写。
Kafka集群的安装
登录Kafka的官网http://kafka.apache.org/。
Kafka是Java和Scala语言开发的。所以安装Kafka之前必须要保证先安装JDK。
Kafka依赖于Zookeeper的选举机制,所以安装Kafka之前还要保证Zookeeper已经被安装好了。
下载安装包:
注意选择kafka的版本,以及scala的版本。
首先进到software目录当中,如下图所示:
然后将安装包上传上来
解压之后的效果如下图所示,并把原始kafka文件名重命名一下:
重命名之后的效果如下图所示:
解压过后的目录如下图所示:
上传安装包到集群中,并解压。
利用已有的3台机器:node01、node02、node03
修改配置:
·cd config
·vi server.properties
声明Kafka集群对应的一个编号,0、1、2分别代表node01、node02、node03。
所以此时不用改,一会分发给node02、node03的时候要把这个编号改过来。
0: node01
1: node02
2: node03
·需要修改的第二个地方是
修改为:
(去掉注释,把主机名称改了)
·第三个修改的地方为添加以下语句:
在这里添加:
·修改日志的存放路径:
(Kafkadata目录不需要提前创建,它可以自动创建好。)
在这里找这个路径:
·分区数量修改为3:
·Zookeeper集群中的端口号需要修改:
·全部修改完之后保存并退出:
全部修改语句如下所示(以node01为样例):
broker.id=0 从0 开始 ,0 1 2 delete.topic.enable=true //这条在文件中没有,手动添加,默认主题不允许删除 listeners=PLAINTEXT://node01:9092 log.dirs=/root/kafkadata // 数据存放的目录,会自动生成,不需要创建 num.partitions=3 zookeeper.connect=node01:2181,node02:2181,node03:2181 |
返回到software目录里面:
分发kafka的安装包,到其他的节点中:
scp -r kafka node02:$PWD scp -r kafka node03:$PWD |
在其他的节点上,修改broker.id 和 listeners中的主机名。
1.6启动kafka集群
启动脚本和停止脚本命令。
kafka-server-start.sh
kafka-server-stop.sh
以后台守护进程启动:
kafka-server-start.sh -daemon /opt/software/kafka/config/server.properties
注意: 在启动kafka之前,必须先启动zookeeper。
为了使用方便,可以配置环境变量。
Kafka.sh:一键启动和关闭kafka集群。
①添加一个kafka环境变量
②node02、node03也进行相同的配置
③进入到当前目录并把kafka.sh也上传进来
④修改一下权限,让它变成绿色的可执行脚本文件
⑤路径和主机名称修改为和自己一致的
⑥试一下一键启动
1.7Kafka常用的配置解释
#broker 的全局唯一编号,不能重复 broker.id=0 #删除 topic 功能使能 delete.topic.enable=true #处理网络请求的线程数量 num.network.threads=3 #用来处理磁盘 IO 的线程数量 num.io.threads=8 #发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 #接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 #请求套接字的缓冲区大小 socket.request.max.bytes=104857600 #kafka 运行日志存放的路径 log.dirs=/root/kafkadata #topic 在当前 broker 上的分区个数 num.partitions=3 #用来恢复和清理 data 下数据的线程数量 num.recovery.threads.per.data.dir=1 # segment 文件保留的最长时间,超时将被删除 log.retention.hours=168 #配置连接 Zookeeper 集群地址 zookeeper.connect=node01:2181,node02:2181,node03:2181
|