当前位置: 首页 > news >正文

日志收集(ELK)

日志收集

日志收集方案

技术栈:
logstash(强大、占用大) + elasticsearch + kibana + kafka
一般如果项目简单,可以只使用filebeat(轻量级);如果项目特别复杂,也可以采用filebeat+logstash的方案,filebeat(轻量级适合边缘收集),信息聚合到logstash,做最后的信息处理

技术流程:
收集、传输、存储、(处理)、查询

技术组件流程:
logstash 简单收集和预处理日志 -> kafka -> logstash 聚合、复杂处理日志 -> elasticsearch 存储日志 -> kibana 可视化查询日志

Kafka

kafka 定位

用于做日志消息的缓冲

kafka 安装(docker)

基于 GraalVM 安装的kafka

docker pull apache/kafka-native:4.0.0
docker run -p 9092:9092 apache/kafka-native:4.0.0

kafka 可视化安装 kafdrop

docker run -d \--name kafdrop \-p 9000:9000 \-e KAFKA_BROKERCONNECT=localhost:9092 \  # 替换为你的 Kafka 地址: 可以是集群 kafka-node1:9092,kafka-node2:9092等obsidiandynamics/kafdrop:latest

一行代码部署:

docker run -d --name kafdrop -p 9000:9000 -e KAFKA_BROKERCONNECT=localhost:9092 obsidiandynamics/kafdrop:latest

kafka 和 kafdrop 的 Docker Compose 部署

依赖 Kraft 机制

version: '3.8'services:kafka:container_name: kafkaimage: apache/kafka-native:4.0.0restart: unless-stoppedports:- "9092:9092"- "9094:9094"  # 外部访问端口(对应PLAINTEXT_HOST)environment:KAFKA_NODE_ID: 1KAFKA_PROCESS_ROLES: "broker,controller"KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"# 监听器配置(保持不变)KAFKA_LISTENERS: "PLAINTEXT://:9092,PLAINTEXT_HOST://:9094,CONTROLLER://:9093"KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9094"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT"KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"KAFKA_MESSAGE_MAX_BYTES: 104857600KAFKA_REPLICA_FETCH_MAX_BYTES: 104857600KAFKA_LOG_DIRS: "/tmp/kafka-logs"volumes:- kafka-data:/tmp/kafka-logs# 关键修复:指定容器内运行用户为root(避免权限问题,仅适合开发环境)user: "root"kafdrop:container_name: kafdropimage: obsidiandynamics/kafdrop:latestrestart: unless-stoppedports:- "9000:9000"environment:KAFKA_BROKERCONNECT: "kafka:9092"JVM_OPTS: "-Xms16M -Xmx48M"depends_on:- kafkavolumes:kafka-data:

依赖 zookeeper 组件

  kafdrop:container_name: kafdropimage: obsidiandynamics/kafdroprestart: "no"ports:- "9000:9000"environment:KAFKA_BROKERCONNECT: "kafka:29092"JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"depends_on:- "kafka"kafka:container_name: kafkaimage: obsidiandynamics/kafkarestart: "no"ports:- "2181:2181"- "9092:9092"environment:KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"KAFKA_ZOOKEEPER_SESSION_TIMEOUT: "6000"KAFKA_RESTART_ATTEMPTS: "10"KAFKA_RESTART_DELAY: "5"ZOOKEEPER_AUTOPURGE_PURGE_INTERVAL: "0"

ELK 日志收集和部署

工作原理

logstash 简单收集和预处理日志 -> kafka -> logstash 聚合、复杂处理日志 -> elasticsearch 存储日志 -> kibana 可视化查询日志

组件安装(docker-compose)

