当前位置: 首页 > news >正文

Flink CDC—实时数据集成框架

Flink CDC 是一个基于流的数据集成工具,旨在为用户提供一套功能更加全面的编程接口(API),它基于数据库日志的 CDC(变更数据捕获)技术实现了统一的增量和全量数据读取。 该工具使得用户能够以 YAML 配置文件的形式,优雅地定义其 ETL(Extract, Transform, Load)流程,并协助用户自动化生成定制化的 Flink 算子并且提交 Flink 作业。 Flink CDC 在任务提交过程中进行了优化,并且增加了一些高级特性,如表结构变更自动同步(Schema Evolution)、数据转换(Data Transformation)、整库同步(Full Database Synchronization)以及 精确一次(Exactly-once)语义。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

  • ✅ 端到端的数据集成框架
  • ✅ 为数据集成的用户提供了易于构建作业的 API
  • ✅ 支持在 Source 和 Sink 中处理多个表
  • ✅ 整库同步
  • ✅具备表结构变更自动同步的能力(Schema Evolution)

一、如何使用 Flink CDC

Flink CDC 提供了基于 YAML 格式的用户 API,更适合于数据集成场景。以下是一个 YAML 文件的示例,它定义了一个数据管道(Pipeline),该Pipeline从 MySQL 捕获实时变更,并将它们同步到 Apache Doris:

source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2

通过使用 flink-cdc.sh 提交 YAML 文件,一个 Flink 作业将会被编译并部署到指定的 Flink 集群。

二、理解核心概念

1、Data Pipeline

由于Flink CDC中的事件以管道方式从上游流向下游,因此整个ETL任务被称为数据管道。

我们可以使用下面的yaml文件来定义一个简洁的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris:

  source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""pipeline:name: Sync MySQL Database to Dorisparallelism: 2

 我们可以使用下面的yaml文件定义一个复杂的数据管道,描述将MySQL app_db数据库下的所有表同步到Doris,并给出特定的目标数据库名称ods_db和特定的目标表名称前缀ods_:

 source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*sink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: ""transform:- source-table: adb.web_order01projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 10 AND order_id > 100description: project fields and filter- source-table: adb.web_order02projection: \*, format('%S', product_name) as product_namefilter: addone(id) > 20 AND order_id > 200description: project fields and filterroute:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 2user-defined-function:- name: addoneclasspath: com.example.functions.AddOneFunctionClass- name: formatclasspath: com.example.functions.FormatFunctionClass

Pipeline 配置: 

支持数据管道级别的以下配置选项:

parametermeaningoptional/required
name管道的名称,将作为作业名称提交到Flink集群。optional
parallelism管道的全局并行性。默认为1。optional
local-time-zone本地时区定义当前会话时区id。optional

2、Data Source

数据源用于访问元数据,并从外部系统读取更改的数据。数据源可以同时从多个表中读取数据。

要描述数据源,需要以下内容:

parametermeaningoptional/required
type数据源的类型,如mysql。required
name数据源的名称,由用户定义(提供默认值)。optional
configurations of Data Source用于构建数据源的配置,例如连接配置和源表属性。optional

source:type: mysqlname: mysql-source   #optional,description informationhost: localhostport: 3306username: adminpassword: passtables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_\.*

3、Data Sink

4、Table Id

5、Transform 

http://www.xdnf.cn/news/452017.html

相关文章:

  • [已解决] VS Code / Cursor / Trae 的 PowerShell 终端 conda activate 进不去环境的常见问题
  • JAVA实战开源项目:校园网上店铺系统 (Vue+SpringBoot) 附源码
  • 用 wireshark 解密 SIP over TLS 以及 SRTP 解密
  • libmemcached库api接口讲解三
  • 速来体验丨MaxKB v1.10.7 LTS版本发布,支持接入Qwen3
  • 嵌入式学习笔记DAY20(链表,gdb调试)
  • vue2 头像上传+裁剪组件封装
  • FFplay 音视频同步机制解析:以音频为基准的时间校准与动态帧调整策略
  • 动态稀疏化训练系统设计:从算法到GPU硬件协同优化
  • C语言—再学习(指针)
  • C++(2)
  • 中国古代史7
  • 230. 二叉搜索树中第 K 小的元素
  • day25 python异常处理
  • c#中equal方法与gethashcode方法之间有何关联?
  • 2025五一杭州西湖三天游
  • 大涡模拟实战:从区域尺度到街区尺度的大气环境模拟
  • 【python】UnicodeDecodeError: ‘gbk‘ codec can‘t decode byte 0xb2
  • 一种资源有限单片机处理cJSON数据的方法
  • 编写第一个MCP Client之Hello world
  • Android RTL语言视图适配(保加利亚,阿拉伯语种等)
  • JAVA中的文件操作
  • sqli—labs第六关——双引号报错注入
  • BitMart合约交易体验 BitMart滑点全赔的底层逻辑
  • 朱老师,3518系列,第八季
  • 使用Git+Cron实现BIND的Named域名配置自动化管理!
  • D2203使用手册—高压、小电流LDO产品4.6V~36V、150mA
  • AD 异性铺铜
  • 破解商业综合体清洁管理困局:商业空间AI智能保洁管理系统全场景解决方案
  • CodeBuddy 接入 MCP,一键生成网站!