Notes on compatibility issues when connecting Flink jdbc and mysql-cdc to MySQL 8

Preface

A record of the small issues I ran into when connecting Flink jdbc and mysql-cdc to MySQL 8.

Versions

  • Flink 1.15.3
  • mysql-cdc 2.3.0
  • MySQL 8.0.27

cdc_mysql2mysql

MySQL5

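I previously worked mainly with MySQL5. Below is the SQL for MySQL5. For details, see my earlier post "Flink MySQL CDC 使用总结" (Flink MySQL CDC usage summary).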

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  -- A primary key is required, and the MySQL source table must also have one;
  -- the Flink primary key may differ from the MySQL primary key.
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) WITH (
  'connector' = 'mysql-cdc', 'hostname' = '192.168.44.128', 'port' = '3306',
  'username' = 'root', 'password' = 'root-123',
  'database-name' = 'cdc', 'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

MySQL8

Running the same SQL against MySQL8 fails with:

Caused by: java.lang.ClassCastException: java.math.BigInteger cannot be cast to java.lang.Long
    at com.mysql.jdbc.ConnectionImpl.buildCollationMapping(ConnectionImpl.java:1024)

At first I suspected that this cdc version did not support MySQL8. It turned out that adding the driver parameter to the jdbc table is enough to fix it:

'driver' = 'com.mysql.cj.jdbc.Driver'

The complete SQL:

set yarn.application.name=cdc_mysql2mysql;
set execution.target=yarn-per-job;
set parallelism.default=1;
set taskmanager.memory.process.size=3g;
set execution.checkpointing.interval=10000;
set state.checkpoints.dir=hdfs:///flink/checkpoints/cdc_mysql2mysql;
set execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;

CREATE TABLE mysql_cdc_source (
  -- A primary key is required, and the MySQL source table must also have one;
  -- the Flink primary key may differ from the MySQL primary key.
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) WITH (
  'connector' = 'mysql-cdc', 'hostname' = '192.168.44.128', 'port' = '3306',
  'username' = 'root', 'password' = 'root-123',
  'database-name' = 'cdc', 'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

Summary:

  • mysql-cdc: the table definition is the same for MySQL5 and MySQL8
  • jdbc: MySQL8 requires the driver parameter: 'driver' = 'com.mysql.cj.jdbc.Driver'

jdbc_mysql2mysql

MySQL8

Based on the cdc_mysql2mysql experience above, both the source and the sink in jdbc_mysql2mysql should get the driver parameter:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts bigint, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

I expected this to work, but it fails with:

Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long

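Investigation showed that a field type mismatch causes the error. The MySQL table declares ts as int, so the Flink SQL table must also declare ts as int, not bigint. The complete, corrected SQL is: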

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts int, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts int, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;
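You can avoid this kind of mismatch by checking the real MySQL column types before you write the Flink DDL. Below is a minimal sketch that assumes the cdc database and mysql_cdc_source table used above. Run it in the MySQL client, not in the Flink SQL client. COLUMN_TYPE also shows modifiers such as unsigned, which DATA_TYPE leaves out.

```sql
-- Run in MySQL, not in Flink.
-- Lists each column's declared type so the Flink DDL can use the matching type
-- (for example, a MySQL int column should be declared int, not bigint, in a jdbc source table).
SELECT COLUMN_NAME, DATA_TYPE, COLUMN_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'cdc'
  AND TABLE_NAME = 'mysql_cdc_source'
ORDER BY ORDINAL_POSITION;
```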

MySQL5

Next I checked whether MySQL5 has the same problem as MySQL8. It does: declaring ts as bigint produces the same error on MySQL5. The complete, corrected SQL is:

set yarn.application.name=jdbc_mysql2mysql;
set execution.target=yarn-per-job;

create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts int, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double, ts int, dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

insert into test_sink_mysql(id,name,price,ts,dt) select * from mysql_cdc_source;

sink

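In cdc_mysql2mysql, ts bigint caused no problem. So I tried changing the ts type in the sink table back to bigint, and it worked on both MySQL5 and MySQL8. In other words, only the jdbc source table is strict about field types.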
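These results suggest one combination that should work on both versions: the jdbc source declares ts as int to match MySQL, and the sink keeps bigint. The sketch below reuses the connection settings from the SQL above. The explicit CAST in the insert was added here only to make the INT to BIGINT widening visible. The tests above simply used select *.

```sql
-- Sketch: jdbc source ts matches the MySQL column (int); jdbc sink ts is the wider bigint.
create table mysql_cdc_source (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double,
  ts int,     -- jdbc source: must match the MySQL column type
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'mysql_cdc_source'
);

create table test_sink_mysql (
  id int PRIMARY KEY NOT ENFORCED,
  name string, price double,
  ts bigint,  -- jdbc sink: the wider type was accepted in the tests above
  dt string
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&rewriteBatchedStatements=true',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'test_sink_mysql', 'sink.buffer-flush.max-rows' = '1000000'
);

-- Explicit INT -> BIGINT widening, written out for readability.
insert into test_sink_mysql(id, name, price, ts, dt)
select id, name, price, CAST(ts AS BIGINT), dt from mysql_cdc_source;
```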

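driver parameter

  • jdbc: on MySQL5, adding the driver parameter also works but is not required; on MySQL8 it is required. So add the driver parameter for both 5 and 8, and you no longer need to distinguish between MySQL versions.
  • cdc: the driver parameter is not supported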

Field type mapping

Official documentation:

  • jdbc: https://nightlies.apache.org/flink/flink-docs-release-2.0/zh/docs/connectors/table/jdbc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
  • cdc : https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/#%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B%E6%98%A0%E5%B0%84
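The example below makes the mapping concrete. The type_demo table and its columns are hypothetical, invented for illustration and not part of the tests above. The Flink DDL follows the MySQL-to-Flink mapping table in the jdbc connector docs linked above. The main thing to notice is that unsigned integer types widen on the Flink side. Please verify the mapping against the docs for your own Flink version.

```sql
-- 1) Run in MySQL (hypothetical table, for illustration only).
CREATE TABLE type_demo (
  id     INT PRIMARY KEY,
  cnt    INT UNSIGNED,
  big_id BIGINT UNSIGNED,
  price  DOUBLE,
  name   VARCHAR(64)
);

-- 2) Run in the Flink SQL client: a jdbc source whose column types follow
--    the MySQL -> Flink type mapping in the jdbc connector documentation.
create table type_demo (
  id     INT PRIMARY KEY NOT ENFORCED, -- INT             -> INT
  cnt    BIGINT,                       -- INT UNSIGNED    -> BIGINT
  big_id DECIMAL(20, 0),               -- BIGINT UNSIGNED -> DECIMAL(20, 0)
  price  DOUBLE,                       -- DOUBLE          -> DOUBLE
  name   VARCHAR(64)                   -- VARCHAR(n)      -> VARCHAR(n)
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.44.128:3306/cdc?useSSL=false',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'username' = 'root', 'password' = 'root-123',
  'table-name' = 'type_demo'
);
```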

MySQL8 compatibility

This shows that Flink jdbc and mysql-cdc both work with MySQL5 and MySQL8. The corresponding jars are:

  • jdbc: flink-connector-jdbc-1.15.3.jar
  • cdc : flink-sql-connector-mysql-cdc-2.3.0.jar

These two jars are all you need. No separate mysql-connector-java jar is required.

However, cdc 3.1 and later require an additional mysql-connector-java jar. See the official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/mysql-cdc/

cdc version support

Official documentation: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.3/zh/docs/connectors/flink-sources/overview/

http://www.xdnf.cn/news/3074.html
