当前位置: 首页 > web >正文

ElasticStack对接kafka集群

背景

在当代数字化浪潮中,日志数据的高效处理对于企业运维监控和数据分析至关重要。本博文聚焦于ELK(Elasticsearch、Logstash、Kibana)技术栈与Kafka集群的深度对接,旨在探讨如何通过这一架构优化,实现高效、可靠且可扩展的日志处理解决方案,以应对日益增长的数据量和复杂多变的业务需求,同时减轻Logstash压力并降低其与Filebeat的耦合性,提升整个系统的性能与稳定性,为企业的数据驱动决策提供坚实的技术支撑。

Kafka集群特性适配 :Kafka具备高吞吐量(如单机每秒可处理10w + /s)、高可用性(通过多副本机制保障数据不丢失)、强扩展性(可方便地进行集群扩展以应对数据增长)以及丰富的生态集成能力(与多种编程语言和工具兼容良好)等特点,使其成为对接ELK的理想选择。其强大的消息队列功能能够很好地满足日志数据海量、实时性要求高的处理需求,确保数据在产生端(如Filebeat采集的日志)和消费端(如Logstash后续处理)之间的高效流转和可靠存储。

一、 架构图解

为了减轻Logstash压力以及Logstash和filebeat的耦合性,我们可以考虑在Logstash前面加一套MQ集群。

所谓的MQ,指的是Message Queue,即消息队列。但是这种架构无疑是给系统增加了负担:

        - 1.MQ不存在单点问题;
        - 2.MQ具有很强的处理数据能力;
        - 3.增加了集群的整体复杂性,运维和开发的同学都得增加学习成本;

也就是说,这意味消息队列要提供以下特性:

        - 1.MQ集群吞吐量大,能够承担数据的读写;5台32core,32GB读取处理消息数量23w/s,写速度可以达到220m/s,
        - 2.MQ集群要提供非常强的高可用性,不能是单点的故障;
        - 3.文档丰富,社区资源丰富;

市面上有很多MQ产品,典型代表有:

        - RocketMQ【阿里巴巴,有社区版(功能较差,文档不够丰富,仅支持Java相关的API)和SAAS版本(功能强,需要花钱),性能很好,单机每秒能够处理10w+/s】
        - ActiveMQ【老牌系统,文档相对丰富,性能一般,单机每秒处理1w+/s】
        - Kafka【日志收集,大数据分析,性能非常好,单机每秒处理10w+/s,存在丢失数据的风险,但可以忽略不计,API文档非常丰富,基于Java和Scala语言研发,二次开发比较方便,社区完善了Golang,Python等API】
        - RabbitMQ【金融公司,文档丰富,性能较好,单机每秒处理1w+/s,可以做到数据不丢失,API开发相对来说不太友好,基于Erlang语言研发,国内并不流行,因此二次开发招人比较困难。】

 二、 架构实现

首先我们需要有一套kafka集群,没有的话可以参考我之前的文章: Kafka集群部署实战_kafka部署方案-CSDN博客

1. filebeat生产kafka集群数据

# 编写filebeat并启动
[root@elk93 /etc/filebeat/config]# cat filebeat_tcp-to-kafka.yaml .
filebeat.inputs:
- type: tcphost: "0.0.0.0:9000"# 数据输出到kafka
output.kafka:# 指定kafka集群的地址hosts: - 10.0.0.91:9092- 10.0.0.92:9092- 10.0.0.93:9092# 指定topictopic: novacao-linux96-kafka
[root@elk93 /etc/filebeat/config]# rm -rf /var/lib/filebeat/
[root@elk93 /etc/filebeat/config]# filebeat -e -c `pwd`/filebeat_tcp-to-kafka.yaml# 发送测试数据
[root@elk91 ~]# echo helllllllllllllllo |nc 10.0.0.93 9000# kafka验证数据
[root@elk92 ~]# kafka-console-consumer.sh --bootstrap-server 10.0.0.93:9092 --topic novacao-linux96-kafka --from-beginning
.....
{"@timestamp":"2025-03-17T12:35:18.320Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.28"},"log":{"source":{"address":"10.0.0.91:55810"}},"input":{"type":"tcp"},"agent":{"name":"elk93","type":"filebeat","version":"7.17.28","hostname":"elk93","ephemeral_id":"73d1dee2-d555-4955-b689-75d602e1b5e0","id":"ced21de3-ed8a-4601-acba-07f0d7db5a5a"},"ecs":{"version":"1.12.0"},"host":{"name":"elk93"},"message":"helllllllllllllllo"}

2. Logstash消费kafka集群数据

