当前位置: 首页 > ai >正文

K8S部署ELK(二):部署Kafka消息队列

目录

1. Kafka 简介

1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

(2)核心组件

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

(2)持久化存储

(3)分布式 & 高可用

(4)水平扩展

(5)流处理能力

1.3 Kafka 典型应用场景

1.4 Kafka 架构示例

数据流示例(订单处理)

1.5 Kafka vs 其他消息队列

2. kafka部署

2.1 创建Namespace

2.2 创建ConfigMap

2.3 创建Headless Service

2.4 创建Statefulset

2.5 部署所有资源

2.6 检查kafka Pod状态


1. Kafka 简介

Apache Kafka 是一个 分布式流处理平台,主要用于构建 高吞吐量、低延迟、可扩展 的实时数据管道和流式应用程序。它最初由 LinkedIn 开发,后成为 Apache 顶级开源项目,广泛应用于大数据、日志聚合、事件驱动架构等领域。


1.1 Kafka 核心概念

(1)消息系统 vs. 流处理平台

  • 传统消息队列(如 RabbitMQ):主要用于解耦生产者和消费者,保证消息可靠传递。

  • Kafka

    • 不仅是一个消息队列,还是一个 分布式流存储系统,支持持久化存储和流式计算。

    • 适用于 高吞吐、大规模数据流 场景(如日志、指标、事件数据)。

(2)核心组件

组件说明
Producer(生产者)向 Kafka 发送消息(如日志、交易数据)。
Consumer(消费者)从 Kafka 读取并处理消息。
Broker(代理)Kafka 服务器,负责存储和转发消息。
Topic(主题)消息的分类(类似数据库表),如 orderslogs
Partition(分区)每个 Topic 可分成多个 Partition,提高并行处理能力。
Offset(偏移量)每条消息在 Partition 中的唯一 ID(类似数据库主键)。
Consumer Group(消费者组)多个消费者共同消费一个 Topic,实现负载均衡。
ZooKeeper管理 Kafka 集群元数据(新版本 Kafka 已逐步移除依赖)。

1.2 Kafka 核心特性

(1)高吞吐 & 低延迟

  • 支持每秒百万级消息处理(取决于硬件配置)。

  • 采用 顺序 I/O(相比随机 I/O 更快)和 零拷贝 技术优化性能。

(2)持久化存储

  • 消息默认持久化到磁盘(可配置保留时间),支持 重放(replay) 数据。

  • 适用于 事件溯源(Event Sourcing)审计日志

(3)分布式 & 高可用

  • 支持 多副本(Replication),防止数据丢失。

  • 自动故障转移(Leader/Follower 机制)。

(4)水平扩展

  • 可动态增加 Broker 和 Partition,提升吞吐量。

(5)流处理能力

  • 配合 Kafka StreamsksqlDB 可实现实时流计算(如聚合、窗口计算)。


1.3 Kafka 典型应用场景

场景说明
日志聚合收集应用日志(替代 ELK 中的 Logstash)。
消息队列解耦微服务,如订单系统 → 库存系统。
实时数据处理结合 Flink/Spark Streaming 做实时分析。
事件驱动架构如用户行为追踪、IoT 设备数据采集。
Commit Log(提交日志)数据库变更捕获(CDC),如 Debezium + Kafka。

1.4 Kafka 架构示例

生产者(Producer) → Kafka Cluster(Broker1, Broker2...)↓
消费者(Consumer Group)→ 实时处理(Flink/Spark)↓存储(HDFS/DB)

数据流示例(订单处理)

  1. 订单服务(Producer)发送消息到 orders Topic。

  2. 库存服务(Consumer)读取 orders 消息,扣减库存。

  3. 分析服务(Consumer)统计实时销售额。


1.5 Kafka vs 其他消息队列

特性KafkaRabbitMQPulsar
吞吐量⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
延迟⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
持久化支持(磁盘)可选(内存/磁盘)支持
流处理原生支持(Kafka Streams)不支持支持(Pulsar Functions)
适用场景大数据、日志任务队列、RPC多租户、云原生

适用 Kafka 的场景

  • 需要高吞吐、持久化存储的实时数据流(如日志、事件)。

  • 流处理(如实时分析、监控)。

不适用 Kafka 的场景

  • 需要复杂路由(RabbitMQ 更合适)。

  • 低延迟任务队列(Redis Streams/RabbitMQ 更好)。

Kafka 已成为现代数据架构的核心组件,广泛应用于大数据、微服务、实时计算等领域。

2. kafka部署

2.1 创建Namespace

kubectl create namespace elk

2.2 创建ConfigMap

vim kafka-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:name: ldc-kafka-scriptsnamespace: elk
data:setup.sh: |-  #启动脚本#!/bin/bashexport KAFKA_CFG_NODE_ID=${MY_POD_NAME##*-} exec /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh

2.3 创建Headless Service

vim kafka-headless.yaml
apiVersion: v1
kind: Service
metadata:name: kafka-headlessnamespace: elk
spec:clusterIP: Noneselector:app: kafkaports:- name: brokerport: 9092- name: controllerport: 9093

2.4 创建Statefulset

