当前位置: 首页 > java >正文

Java应用Flink CDC监听MySQL数据变动内容输出到控制台

文章目录

  • maven 依赖
  • 自定义数据变化处理器
  • flink cdc监听
  • 验证

maven 依赖

<properties><flink.version>1.14.0</flink.version><flink-cdc.version>2.3.0</flink-cdc.version></properties><dependencies><!-- Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>${flink-cdc.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>${flink.version}</version></dependency></dependencies>

自定义数据变化处理器

package org.example;import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;public class CustomSink extends RichSinkFunction<String> {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);}@Overridepublic void close() throws Exception {super.close();}@Overridepublic void invoke(String value, Context context) throws Exception {//0P字段,该字段也有4种取值。分别是C(Create ) , U(Updlate) . D(Delete ),Read 。// 对于U操作,其数据部分同时包含了Before和After.System.out.println(">>>" + value);}
}

flink cdc监听

package org.example;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MysqlSourceExample {public static void main(String[] args) throws Exception {DebeziumDeserializationSchema debeziumDeserializationSchema = new JsonDebeziumDeserializationSchema();MySqlSource<String> source = MySqlSource.builder().hostname("127.0.0.1").port(3306).databaseList("canal_manager")// set captured database.tableList("canal_manager.canal_user")// set captured table.startupOptions(StartupOptions.latest()) // 设置从最新的修改记录开始读取.username("root").password("123456").deserializer(debeziumDeserializationSchema) // converts SourceRecord to JSON string.includeSchemaChanges(true).build();//启动一个webuI。Configuration configuration = new Configuration();configuration.setInteger(RestOptions.PORT, 8081);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);//检者点间隔时间env .enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source").addSink(new CustomSink());env.execute();}
}

验证

启动后web页面地址访问http://localhost:8081/,MySQL数据库canal_manager中的表canal_user数据发生修改,控制台有输出json:
在这里插入图片描述

http://www.xdnf.cn/news/12598.html

相关文章:

  • 家政小程序开发——AI+IoT技术融合,打造“智慧家政”新物种
  • 腾讯 ovCompose 跨平台框架发布,几年后还会有人用吗?
  • VScode 使用 git 提交数据到指定库的完整指南
  • TensorFlow深度学习实战(20)——自组织映射详解
  • 【Java开发日记】说一说 SpringBoot 中 CommandLineRunner
  • PyTorch 中contiguous函数使用详解和代码演示
  • 第4章——springboot自动配置
  • LabVIEW音频测试分析
  • ​React Hooks 的闭包陷阱问题
  • YoloV12改进策略:Block改进|TAB,融合组内自注意力(IASA)和组间交叉注意力(IRCA)|即插即用
  • spring:实例工厂方法获取bean
  • 408考研逐题详解:2009年第33题
  • 第22讲、Odoo18 QWeb 模板引擎详解
  • 【PCIe总线】 -- PCI、PCIe相关实现
  • 第十五届蓝桥杯单片机国赛
  • SQL-labs通关(level1-22)
  • 主流信创数据库对向量功能的支持对比
  • Vue Fragment vs React Fragment
  • Mysql-定时删除数据库中的验证码
  • Golang基础学习
  • 性能测试-jmeter实战2
  • 注意力热图可视化
  • Ubuntu 下开机自动执行命令的方法
  • win11无法打开.bat文件、打开.bat文件闪退解决方案,星露谷smapi mod安装时,.bat安装文件一闪而
  • CMake基础:构建流程详解
  • 变幻莫测:CoreData 中 Transformable 类型面面俱到(一)
  • 通过日志分析华为CangjieMagic智能体框架的工作机理
  • CAD2025安装教程与资源下载
  • AI 开发之 Python 编程技能
  • Redis 实现分布式锁:深入剖析与最佳实践(含Java实现)