Flink Savepoints Summary

Official documentation

https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/ops/state/savepoints/

Test job

First start a test job; this reuses cdc_mysql2mysql from an earlier article:

bin/sql-client.sh -f sql/cdc_mysql2mysql.sql

Triggering a Savepoint on YARN

According to the official documentation:

bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId

So the command should be:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -yid application_1750755047138_0076

It fails with:

java.lang.NoSuchMethodError: org.apache.commons.cli.CommandLine.hasOption(Lorg/apache/commons/cli/Option;)Z
    at org.apache.flink.client.cli.SavepointOptions.<init>(SavepointOptions.java:45)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:738)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1118)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1198)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1198)

Find which jars contain the conflicting class:

grep -rl "org.apache.commons.cli.CommandLine" lib/*
lib/flink-dist-1.15.3.jar
lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar
lib/flink-sql-connector-hbase-2.2-1.15.3.jar
lib/hudi-flink1.15-bundle-0.13.0.jar

The conflicting jar turned out to be flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar, so move it out of the way:

mv lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar lib/flink-shaded-hadoop-2-uber-3.1.1.3.1.0.0-78-9.0.jar.bak
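The grep -rl search used above only looks for the string anywhere inside each archive, so it can also report jars that merely contain a relocated (shaded) copy or a similarly named class. A slightly stricter check lists each jar's entries and compares the entry name exactly. The following is a minimal sketch, assuming unzip and awk are installed; the function names class_to_entry and find_jars_with_class are made up for this example.

```shell
# Convert a fully qualified class name to its entry path inside a jar.
class_to_entry() {
    printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Print every jar in directory $1 that contains class $2 as an exact entry.
# Assumes `unzip` and `awk` are available on the PATH.
find_jars_with_class() {
    entry=$(class_to_entry "$2")
    for jar in "$1"/*.jar; do
        [ -f "$jar" ] || continue
        # `unzip -l` prints the entry name as the last column of each row.
        if unzip -l "$jar" 2>/dev/null |
            awk -v e="$entry" '$NF == e { found = 1 } END { exit !found }'; then
            printf '%s\n' "$jar"
        fi
    done
}

# Usage (run from the Flink home directory):
# find_jars_with_class lib org.apache.commons.cli.CommandLine
```

When several jars are reported, the one that wins depends on classpath order, so the jars still have to be ruled out one by one as described above.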

Try again; this time the error is:

org.apache.flink.util.FlinkException: No cluster id was specified. Please specify a cluster to which you would like to connect.
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1038)
    at org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:784)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1118)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1198)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1198)

This is the same error as when -yid is omitted altogether:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql

My guess was that -yid was not taking effect, so I switched to -Dyarn.application.id:

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 /savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076

This fails with a different error:

Caused by: java.io.IOException: Failed to create savepoint directory at /savepoint/cdc_mysql2mysql

Check the savepoint-related logs:

yarn logs -applicationId application_1750755047138_0076 | grep "Savepoint"
2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]

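Although creating the savepoint directory failed, the savepoint had already been triggered, which is why savepoint-related log entries exist.

A path without a scheme is not necessarily resolved against HDFS, so add the hdfs:// prefix to the savepoint path: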

bin/flink savepoint 24f61a106d31205a122b66e45b2984e7 hdfs:///savepoint/cdc_mysql2mysql -Dyarn.application.id=application_1750755047138_0076

Success:

Triggering savepoint for job 24f61a106d31205a122b66e45b2984e7.
Waiting for response...
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
You can resume your program from this savepoint with the run command.
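When the savepoint is triggered from a script, the resulting path usually has to be captured for the later restore. The following is a minimal sketch, assuming the CLI keeps printing the "Savepoint completed. Path: ..." line exactly as in the output above; the function name extract_savepoint_path is hypothetical.

```shell
# Read flink CLI output on stdin and print the savepoint path, if any.
extract_savepoint_path() {
    sed -n 's/^Savepoint completed\. Path: //p'
}

# Usage (job_id, target_dir and app_id are placeholders):
# path=$(bin/flink savepoint "$job_id" "$target_dir" \
#     -Dyarn.application.id="$app_id" | extract_savepoint_path)
```

If the output contains no such line, the function prints nothing, so an empty result can be treated as a failed savepoint.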

Check the savepoint result:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 1 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82

Check the savepoint-related logs again:

2025-06-27 10:20:02,159 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 5 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750990802141 for job
2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]
2025-06-27 10:46:02,645 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 19 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992362637 for job 24f61a106d31205a122b66e45b2984e7.

One more savepoint-related log entry has appeared.

The latest savepoint is also visible in the web UI (the screenshot was taken from a different job).

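-yid has no effect

Around 2020 (Flink 1.10) the command-line argument handling was reworked so that YARN settings can be passed as generic -D configuration options. In this environment (Flink 1.15.3) the -yid argument was ignored, and the YARN application ID had to be given as -Dyarn.application.id. The -D style aims to:

  • Simplify the set of options: no need to remember special prefixes such as -m, -yid and -ytm.
  • Unify configuration: every setting can be passed with -D, using the same keys as flink-conf.yaml.
  • Reduce compatibility problems between versions.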

Cancelling a job with a savepoint (cancel)

bin/flink cancel -s hdfs:///savepoint/cdc_mysql2mysql 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076
Cancelled job 24f61a106d31205a122b66e45b2984e7. Savepoint stored in hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a.

-s is short for --withSavepoint.

Check the savepoint-related logs:

2025-06-27 10:45:31,471 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 15 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992331455 for job 24f61a106d31205a122b66e45b2984e7.
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.initializeLocationForSavepoint(AbstractFsCheckpointStorageAccess.java:191) ~[flink-dist-1.15.3.jar:1.15.3]
2025-06-27 10:46:02,645 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 19 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992362637 for job 24f61a106d31205a122b66e45b2984e7.
2025-06-27 10:48:05,420 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 33 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1750992485404 for job 24f61a106d31205a122b66e45b2984e7.
2025-06-27 10:48:05,594 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Savepoint stored in hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a. Now cancelling 24f61a106d31205a122b66e45b2984e7.

Savepoint result:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 2 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82

In the web UI the job state is CANCELED.

Stopping a job with a savepoint (stop)

First, try to stop the job that was just cancelled:

bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 24f61a106d31205a122b66e45b2984e7 -Dyarn.application.id=application_1750755047138_0076

Because that job has already been cancelled, it fails with:

Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (24f61a106d31205a122b66e45b2984e7)

So start a new job and stop that one with a savepoint:

bin/flink stop -p hdfs:///savepoint/cdc_mysql2mysql/ 37a54dad1f7c781ffc0001555b5dcee6 -Dyarn.application.id=application_1750755047138_0077
Savepoint completed. Path: hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d

-p is short for --savepointPath.

Check the logs:

2025-06-27 10:59:34,479 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 14 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1750993174460 for job 37a54dad1f7c781ffc0001555b5dcee6.

Check the result:

hadoop fs -ls /savepoint/cdc_mysql2mysql
Found 3 items
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:48 /savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:46 /savepoint/cdc_mysql2mysql/savepoint-24f61a-aa131de59d82
drwxr-xr-x   - hive hdfs          0 2025-06-27 10:59 /savepoint/cdc_mysql2mysql/savepoint-37a54d-0e07c7bd195d

The result shows that the middle part of a savepoint directory name is the first six characters of the job ID.
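This naming pattern makes it possible to pick out the savepoints of one job from a shared directory. The following is a small sketch, assuming the savepoint-<first six characters of the job ID>-<random suffix> layout observed in the listing holds; savepoint_prefix is a hypothetical helper name.

```shell
# Build the directory-name prefix shared by all savepoints of a job.
savepoint_prefix() {
    printf 'savepoint-%s-\n' "$(printf '%s' "$1" | cut -c1-6)"
}

# Usage: list only the savepoints that belong to one job.
# hadoop fs -ls /savepoint/cdc_mysql2mysql |
#     grep "$(savepoint_prefix 24f61a106d31205a122b66e45b2984e7)"
```

Six hexadecimal characters are not guaranteed to be unique across jobs, so the prefix is a convenience filter rather than an identifier.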

In the web UI the job state is FINISHED.

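SET 'table.dml-sync' = 'true';

For a job submitted through sql-client with default settings, cancelling or stopping the job only takes down the TaskManagers. The JobManager stays alive, so the YARN application remains RUNNING. With SET 'table.dml-sync' = 'true'; the corresponding YARN application also ends up FINISHED after a cancel or stop.
For details see the official documentation: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/sqlclient/

The observed behavior is the same as with flink run --detached (short form: -d).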

cancel vs. stop

Without specifying a savepoint:

bin/flink cancel 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092
bin/flink stop 15312da78d337525a36ca3cf40f04ff9 -Dyarn.application.id=application_1750755047138_0092 

In this case cancel succeeds:

Cancelled job 15312da78d337525a36ca3cf40f04ff9.

But stop fails:

org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.handler.RestHandlerException: Config key [state.savepoints.dir] is not set. Property [targetDirectory] must be provided.

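This shows that stop needs a savepoint location: either pass a path explicitly, or configure state.savepoints.dir for the job so that the savepoint is written there automatically.
Add this to the SQL: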

set state.savepoints.dir=hdfs:///flink/savepoints;

Stop again:

Savepoint completed. Path: hdfs://cluster1/flink/savepoints/savepoint-9324cd-af49c550593d
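
| Feature | flink cancel | flink stop |
| --- | --- | --- |
| Termination | Forced: interrupts job execution immediately | Graceful: waits for in-flight data to finish processing |
| Savepoint creation | Not created by default; request one with -s (--withSavepoint) | Created by default; the target can be given explicitly with -p (--savepointPath) |
| Typical use | The job is failing and must be terminated urgently; job state does not need to be kept | The job should be taken offline smoothly and resumed later (together with a savepoint) |
| Job state | Final state is CANCELED | Final state is FINISHED (if all operators shut down successfully) |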

See the official documentation for more: https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/deployment/cli/

Restoring a job from a savepoint

This works the same way as restoring from a Flink checkpoint.

SQL

SET execution.savepoint.path = hdfs:///savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a;

Command-line option

-s,--fromSavepoint <savepointPath>

Example

bin/flink run -m yarn-cluster \
  -d \
  --fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
  -c com.dkl.flink.Test \
  /opt/dkl/flink-demo-1.0.jar

Changing the job's operators

If the job has changed, for example because set pipeline.operator-chaining=false; was added or removed, restoring it throws an exception:

Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

The --allowNonRestoredState option (short form: -n) skips state that cannot be mapped to the new program, but exactly-once is then no longer guaranteed.

-n,--allowNonRestoredState

Example:

bin/flink run -m yarn-cluster \
  -d \
  --fromSavepoint hdfs://cluster1/savepoint/cdc_mysql2mysql/savepoint-e90151-d60a4ba04076 \
  -c com.dkl.flink.Test \
  --allowNonRestoredState \
  /opt/dkl/flink-demo-1.0.jar
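Because skipping state weakens the guarantees, a deployment script may want to make --allowNonRestoredState an explicit opt-in rather than a default. The sketch below only prints the command instead of running it; build_restore_cmd and its argument order are assumptions for illustration, while the flags are the ones used in the examples of this article.

```shell
# Print a `flink run` command that restores from a savepoint.
# $1 savepoint path, $2 main class, $3 jar, $4 "allow" to skip unmapped state.
build_restore_cmd() {
    cmd="bin/flink run -m yarn-cluster -d --fromSavepoint $1 -c $2"
    if [ "${4:-}" = "allow" ]; then
        # Explicit opt-in: state that cannot be mapped is dropped.
        cmd="$cmd --allowNonRestoredState"
    fi
    # The jar comes last; anything after it is passed to the program itself.
    printf '%s %s\n' "$cmd" "$3"
}

# Usage:
# build_restore_cmd hdfs:///savepoint/cdc_mysql2mysql/savepoint-24f61a-6adabbab053a \
#     com.dkl.flink.Test /opt/dkl/flink-demo-1.0.jar allow
```

The helper does no quoting, so it is only suitable for paths without spaces.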

Jobs without checkpointing enabled

Savepoints work whether or not checkpointing is enabled. The logs below show that when checkpointing is not enabled, the checkpoint IDs assigned to savepoints are consecutive.

yarn logs -applicationId application_1750755047138_0093 | grep "Savepoint"
25/06/27 15:14:40 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
25/06/27 15:14:40 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
2025-06-27 15:12:42,809 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=SavepointType{name='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @ 1751008362762 for job 94b5cf983749eec750ab8e9d78b0cd7d.
2025-06-27 15:13:04,664 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=SavepointType{name='Suspend Savepoint', postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1751008384640 for job 94b5cf983749eec750ab8e9d78b0cd7d.
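To check this without reading the log by eye, the checkpoint IDs can be pulled out of the "Triggering checkpoint" lines. The following is a minimal sketch, assuming the log format shown above; savepoint_checkpoint_ids is a hypothetical name.

```shell
# Read JobManager log lines on stdin and print the checkpoint ID of every
# savepoint trigger, one per line.
savepoint_checkpoint_ids() {
    sed -n 's/.*Triggering checkpoint \([0-9][0-9]*\) (type=SavepointType.*/\1/p'
}

# Usage (app_id is a placeholder):
# yarn logs -applicationId "$app_id" | savepoint_checkpoint_ids
```

For the job above this gives 1 and 2, whereas the earlier job with checkpointing enabled logged savepoints as checkpoints 15, 19 and 33, with periodic checkpoints taking the IDs in between.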

Migration support

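| Feature | Checkpoint | Savepoint |
| --- | --- | --- |
| Migration support | Limited; only under strict conditions | Designed to support migration across environments |
| Format stability | Version-sensitive, easily broken | Version-independent, backward compatible |
| Topology changes | Usually not supported | Supports adding or removing operators and changing parallelism |
| Manual intervention | High complexity; requires changes to configuration and code | Low; supported directly by CLI commands |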