生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans
生产级编排AI工作流套件:Flyte全面使用指南 — Core concepts Launch plans
Flyte 是一个开源编排器,用于构建生产级数据和机器学习流水线。它以 Kubernetes 作为底层平台,注重可扩展性和可重复性。借助 Flyte,用户团队可以使用 Python SDK 构建流水线,并将其无缝部署在云端和本地环境中,从而实现分布式处理和高效的资源利用。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。
启动计划
启动计划是工作流调用的模板。它将以下元素整合在一起:
- 一个工作流
- 启动该工作流所需的(可能不完整)输入参数集合
- 可选的通知和调度计划
当调用时,启动计划会传递输入参数来启动工作流。如果启动计划未包含所有必需的工作流输入参数,则需要在执行时提供额外的输入参数。
默认启动计划
每个工作流自动附带一个_默认启动计划_。该启动计划不定义任何默认输入参数,因此所有参数必须在执行时提供。默认启动计划始终与其对应工作流同名。
启动计划具有版本控制
与任务和工作流一样,启动计划具有版本控制。可以更新启动计划来更改输入参数集合、调度计划或通知配置。每次更新都会创建新的启动计划版本。
自定义启动计划
除了默认启动计划外,可以为任何工作流定义额外的启动计划。通常,一个工作流可以关联多个启动计划,但每个启动计划只能关联一个特定工作流。
查看工作流的启动计划
要查看指定工作流的启动计划,在UI中导航至工作流页面并点击Launch Workflow。从Launch Plan下拉菜单中可选择用于启动工作流的启动计划。默认情况下会选中默认启动计划。如果未为该工作流定义任何自定义启动计划,则仅显示默认计划。若已定义自定义启动计划,它们将与默认计划一起显示在下拉菜单中。更多细节请参考运行启动计划。
注册启动计划
通过命令行注册启动计划
大多数情况下,启动计划与项目代码中的工作流和任务一起定义,并通过CLI与其他实体一起批量注册(参见运行代码)。
使用FlyteRemote
在Python中注册启动计划
与所有Flyte命令行操作类似,您也可以通过编程方式使用FlyteRemote
注册启动计划,具体方法是调用FlyteRemote.register_launch_plan
。
注册结果
当上述代码注册到Flyte时,会创建四个对象:
- 任务
workflows.launch_plan_example.my_task
- 工作流
workflows.launch_plan_example.my_workflow
- 默认启动计划
workflows.launch_plan_example.my_workflow
(注意其名称与工作流相同) - 自定义启动计划
my_workflow_custom_lp
(即我们在代码中定义的那个)
修改启动计划
通过修改代码中的定义并重新注册来更改启动计划。当重新注册具有相同项目(project)、域(domain)和名称的启动计划时,将创建该启动计划的新版本。
定义启动计划
您可以使用 LaunchPlan
类 定义启动计划。
以下是一个定义启动计划的简单示例:
import flytekit as fl@fl.workflow
def my_workflow(a: int, b: str) -> str:return f"Result: {a} and {b}"# 创建默认启动计划
default_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow)# 创建命名启动计划
named_lp = @fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_custom_launch_plan"
)
默认与固定输入
默认输入可在执行时被覆盖,而固定输入不可修改。
import flytekit as fl# 带默认输入的启动计划
lp_with_defaults = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_defaults",default_inputs={"a": 42, "b": "default_value"}
)# 带固定输入的启动计划
lp_with_fixed = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_fixed",fixed_inputs={"a": 100} # 'a' 将始终为 100,只有 'b' 可被指定
)# 组合默认与固定输入
lp_combined = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="combined_inputs",default_inputs={"b": "default_string"},fixed_inputs={"a": 200}
)
定时执行
import fl
from datetime import timedelta
from flytekit.core.schedule import CronSchedule, FixedRate# 使用 cron 调度(每周一 UTC 时间 10:00 AM 运行)
cron_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="weekly_monday",default_inputs={"a": 1, "b": "weekly"},schedule=CronSchedule(schedule="0 10 * * 1", # Cron 表达式: 分钟 小时 日 月 周几kickoff_time_input_arg=None)
)# 使用固定频率调度(每 6 小时运行)
fixed_rate_lp = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="every_six_hours",default_inputs={"a": 1, "b": "periodic"},schedule=FixedRate(duration=timedelta(hours=6))
)
标签与注解
标签和注解有助于组织管理,可用于过滤或添加元数据。
import fl
from flytekit.models.common import Labels, Annotations# 添加标签和注解
lp_with_metadata = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_metadata",default_inputs={"a": 1, "b": "metadata"},labels=Labels({"team": "data-science", "env": "staging"}),annotations=Annotations({"description": "测试用启动计划", "owner": "jane.doe"})
)
执行参数
import fl# 设置最大并行度限制并发任务执行
lp_with_parallelism = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_parallelism",default_inputs={"a": 1, "b": "parallel"},max_parallelism=10 # 最多允许 10 个任务节点并发执行
)# 禁用该启动计划的执行缓存
lp_no_cache = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="no_cache",default_inputs={"a": 1, "b": "fresh"},overwrite_cache=True # 总是全新执行,忽略缓存结果
)# 注册时自动激活
lp_auto_activate = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="auto_active",default_inputs={"a": 1, "b": "active"},auto_activate=True # 注册后立即激活启动计划
)
安全与认证
我们可以覆盖用于执行启动计划的认证角色(IAM 角色或 Kubernetes 服务账户)。
import fl
from flytekit.models.common import AuthRole
from flytekit import SecurityContext# 为启动计划设置认证角色
lp_with_auth = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_auth",default_inputs={"a": 1, "b": "secure"},auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/my-execution-role")
)# 设置安全上下文
lp_with_security = fl.LaunchPlan.get_or_create(workflow=my_workflow,name="with_security",default_inputs={"a": 1, "b": "context"},security_context=SecurityContext(run_as=SecurityContext.K8sServiceAccount(name="my-service-account"))
)
原始输出数据配置
from flytekit.models.common import RawOutputDataConfig# 配置大型输出存储位置
lp_with_output_config = LaunchPlan.get_or_create(workflow=my_workflow,name="with_output_config",default_inputs={"a": 1, "b": "output"},raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://my-bucket/workflow-outputs/")
)
完整整合示例
以下是一个较为全面的示例。该自定义启动计划包含:
comprehensive_lp = LaunchPlan.get_or_create(workflow=my_workflow,name="comprehensive_example",default_inputs={"b": "configurable"},fixed_inputs={"a": 42},schedule=CronSchedule(schedule="0 9 * * *"), # 每日 UTC 时间 9:00 AMnotifications=[\Notification(\phases=["SUCCEEDED", "FAILED"],\email=EmailNotification(recipients_email=["team@example.com"])\)\],labels=Labels({"env": "production", "team": "data"}),annotations=Annotations({"description": "每日数据处理"}),max_parallelism=20,overwrite_cache=False,auto_activate=True,auth_role=AuthRole(assumable_iam_role="arn:aws:iam::12345678:role/workflow-role"),raw_output_data_config=RawOutputDataConfig(output_location_prefix="s3://results-bucket/daily-run/")
)
这些示例展示了 Flyte 中启动计划的灵活性,您可以根据工作流需求自定义执行参数、输入、调度等多种配置。
查看启动计划
通过用户界面查看启动计划
在侧边栏选择Launch Plans,将显示项目与域中所有已注册启动计划的列表:
您可以通过以下方式筛选:
- 按名称搜索启动计划
- 过滤仅显示已归档的启动计划
启动计划表格列定义如下:
- 名称:启动计划的名称。点击可查看具体详情
- 触发器:
- 若启动计划处于激活状态,会显示绿色Active徽章。激活状态下,所有关联的调度将生效并按计划触发
- 显示是否包含触发器。可通过右上角Has Triggers复选框过滤含触发器的启动计划
- 最后执行:最近一次执行的时间戳(包含调度触发、手动触发等所有方式)
- 最近10次执行:以可视化方式展示最近10次执行记录(包含所有触发方式)
点击列表条目可进入具体启动计划视图:
在此界面可查看:
- 启动计划详情(最新版本):
- 预期输入:启动计划的输入输出类型
- 固定输入:显示预定义的输入值(如有)
- 启动计划版本:该计划的所有历史版本列表
- 所有执行记录:该计划的所有执行历史
右上角显示激活状态(若激活则显示具体激活版本),并提供版本切换或完全停用的控制选项。详见激活与停用
通过uctl
命令行查看启动计划
查看项目与域中所有启动计划:
$ uctl get launchplans \--project <project-id> \--domain <domain>
查看具体启动计划:
$ uctl get launchplan \--project <project-id> \--domain <domain> \<launch-plan-name>
更多详情请参考Uctl CLI文档
通过Python的FlyteRemote
查看启动计划
使用FlyteRemote.client.list_launch_plans_paginated
方法获取启动计划列表。
通知
一个启动计划(launch plan)可以关联一个或多个通知,当该启动计划关联的工作流(workflow)执行完成时,这些通知会被触发。
共有三种类型的通知:
Email
: 向指定收件人发送电子邮件PagerDuty
: 向配置的PagerDuty服务发送通知(需指定接收方)。PagerDuty将根据您的配置转发通知Slack
: 向指定Slack频道的关联邮箱地址发送通知。此功能要求预先配置Slack账户以接收通知
可以根据工作流执行的不同最终状态发送对应的通知。可选状态包括:
WorkflowExecutionPhase.ABORTED
(执行中止)WorkflowExecutionPhase.FAILED
(执行失败)WorkflowExecutionPhase.SUCCEEDED
(执行成功)WorkflowExecutionPhase.TIMED_OUT
(执行超时)
示例:
from datetime import datetimeimport flytekit as flfrom flytekit import (WorkflowExecutionPhase,Email,PagerDuty,Slack
)@fl.task
def add_numbers(a: int, b: int, c: int) -> int:return a + b + c@fl.task
def generate_message(s: int, kickoff_time: datetime) -> str:return f"sum: {s} at {kickoff_time}"@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime) -> str:return generate_message(add_numbers(a, b, c),kickoff_time,)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},notifications=[\Email(\phases=[WorkflowExecutionPhase.FAILED],\recipients_email=["me@example.com", "you@example.com"],\),\PagerDuty(\phases=[WorkflowExecutionPhase.SUCCEEDED],\recipients_email=["myboss@example.com"],\),\Slack(\phases=[\WorkflowExecutionPhase.SUCCEEDED,\WorkflowExecutionPhase.ABORTED,\WorkflowExecutionPhase.TIMED_OUT,\],\recipients_email=["your_slack_channel_email"],\),\],
)
调度计划
启动计划允许您对工作流的定时调用进行调度。一个启动计划可以关联一个或多个调度方案,但同一时间最多只能有一个调度处于激活状态。如果在启动计划上激活了调度,系统将按照预定时间自动调用工作流,并使用启动计划提供的输入参数。
要为启动计划添加调度方案,请按以下方式向启动计划添加调度对象:
from datetime import timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10))
)
这里我们指定了FixedRate调度方案,系统将每10分钟调用一次工作流。固定频率调度也可以使用天数或小时数来定义。
或者,您也可以指定CronSchedule:
import flytekit as fl
from flytekit import CronSchedule@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int) -> int:return my_task(a=a, b=b, c=c)fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=CronSchedule(schedule="*/10 * * * *")
)
kickoff_time_input_arg
FixedRate
和CronSchedule
都可以接受名为kickoff_time_input_arg
的可选参数。
该参数用于指定工作流输入参数的名称。每次系统通过此调度调用工作流时,调用的时间将通过指定的参数传递给工作流。例如:
from datetime import datetime, timedeltaimport flytekit as fl
from flytekit import FixedRate@fl.task
def my_task(a: int, b: int, c: int) -> int:return a + b + c@fl.workflow
def my_workflow(a: int, b: int, c: int, kickoff_time: datetime ) -> str:return f"sum: {my_task(a=a, b=b, c=c)} at {kickoff_time}"fl.LaunchPlan.get_or_create(workflow=my_workflow,name="my_workflow_custom_lp",fixed_inputs={"a": 3},default_inputs={"b": 4, "c": 5},schedule=FixedRate(duration=timedelta(minutes=10),kickoff_time_input_arg="kickoff_time")
)
在此示例中,每次调度调用my_workflow
时,调用时间都会通过kickoff_time
参数传递。
激活与停用
您可以为启动计划设置激活/停用状态。具体规则如下:
-
在具有相同名称的启动计划版本中,最多只能有一个版本处于激活状态,其他所有版本均为停用状态
-
若激活包含执行计划的启动计划版本,其关联的执行计划也会自动激活,工作流将按照该计划自动触发
-
当包含执行计划的启动计划版本处于停用状态时,其关联的执行计划也会停用,不会用于触发工作流程
未关联执行计划的启动计划也可以设置激活版本。对于此类非计划型启动计划,激活状态可作为版本标识,用于区分不同版本。例如,管理逻辑可依据此状态决定使用哪个版本进行新调用。
新注册的启动计划首个版本默认为停用状态。若该版本包含执行计划,该计划同样处于停用状态。一旦激活,该版本将保持激活状态,即使后续注册新版本也不会改变其状态。
包含执行计划的启动计划版本可通过以下方式激活:用户界面、uctl
命令行工具或FlyteRemote
通过用户界面激活/停用启动计划
激活操作步骤:
- 进入启动计划视图
- 点击屏幕右上角的添加激活启动计划按钮:
- 在弹出的模态框中选择要激活的版本:
该列表仅显示包含执行计划的版本。注意同一时间只能激活一个版本(即最多一个执行计划)。
选择版本后点击更新即可激活该版本及其执行计划。系统将根据执行计划定期触发工作流。
注意:
- 未关联执行计划的启动计划无法通过UI激活
- UI不支持管理无执行计划的启动计划状态,需使用
uctl
或FlyteRemote
停用操作步骤:
- 定位到包含激活计划的启动计划
- 点击Active launch plan旁的**…**图标
- 选择"停用"选项:
- 在确认模态框中完成停用操作
注意:
- 未关联执行计划的启动计划无法通过UI停用
- 需使用
uctl
或FlyteRemote
管理无执行计划的启动计划状态
使用uctl
命令行工具管理启动计划状态
激活命令:
$ uctl update launchplan \--activate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>
停用命令:
$ uctl update launchplan \--deactivate \--project <project-id> \--domain <domain> \<launch-plan-name> \--version <launch-plan-version>
详细说明请参考Uctl CLI文档
使用Python的FlyteRemote
管理启动计划状态
激活示例代码:
from union.remote import FlyteRemote
from flytekit.configuration import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>).id
remote.client.update_launch_plan(launch_plan.id, "ACTIVE")
停用示例代码:
from union.remote import FlyteRemote
from flytekit.remote import Configremote = FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(ame=<launch-plan-name>, version=<launch-plan-version>)
remote.client.update_launch_plan(launch_plan.id, "INACTIVE")
运行启动计划
在用户界面中运行启动计划
要调用启动计划,请进入工作流列表,选择目标工作流,点击启动工作流。在新执行对话框中,从启动计划下拉菜单中选择目标启动计划,然后点击启动。
使用 uctl
命令行运行启动计划
要通过命令行调用启动计划,首先生成启动计划的执行规范文件:
$ uctl get launchplan \--project <project-id>--domain <domain> \<launch-plan-name> \--execFile <execution-spec-file-name>.yaml
然后使用以下命令执行启动计划:
$ uctl create execution \--project <project-id> \--domain <domain> \--execFile <execution-spec-file-name>.yaml
更多细节请参阅 Uctl CLI。
使用 FlyteRemote
在 Python 中运行启动计划
以下代码使用 FlyteRemote
执行启动计划:
import flytekit as fl
from flytekit.remote import Configremote = fl.FlyteRemote(config=Config.auto(), default_project=<project-id>, default_domain=<domain>)
launch_plan = remote.fetch_launch_plan(name=<launch-plan-name>, version=<launch-plan-version>)
remote.execute(launch_plan, inputs=<inputs>)
更多细节请参阅 FlyteRemote。
子启动计划
上述调用示例假设您希望将启动计划作为项目中的顶级实体运行。但您也可以从_工作流内部_调用启动计划,创建_子启动计划_。这将使被调用的启动计划触发其对应工作流,并向该工作流传递指定的参数。
这与子工作流的情况不同——当您在一个工作流函数中调用另一个工作流函数时,子工作流会成为父工作流执行图的一部分,并共享相同的 execution ID 和执行上下文。而调用子启动计划时,会启动一个完整的顶级工作流,该工作流拥有独立的 execution ID 和执行上下文。
更多细节请参阅子工作流与子启动计划。
引用启动计划
引用启动计划是指引用先前已定义、序列化并注册的启动计划。您可以跨项目引用其他启动计划,创建使用他人声明的启动计划的工作流。
创建引用启动计划时,请务必验证工作流接口是否与引用工作流的接口一致。
引用启动计划无法在本地运行。若需本地测试,请使用模拟实现。
示例
本例演示如何为Flytesnacks仓库中的simple_wf
工作流创建引用启动计划。
-
克隆Flytesnacks仓库:
git clone git@github.com:flyteorg/flytesnacks.git
-
进入
basics
目录:cd flytesnacks/examples/basics
-
注册
simple_wf
工作流:pyflyte register --project flytesnacks --domain development --version v1 basics/workflow.py.
-
创建
simple_wf_ref_lp.py
文件并复制以下代码:import flytekit as fl from flytekit import reference_launch_plan@reference_launch_plan(project="flytesnacks",domain="development",name="basics.workflow.simple_wf",version="v1", )def simple_wf_lp(x: list[int], y: list[int] ) -> float:return 1.0@fl.workflow def run_simple_wf() -> float:x = [-8, 2, 4]y = [-2, 4, 7]return simple_wf_lp(x=x, y=y)
-
注册
run_simple_wf
工作流:pyflyte register simple_wf_ref_lp.py
-
在Flyte UI中运行
run_simple_wf
工作流
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。