当前位置: 首页 > news >正文

Logstash数据迁移之mysql-to-kafka.conf详细配置

在 Logstash 中配置从 MySQL 到 Kafka 的数据传输是一个非常经典且强大的用例,常用于数据同步、CDC(变更数据捕获)和实时数据管道

下面我将详细解析配置文件的每个部分,并提供多个场景的示例。

核心架构与组件

数据流:MySQL → Logstash (jdbc inputfilterkafka output) → Kafka

为了实现高效的增量同步,其核心工作机制如下所示:

在这里插入图片描述

基础配置文件详解 (mysql-to-kafka.conf)

input {jdbc {# 【必需】JDBC 连接字符串jdbc_connection_string => "jdbc:mysql://localhost:3306/your_database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"# 【必需】数据库用户名和密码jdbc_user => "your_username"jdbc_password => "your_password"# 【必需】MySQL JDBC 驱动路径# 需要手动下载 https://dev.mysql.com/downloads/connector/j/jdbc_driver_library => "/path/to/mysql-connector-java-8.0.x.jar"jdbc_driver_class => "com.mysql.cj.jdbc.Driver" # 注意类名# 【必需】要执行的 SQL 语句# 1. 使用增量字段(如update_time, id)进行增量查询# 2. :sql_last_value 是Logstash提供的变量,记录上一次执行的值statement => "SELECT * FROM your_table WHERE update_time > :sql_last_value"# 【强烈建议】定时执行,使用cron表达式。例如每分钟一次。schedule => "* * * * *"# 【强烈建议】记录上次查询结果的字段值(如最大的update_time或id)# 此文件由Logstash管理,用于下次查询的:sql_last_valuerecord_last_run => truelast_run_metadata_path => "/path/to/.logstash_jdbc_last_run" # 【可选】是否强制将JDBC列的字符串转换为UTF-8jdbc_default_timezone => "UTC"jdbc_force_standard_timezone => true# 【可选】分页查询,用于处理大表jdbc_paging_enabled => truejdbc_page_size => 100000}
}filter {# 此处是进行数据清洗和转换的地方,根据需求添加。# 例如:# 1. 删除不必要的字段mutate {remove_field => ["@version", "@timestamp"]}# 2. 如果需要,可以将记录转换为JSON字符串(如果Kafka希望接收字符串消息)# json {#   source => "message"#   target => "value"# }
}output {kafka {# 【必需】Kafka集群的broker列表bootstrap_servers => "kafka-broker1:9092,kafka-broker2:9092"# 【必需】目标Topic的名称topic_id => "mysql-your_table-topic"# 【必需】指定消息的序列化格式。通常使用JSON。codec => json# 【可选】消息
http://www.xdnf.cn/news/1381123.html

相关文章:

  • 领悟8种常见的设计模式
  • 导入文件允许合并表格
  • HBase Compaction HFile 可见性和并发安全性分析
  • audioMAE模型代码分析
  • 流程控制语句(3)
  • 帕萨特盘式制动器cad+设计说明书
  • 【C语言16天强化训练】从基础入门到进阶:Day 13
  • week5-[一维数组]归并
  • 公共字段自动填充
  • 云计算学习100天-第29天
  • 基于SamOut的音频Token序列生成模型训练指南
  • Linux shell getopts 解析命令行参数
  • 算力沸腾时代,如何保持“冷静”?国鑫液冷SY4108G-G4解锁AI服务器的“绿色空调”!
  • 使用Rag 命中用户feedback提升triage agent 准确率
  • Elasticsearch数据迁移方案深度对比:三种方法的优劣分析
  • linu 网络 :TCP粘包及UDP
  • 【C++】C++11的右值引用和移动语义
  • STAGEWISE实战指南:从集成到使用的完整解决方案
  • vscode pyqt5设置
  • 【ai编辑器】使用cursor-vip获得cursor的pro版 pro plan(mac)
  • uniapp vue3 canvas实现手写签名
  • Flask测试平台开发,登陆重构
  • (二分查找)Leetcode34. 在排序数组中查找元素的第一个和最后一个位置+74. 搜索二维矩阵
  • 并发编程——05 并发锁机制之深入理解synchronized
  • 学习数据结构(13)二叉树链式结构下
  • 线程池及线程池单例模式
  • 带动态条件的模糊查询SQL
  • DINOv2 vs DINOv3 vs CLIP:自监督视觉模型的演进与可视化对比
  • LeetCode 3446. 按对角线进行矩阵排序
  • UE5提升分辨率和帧率的方法