version: '3.8'services:# Kafka (使用完整镜像,包含控制台工具)kafka:image: confluentinc/cp-kafka:7.5.0  # 完整镜像,包含所有工具container_name: elk-kafkarestart: unless-stoppedports:- "9092:9092"  # 外部访问端口environment:# 基础配置(与原功能保持一致)KAFKA_NODE_ID: 1KAFKA_PROCESS_ROLES: "broker,controller"KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"KAFKA_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"  # 容器内服务名访问KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"KAFKA_LOG_DIRS: "/tmp/kafka-logs"KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"  # 自动创建主题# 新增:固定集群ID(避免重启后集群ID变化导致的元数据问题)CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"volumes:- kafka-data:/tmp/kafka-logs  # 数据卷保持不变,兼容原有数据user: "0:0"  # 解决权限问题(允许读写数据卷)networks:- elk-kafka# 健康检查(使用kafka-topics.sh验证服务可用性)healthcheck:test: ["CMD", "kafka-topics.sh", "--bootstrap-server", "localhost:9092", "--list"]interval: 10stimeout: 5sretries: 5start_period: 60s  # Kafka启动较慢,延长初始检查等待时间# Elasticsearch (保持不变)elasticsearch:image: elasticsearch:8.14.3container_name: elk-elasticsearchrestart: unless-stoppedports:- "9200:9200"environment:- ES_JAVA_OPTS=-Xms512m -Xmx512m- discovery.type=single-node- xpack.security.enabled=false- xpack.ml.enabled=truevolumes:- es-data:/usr/share/elasticsearch/datanetworks:- elk-kafkahealthcheck:test: ["CMD", "curl", "-f", "http://localhost:9200/_cluster/health"]interval: 10stimeout: 5sretries: 5start_period: 30s# Kibana (保持不变)kibana:image: kibana:8.14.3container_name: elk-kibanarestart: unless-stoppedports:- "5601:5601"environment:- ELASTICSEARCH_HOSTS=http://elasticsearch:9200depends_on:elasticsearch:condition: service_healthynetworks:- elk-kafka# Logstash Producer (保持不变)logstash-producer:image: logstash:8.14.3container_name: elk-logstash-producerrestart: unless-stoppedports:- "5000:5000"- "8080:8080"volumes:- ./pipeline/producer:/usr/share/logstash/pipelineenvironment:- LOGSTASH_JAVA_OPTS=-Xms256m -Xmx256mdepends_on:kafka:condition: service_healthynetworks:- elk-kafka# Logstash Consumer (保持不变)logstash-consumer:image: logstash:8.14.3container_name: elk-logstash-consumerrestart: unless-stoppedvolumes:- ./pipeline/consumer:/usr/share/logstash/pipelineenvironment:- LOGSTASH_JAVA_OPTS=-Xms256m -Xmx256mdepends_on:kafka:condition: service_healthyelasticsearch:condition: service_healthynetworks:- elk-kafkavolumes:kafka-data:es-data:networks:elk-kafka:driver: bridge
# 网络配置
# 如果网络不存在,Compose 会自动创建该网络(类型为配置中指定的 bridge)。
# 主要知道三种网络类型:bridge、host、none
# bridge:默认网络类型,单主机、多容器之间通信。外部(主机)无法访问
# host:容器直接使用主机的网络,无需映射,外部(主机)可以直接访问
# none:容器没有网络,只能通过其他容器的 IP 通信。外部(主机)无法访问# telnet localhost 5000
# 输入测试日志(例如JSON格式):
#{"level":"info","message":"test log","time":"2025-08-20 10:00:00"}# 在 Kibana(http://localhost:5601)中创建索引模式 app-logs-*,即可查看日志。

配置 logstash 采集日志

在 logstash 目录下创建 pipeline 目录,在 pipeline 目录下创建 logstash.conf 文件,配置 logstash 采集日志。

logstash-producer 配置

