当前位置: 首页 > news >正文

本节课课堂总结

1. Kafka基本概念

Kafka的介绍

Apache Kafka是分布式发布-订阅消息系统(消息中间件)。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

就像是一个放鸡蛋的篮子。生产者生产鸡蛋放到篮子中,消费者从篮子中消费鸡蛋。

消息队列的两种模式

1、 点对点模式(一对一,消费者主动拉取数据,消息收到后清除)。

2、 发布订阅模式(一对多,消费者消费数据之后不会删除,数据可以被多个消费者使用)。有两种消费方式,一种是消费者主动拉取操纵,好处是速度可以自己控制,坏处是要维护一个常轮询,不断询问队列是否有新数据产生;另一种是消息队列推送数据,消费者的消费能力不一样,没法根据不同的消费者提供不同的推送速度。

Kafka的角色

术语

解释

Broker

安装了kafka的节点

Topic

 

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处),发送消息必须有主题。

Partition

Partition是物理上的概念,每个Topic包含一个或多个Partition.

Producer

负责发布消息到Kafka broker

Consumer

消息消费者,向Kafka broker读取消息的客户端

Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

当consumer去消费数据的时候,会有一个偏移量(offset),一个分区的数据,一个consumer只能消费一次。

replica

partition 的副本,保障 partition 的高可用

leader

replica 中的一个角色, producer 和 consumer 只跟 leader 交互

follower

replica 中的一个角色,从 leader 中复制数据,数据备份,如果leader挂掉,顶替leader的工作

controller

Kafka 集群中的其中一个服务器,用来进行 leader election 以及各种 failover

 

Kafka中存储的消息,被消费后不会被删除,可以被重复消费,消息会保留多长,由kafka自己去配置。默认7天删除。背后的管理工作由zookeeper来管理。

Kafka的特性

(1) 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
​(2)可扩展性:kafka集群支持热扩展。
​(3)持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。
​(4)容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
​(5)高并发:支持数千个客户端同时读写。

Kafka集群的安装

登录Kafka的官网http://kafka.apache.org/。

Kafka是Java和Scala语言开发的。所以安装Kafka之前必须要保证先安装JDK。

Kafka依赖于Zookeeper的选举机制,所以安装Kafka之前还要保证Zookeeper已经被安装好了。

 

下载安装包:

注意选择kafka的版本,以及scala的版本。

首先进到software目录当中,如下图所示:

 

然后将安装包上传上来

解压之后的效果如下图所示,并把原始kafka文件名重命名一下:

重命名之后的效果如下图所示:

解压过后的目录如下图所示:

 

 

上传安装包到集群中,并解压。

利用已有的3台机器:node01、node02、node03

修改配置:

·cd config

·vi server.properties

声明Kafka集群对应的一个编号,0、1、2分别代表node01、node02、node03。

所以此时不用改,一会分发给node02、node03的时候要把这个编号改过来。

0: node01

1: node02

2: node03

·需要修改的第二个地方是

修改为:

(去掉注释,把主机名称改了)

·第三个修改的地方为添加以下语句:

在这里添加:

·修改日志的存放路径:

(Kafkadata目录不需要提前创建,它可以自动创建好。)

在这里找这个路径:

·分区数量修改为3:

·Zookeeper集群中的端口号需要修改:

·全部修改完之后保存并退出:

全部修改语句如下所示(以node01为样例):

broker.id=0    从0 开始 ,0  1  2

delete.topic.enable=true   //这条在文件中没有,手动添加,默认主题不允许删除

listeners=PLAINTEXT://node01:9092

log.dirs=/root/kafkadata    // 数据存放的目录,会自动生成,不需要创建

num.partitions=3

zookeeper.connect=node01:2181,node02:2181,node03:2181

 

返回到software目录里面:

分发kafka的安装包,到其他的节点中:

scp -r kafka node02:$PWD

scp -r kafka node03:$PWD

 

在其他的节点上,修改broker.id 和 listeners中的主机名。

1.6启动kafka集群

启动脚本和停止脚本命令。

kafka-server-start.sh

kafka-server-stop.sh

 

以后台守护进程启动:

kafka-server-start.sh -daemon /opt/software/kafka/config/server.properties

 

注意: 在启动kafka之前,必须先启动zookeeper。

 

为了使用方便,可以配置环境变量。

 

Kafka.sh:一键启动和关闭kafka集群。

①添加一个kafka环境变量

②node02、node03也进行相同的配置

③进入到当前目录并把kafka.sh也上传进来

④修改一下权限,让它变成绿色的可执行脚本文件

⑤路径和主机名称修改为和自己一致的

⑥试一下一键启动

1.7Kafka常用的配置解释

#broker 的全局唯一编号,不能重复

broker.id=0

#删除 topic 功能使能

delete.topic.enable=true

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘 IO 的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka 运行日志存放的路径

log.dirs=/root/kafkadata

#topic 在当前 broker 上的分区个数

num.partitions=3

#用来恢复和清理 data 下数据的线程数量

num.recovery.threads.per.data.dir=1

# segment 文件保留的最长时间,超时将被删除

log.retention.hours=168

#配置连接 Zookeeper 集群地址

zookeeper.connect=node01:2181,node02:2181,node03:2181

 

http://www.xdnf.cn/news/108739.html

相关文章:

  • kotlin与MVVM结合使用总结(一)
  • 按照文本每行匹配文件复制到指定位置
  • CONDA:用于 Co-Salient 目标检测的压缩深度关联学习(总结)
  • 开源 RAG 引擎:文档理解精准、检索高效、可视化干预灵活,一站式搞定
  • Kappa架构:简化大数据实时流处理的创新方案
  • 【Luogu】动态规划二
  • 2025.4.27机器学习笔记:文献阅读
  • 类和对象(中)
  • Spring AI 会话记忆(笔记)
  • 【3.2】pod详解—— Pod的相位(phase)状态(status)
  • Linux常用指令
  • 小刚说C语言刷题——1338求圆环的面积
  • C++二分法详解
  • el-table 目录树列表本地实现模糊查询
  • Linux部署Redis主从
  • 天梯-零头就抹了吧
  • 实操Obsidian+Ollama+deepseek构建本地知识库
  • C语言五子棋项目
  • [计算机科学#1]:计算机的前世今生,从算盘到IBM的演变之路
  • flex布局说明
  • 百万点数组下memset、memcpy与for循环效率对比及原理分析
  • 经典算法 小数点后的第n位
  • 语音合成之四基于LLM的语音合成
  • Sql刷题日志(day5)
  • JVM理解(通俗易懂)
  • 2025年渗透测试面试题总结-拷打题库14(题目+回答)
  • 时间自动填写——电子表格公式的遗憾(DeepSeek)
  • A13 自定义系统服务使用总结
  • Kafka集群
  • ABP-Book Store Application中文讲解 - Part 0:开发环境搭建