kibana基于开发工具创建账号
POST /_security/api_key
{"name": "Linux96", "role_descriptors": {"filebeat_monitoring": { "cluster": ["all"],"index": [{"names": ["novacao-logstash-kafka*"],"privileges": ["all"]}]}}
}
生成实例
{"id" : "QSYgpJUBD3ll3qToqN4V","name" : "Linux96","api_key" : "EWyBlHEHTnSQlALuB41hpw","encoded" : "UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw=="
}# 解码数据
[root@elk91 ~]# echo UVNZZ3BKVUJEM2xsM3FUb3FONFY6RVd5QmxIRUhUblNRbEFMdUI0MWhwdw== |base64 -d ;echo
QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw# Logstash消费数据
[root@elk93 /etc/logstash/conf.d]# cat 09-logstash-to-ES_api-keys.conf
input { kafka {# 指定kafka集群的地址bootstrap_servers => "10.0.0.91:9092,10.0.0.92:9092,10.0.0.93:9092"# 指定从kafka哪个topic拉取数据topics => ["novacao-linux96-kafka"]# 指定消费者组group_id => "linux96-001"# 指定拉取数据offset的位置点,常用值:earliest(从头拉取数据),latest(从最新的位置拉取数据)auto_offset_reset => "earliest"}
}filter {json {source => "message"}mutate {remove_field => [ "agent","@version","ecs","input","log" ]}
}  output { # stdout { #   codec => rubydebug # } elasticsearch {hosts => ["10.0.0.91:9200","10.0.0.92:9200","10.0.0.93:9200"]index => "novacao-logstash-kafka"api_key => "QSYgpJUBD3ll3qToqN4V:EWyBlHEHTnSQlALuB41hpw"ssl => truessl_certificate_verification => false}
}# 启用logstash
[root@elk93 /etc/logstash/conf.d]# logstash -rf 09-logstash-to-ES_api-keys.conf # kibana查看数据

3. Kibana查看数据

三、 总结 

此次博文深入浅出地讲解了ELK对接Kafka集群的实现细节和优势所在,为读者呈现了一套成熟、稳定且高效的日志处理架构方案。在大数据时代背景下,这一架构不仅能够有效应对海量日志数据的挑战,还为企业数字化转型过程中的数据治理和智能分析奠定了坚实基础。希望本文能为从事相关领域的技术人员提供有益的参考和借鉴,助力他们在实际工作中更好地运用这一技术组合,挖掘数据潜力,驱动业务创新发展。

http://www.xdnf.cn/news/10829.html

相关文章:

  • 【相等性比较的通解——理解 JavaScript 中的 Object.is()】
  • 高考数学易错考点02 | 临阵磨枪
  • 深入解析Playwright for Python:浏览器功能与代码实例详解
  • 【Visual Studio 2022】卸载安装,ASP.NET
  • Go Gin框架深度解析:高性能Web开发实践
  • LabVIEW磁悬浮轴承传感器故障识别
  • Windows版PostgreSQL 安装 vector 扩展
  • 服务器被攻击了怎么办
  • pikachu靶场通关笔记11 XSS关卡07-XSS之关键字过滤绕过(三种方法渗透)
  • 华为盘古 Ultra MoE 模型:国产 AI 的技术突破与行业影响
  • 每日算法刷题Day21 6.3:leetcode二分答案2道题,用时1h20min(有点慢)
  • metersphere不同域名的参数在链路测试中如何传递?
  • 【MATLAB代码】制导——三点法,二维平面下的例程|运动目标制导,附完整源代码
  • 采摘机器人项目
  • dvwa5——File Upload
  • 1.6万字测评:deepseek-r1-0528横向对比 gemini-2.5-pro-0506和claude4
  • Cursor + Claude 4:海外工具网站开发变现实战案例
  • 基于PyQt5的相机手动标定工具:原理、实现与应用
  • 【Qt】构建目录设置
  • 从0开始学习R语言--Day16--倾向得分匹配
  • 相机--相机成像原理和基础概念
  • Cursor + Claude 4:微信小程序流量主变现开发实战案例
  • Springboot中Controller接收参数的方式
  • 功能管理:基于 ABP 的 Feature Management 实现动态开关
  • iptables常用命令
  • Spring Boot + MyBatis-Plus 读写分离与多 Slave 负载均衡示例
  • MyBatis 执行 SQL 报错:String 无法转换为 Long 的排查与解决实录
  • 【Linux内核】设备模型之udev技术详解
  • Unity异常上报飞书工具
  • 如何计算H5页面加载时的白屏时间