vim kafka-statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafkanamespace: elklabels:app: kafka
spec:selector:matchLabels:app: kafkaserviceName: kafka-headlesspodManagementPolicy: Parallelreplicas: 1  #根据资源情况设置实例数,推荐3个副本updateStrategy:type: RollingUpdatetemplate:metadata:labels:app: kafkaspec:affinity:nodeAffinity:  #这里做了节点亲和性调度到master节点requiredDuringSchedulingIgnoredDuringExecution:nodeSelectorTerms:- matchExpressions:- key: node-role.kubernetes.io/control-planeoperator: Exists#values:#- mastertolerations:- key: "node-role.kubernetes.io/control-plane"operator: "Exists"effect: "NoSchedule"containers:- name: kafkaimage: swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/bitnami/kafka:3.4.0imagePullPolicy: "IfNotPresent"command:- /opt/leaderchain/setup.shenv:- name: BITNAMI_DEBUGvalue: "true" #详细日志# KRaft settings - name: MY_POD_NAME # 用于生成KAFKA_CFG_NODE_IDvalueFrom:fieldRef:fieldPath: metadata.name            - name: KAFKA_CFG_PROCESS_ROLESvalue: "controller,broker"- name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERSvalue: "0@kafka-0.kafka-headless:9093"  #修改实例数时要更新- name: KAFKA_KRAFT_CLUSTER_IDvalue: "Jc7hwCMorEyPprSI1Iw4sW"  # Listeners            - name: KAFKA_CFG_LISTENERSvalue: "PLAINTEXT://:9092,CONTROLLER://:9093"- name: KAFKA_CFG_ADVERTISED_LISTENERSvalue: "PLAINTEXT://:9092"- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPvalue: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"- name: KAFKA_CFG_CONTROLLER_LISTENER_NAMESvalue: "CONTROLLER"- name: KAFKA_CFG_INTER_BROKER_LISTENER_NAMEvalue: "PLAINTEXT"- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"ports:- containerPort: 9092name: broker- containerPort: 9093name: controllerprotocol: TCP                     volumeMounts:- mountPath: /bitnami/kafkaname: kafka-data- mountPath: /opt/leaderchain/setup.shname: scriptssubPath: setup.shreadOnly: true      securityContext:fsGroup: 1001runAsUser: 1001volumes:    - configMap:defaultMode: 493name: ldc-kafka-scripts  #ConfigMap的名字name: scripts                   volumeClaimTemplates:- apiVersion: v1kind: PersistentVolumeClaimmetadata:name: kafka-dataspec:accessModes: [ "ReadWriteOnce" ] storageClassName: nfs-client  #存储类的名称resources:requests:storage: 1Gi

2.5 部署所有资源

[root@master1 Kafka]# ls
kafka-configmap.yaml  kafka-headless.yaml  kafka-statefulset.yaml
[root@master1 Kafka]# kubectl apply -f ./
configmap/ldc-kafka-scripts created
service/kafka-headless created
statefulset.apps/kafka created

2.6 检查kafka Pod状态

[root@master1 Kafka]# kubectl get pod -n elk 
NAME             READY   STATUS    RESTARTS   AGE
filebeat-6db9l   1/1     Running   0          62m
filebeat-qllxg   1/1     Running   0          62m
filebeat-r5hw7   1/1     Running   0          62m
kafka-0          1/1     Running   0          2m2s
http://www.xdnf.cn/news/16807.html

相关文章:

  • NVIDIA GPU架构
  • 四、Portainer图形化管理实战与Docker镜像原理
  • express-jwt报错:Error: algorithms should be set
  • Ubuntu系统VScode实现opencv(c++)视频及摄像头使用
  • [硬件电路-112]:模拟电路 - 信号处理电路 - 二极管的应用 - 峰值检测电路与波形展示
  • 【网络与爬虫 37】ScrapeFly深度解析:云端爬虫革命,告别复杂部署拥抱一键API
  • C++入门自学Day5-- c++类与对象(面试题)
  • 苹果MAC 安卓模拟器
  • HarmonyOS 开发:基于 ArkUI 实现复杂表单验证的最佳实践
  • CS课程项目设计7:基于Canvas交互友好的五子棋游戏
  • Pyspark的register方法自定义udf函数
  • Mysql在页内是怎么查找数据的?
  • Web 开发 10
  • Redis 核心概念、命令详解与应用实践:从基础到分布式集成
  • pyqt5显示任务栏菜单并隐藏主窗口,环境pyqt5+vscode
  • JVM 03 类加载机制
  • Python打卡Day30 模块和库的导入
  • LeetCode 刷题【26. 删除有序数组中的重复项、27. 移除元素、28. 找出字符串中第一个匹配项的下标】
  • vue2一种快速导入 Element UI(即 Element 2.x)方式
  • ARM Cortex-M异常处理高级特性详解
  • MCP Agent 工程框架Dify初探
  • 【C++】类和对象(2)
  • AI Agent开发学习系列 - LangGraph(4): 有多个输入的Graph(练习解答)
  • 设计模式篇:在前端,我们如何“重构”观察者、策略和装饰器模式
  • Android 运行 deno 的新方法 (3): Termux 胖喵安初
  • vue3pinia
  • 深度学习核心:卷积神经网络 - 原理、实现及在医学影像领域的应用
  • vue3 新手学习入门
  • Elasticsearch 混合检索一句 `retriever.rrf`,把语义召回与关键词召回融合到极致
  • Agents-SDK智能体开发[5]之集成MCP进阶