# 生产端:接收 TCP 日志 → 处理 → 发送到 Kafka
input {tcp {port => 5000  # 监听端口(集群扩展:多实例需不同主机端口,通过负载均衡统一入口)codec => "json"  # 解析 JSON 格式日志type => "tcp-logs"mode => "server"host => "0.0.0.0"  # 允许所有IP连接(生产环境可限制来源IP)}# 追加 HTTP 输入,便于本机 curl 测试http {port => 8080host => "0.0.0.0"codec => "json"}
}filter {# 时间字段转换(将日志中的time字段映射到ES的@timestamp)date {match => ["time", "yyyy-MM-dd HH:mm:ss", "ISO8601"]  # 支持多种时间格式target => "@timestamp"tag_on_failure => ["_time_parse_failure"]  # 解析失败标记,便于排查}# 新增字段:标记日志来源(集群扩展时可区分不同生产端)mutate {add_field => {"log_source" => "tcp-5000""producer_node" => "${HOSTNAME}"  # 生产端节点名,便于追踪}}# 过滤无效日志(如空消息)if [message] == "" {drop {}}
}output {# 发送到 Kafka(日志缓冲)kafka {bootstrap_servers => "kafka:9092"  # Kafka 地址(集群扩展:改为Kafka集群地址列表,如"kafka1:9092,kafka2:9092")topic_id => "app-logs"  # 目标主题(集群扩展:可按日志类型拆分多个主题,如"app-logs-error"、"app-logs-info")codec => "json"  # 保持JSON格式# 性能与可靠性配置batch_size => 500  # 批量发送大小(集群扩展时增大,如1000)linger_ms => 500  # 批量等待时间(毫秒)compression_type => "snappy"  # 启用压缩,减少网络传输retries => 3  # 失败重试次数acks => "1"  # 至少一个副本确认(集群扩展:可设为"all"提高可靠性)}# 调试输出(生产环境可注释)stdout {codec => rubydebug { metadata => false }}
}

logstash-consumer 配置

# 消费端:从 Kafka 接收日志 → 处理 → 发送到 Elasticsearch
input {kafka {bootstrap_servers => "kafka:9092"  # Kafka 地址(集群扩展:改为Kafka集群地址列表)topics => ["app-logs"]  # 消费的主题(集群扩展:如需多主题,可改为数组如["app-logs", "system-logs"])group_id => "logstash-es-group"  # 消费者组ID(集群扩展:多个消费端实例需使用相同group_id实现负载均衡)# 消费策略auto_offset_reset => "earliest"  # 首次启动从最早消息开始(生产环境可设为"latest")enable_auto_commit => true  # 自动提交偏移量auto_commit_interval_ms => 5000  # 自动提交间隔(毫秒)# 性能配置(集群扩展时调整)consumer_threads => 3  # 消费线程数(建议不超过Kafka分区数)fetch_min_bytes => 1048576  # 1MB,批量拉取最小大小fetch_max_wait_ms => 500  # 拉取等待超时(毫秒)max_poll_records => 500  # 每次拉取最大记录数codec => "json"  # 解析Kafka中的JSON日志}
}filter {# 清洗数据:移除不需要的字段(根据实际需求调整)mutate {remove_field => ["@version", "kafka"]  # 移除Logstash内部字段和Kafka元数据}# 按日志级别添加标签(便于后续查询过滤)if [level] == "error" {mutate { add_tag => ["critical"] }}
}output {# 发送到 Elasticsearchelasticsearch {hosts => ["http://elasticsearch:9200"]  # ES地址(集群扩展:改为ES集群地址列表,如"http://es1:9200,http://es2:9200")index => "logstash"  # 按日期分片索引(集群扩展:可按日志类型拆分索引,如"error-logs-%{+YYYY.MM.dd}")# 性能与可靠性配置document_id => "%{[@metadata][kafka][offset]}-%{[@metadata][kafka][partition]}"  # 生成唯一ID,避免重复action => "index"retry_on_conflict => 3  # 文档冲突重试次数}# 调试输出(生产环境可注释)stdout {codec => rubydebug { metadata => false }}
}

测试

# PowerShell发送HTTP测试日志
$payload = '{"level":"info","message":"http test success","time":"2025-08-21 12:00:00"}'
Invoke-RestMethod -Uri http://127.0.0.1:8080 -Method Post -ContentType 'application/json' -Body $payload

命令返回ok后,对每一个链路进行检查,是否有异常。

  1. 查看发送后,logstash-producer是否收到日志
    过滤包含 “test-service” 的日志
    docker logs --tail 100 elk-logstash-producer | findstr “http test success”

  2. 查看kafka是否收到日志
    进入 Kafka 容器
    docker exec -it elk-kafka bash

消费 app-logs 主题的消息(从开头)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic app-logs --from-beginning

  1. 查看logstash-consumer是否消费了日志存入es中
    过滤包含 “test-service” 的日志
    docker logs --tail 100 elk-logstash-consumer | findstr “http test success”

  2. 查看elasticsearch是否收到日志(5. 查看kibana是否收到日志)
    询 app-logs-2025.08.22 索引中的测试日志
    curl.exe “http://127.0.0.1:9200/app-logs-2025.08.22/_search?q=service:test-service&pretty”

查看日志

在 Kibana(http://localhost:5601)中创建索引模式 app-logs-*,即可查看日志。

http://www.xdnf.cn/news/1339255.html

相关文章:

  • javaweb开发笔记——微头条项目开发
  • 【笔记】Facefusion3.3.2 之 NSFW 检测屏蔽测试
  • Windows 系统中,添加打印机主要有以下几种方式
  • macos使用FFmpeg与SDL解码并播放H.265视频
  • Git常用操作大全(附git操作命令)
  • 【LeetCode】18. 四数之和
  • 微服务的编程测评系统13-我的竞赛列表-elasticSearch
  • javaweb开发笔记—— 前端工程化
  • Spring Boot 集成 Redis 发布订阅实现消息通信
  • 计算机网络技术学习-day6《三层交换机配置》
  • 01 网络信息内容安全--绪论
  • 2025.7.19卡码刷题-回溯算法-组合
  • Web 安全之 HTTP 响应截断攻击详解
  • 数据结构初阶:排序算法(三)归并排序、计数排序
  • 【数据结构】深入解析选择排序与堆排序:从基础到高效实现的完全指南
  • 深度卷积神经网络AlexNet
  • openEuler系统中r如何将docker安装在指定目录
  • 神经网络中 标量求导和向量求导
  • 如何通过传感器选型优化,为设备寿命 “续航”?
  • 机器学习6
  • RootDB:一款开源免费的Web报表工具
  • 0821 sqlite3_get_table函数(数据库函数的补充)
  • Vue.js 中使用 Highcharts 构建响应式图表 - 综合指南
  • 遥感机器学习入门实战教程|Sklearn案例⑤:集成学习方法全览
  • Python学习-- 数据库和MySQL入门
  • CentOS 7常用国内源配置:阿里云、腾讯云、华为云、清华源
  • pycharm编译器如何快速掌握一个新模块的使用方法
  • JeeSite 快速开发平台:全能企业级快速开发解决方案
  • 【图像算法 - 20】慧眼识病:基于深度学习与OpenCV的植物叶子疾病智能识别系统
  • Python-Pandas GroupBy 进阶与透视表学习