当前位置: 首页 > ops >正文

实战分享:DolphinScheduler 中 Shell 任务环境变量最佳配置方式

在使用 Apache DolphinScheduler 编排任务的过程中,Shell 类型任务是最常见的任务类型之一。然而,很多用户在实际使用中都会遇到一个看似简单却常常引发问题的问题——环境变量怎么设置才有效?

如果你也曾经因为任务执行环境不一致、找不到命令路径、引用变量失败等问题而抓狂,那么这篇文章将为你拨开迷雾。本文将深入解析 DolphinScheduler 中 Shell 任务的环境变量设置机制,分享几种常见的配置方式、注意事项以及实战踩坑经验,帮助你高效、稳定地配置任务运行环境。

任务类型总结

  • SHELL任务类型:
    SHELL、JAVA、PYTHON、FLINK、MR、FLINK_STREAM、HIVECLI、SPARK、SEATUNNEL、DATAX、SQOOP、DATA_QUALICY、JUPYTER、MLFLOW、OPENMLDB、DVC、PYTORCH、KUBEFLOW、CHUNJUN、LINKIS

注意 : 所谓的SHELL任务类型,都是对SHELL任务类型进行的封装,说白了底层调用的就是Java ProcessBudiler封装的SHELL。

  • SQL任务类型(JDBC):
    SQL、PROCEDURE

注意 : SQL任务类型其实使用的就是各个DB驱动的JDBC进行的操作。

  • HTTP任务类型:
    HTTP、DINKY、PIGEON(WebSocket)

注意 : HTTP任务类型其实就是访问其OPEN API,进行HttpClient封装调用的操作。

  • 逻辑节点:
    SUB_PROCESS、DEPENDENT、CONDITIONS、SWITHC、DYNAMIC

注意 : 所谓的逻辑节点是虚拟任务,这类任务不会调度到Worker节点上去运行,只会在Master节点作为控制节点。

  • Client任务类型:
    EMR、K8S、DMS、DATA_FACTORY、SAGEMAKER、ZEPPELIN、DATASYNC、REMOTESHELL

注意 : 其实就是调用各个任务的开放的Client进行任务的封装。

Shell任务怎么配置环境变量呢?

因为可能涉及到一个组件的不同的版本的客户端,比如说Spark2、Spark3。还有就是针对不同集群的不同客户端,比如说集群1的Spark3客户端和集群2的Spark客户端。 像这样的需求,怎么在dolphinscheduler中进行配置呢?或者说有几种配置方式呢?

两种方式 : 1、通过task不同的环境变量 2、默认的环境变量

1. 通过task不同的环境变量

安全中心 -> 环境管理
1

任务中引用
2

默认的环境变量

common.properties

# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
shell.env_source_list=/etc/profile
# The interceptor type of Shell task, e.g. bash, sh, cmd
shell.interceptor.type=bash

org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory

public class ShellInterceptorBuilderFactory {private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");@SuppressWarnings("unchecked")public static IShellInterceptorBuilder newBuilder() {// TODO 默认的走的是这个逻辑if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {return new BashShellInterceptorBuilder();}if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {return new ShShellInterceptorBuilder();}if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {return new CmdShellInterceptorBuilder();}throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);}}

org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder

public class BashShellInterceptorBuilderextendsBaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {@Overridepublic BashShellInterceptorBuilder newBuilder() {return new BashShellInterceptorBuilder();}@Overridepublic BashShellInterceptor build() throws FileOperateException, IOException {// TODO 这里是生成shell脚本的核心点generateShellScript();List<String> bootstrapCommand = generateBootstrapCommand();// TODO 实例化BashShellInterceptorreturn new BashShellInterceptor(bootstrapCommand, shellDirectory);}@Overrideprotected String shellInterpreter() {return "bash";}@Overrideprotected String shellExtension() {return ".sh";}@Overrideprotected String shellHeader() {return "#!/bin/bash";}}

org.apache.dolphinscheduler.plugin.task.api.AbstractCommandExecutor#run

public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,TaskCallBack taskCallBack) throws Exception {TaskResponse result = new TaskResponse();int taskInstanceId = taskRequest.getTaskInstanceId();// todo: we need to use state like JDK Thread to make sure the killed task should not be executediShellInterceptorBuilder = iShellInterceptorBuilder// TODO 设置执行路径.shellDirectory(taskRequest.getExecutePath())// TODO 这里设置shell 名字.shellName(taskRequest.getTaskAppId());// Set system env// TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.shif (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {// TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);}// Set custom env// TODO 设置自定义的envif (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {// TODO 向 customEnvScripts 中加入iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());}// Set k8s config (This is only work in Linux)if (taskRequest.getK8sTaskExecutionContext() != null) {iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());}// Set sudo (This is only work in Linux)// TODO 设置sudo为true的模式iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());// Set tenant (This is only work in Linux)// TODO 设置租户iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());// Set CPU Quota (This is only work in Linux)if (taskRequest.getCpuQuota() != null) {iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());}// Set memory Quota (This is only work in Linux)if (taskRequest.getMemoryMax() != null) {iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());}IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();// TODO 使用ProcessBuilder进行执行,支持sudo模式,和bash模式process = iShellInterceptor.execute();// parse process output// TODO 这里解析到进程的输出parseProcessOutput(this.process);// collect pod logcollectPodLogIfNeeded();int processId = getProcessId(this.process);result.setProcessId(processId);// cache processIdtaskRequest.setProcessId(processId);// print process idlog.info("process start, process id is: {}", processId);// if timeout occurs, exit directlylong remainTime = getRemainTime();// update pid before waiting for the run to finishif (null != taskCallBack) {// TODO 这里其实就是更新任务实例西悉尼taskCallBack.updateTaskInstanceInfo(taskInstanceId);}// waiting for the run to finishboolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);TaskExecutionStatus kubernetesStatus =ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());if (taskOutputFuture != null) {try {// Wait the task log process finished.taskOutputFuture.get();} catch (ExecutionException e) {log.error("Handle task log error", e);}}if (podLogOutputFuture != null) {try {// Wait kubernetes pod log collection finishedpodLogOutputFuture.get();// delete pod after successful execution and log collectionProcessUtils.cancelApplication(taskRequest);} catch (ExecutionException e) {log.error("Handle pod log error", e);}}// if SHELL task exitif (status && kubernetesStatus.isSuccess()) {// SHELL task stateresult.setExitStatusCode(this.process.exitValue());} else {log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",taskRequest.getTaskTimeout());result.setExitStatusCode(EXIT_CODE_FAILURE);cancelApplication();}int exitCode = this.process.exitValue();String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);return result;}

重点就是:

// Set system env// TODO 在这里是设置默认的,其实也是可以设置为 /opt/dolphinscheduler/bin/env/dolphinscheduler_env.shif (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {// TODO 这里其实就是向 systemEnvs 中加入ENV_SOURCE_LIST中配置的环境文件的列表ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);}// Set custom env// TODO 设置自定义的envif (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {// TODO 向 customEnvScripts 中加入iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());}

其实就是说自定的环境变量是可以覆盖默认的环境变量的。

转载自Journey
原文链接:https://segmentfault.com/a/1190000044954252

http://www.xdnf.cn/news/9148.html

相关文章:

  • K8s边缘集群赋能工业自动化:从传感器监控到智能决策的全流程升级
  • Pic手机拼图软件:创意拼图,轻松上手
  • React JSX语法介绍(JS XML)(一种JS语法扩展,允许在JS代码中编写类似HTML的标记语言)Babel编译
  • ​扣子Coze飞书多维表插件-查询数据
  • 【无标题】使用JEasyOpc开发OPCDA采集中间件
  • Lua中的`self`参数:揭秘隐藏的“对象上下文”
  • 1992-2021年各省工业增加值数据(无缺失)
  • Linux的五种IO模型
  • Rust语言学习教程、案例与项目实战指引
  • c/c++的opencv双边滤波
  • 八大员-质量员考试复习资料有哪些?
  • 【Marp】自定义主题 - box01
  • Kotlin 实战:Android 设备语言与国家地区的 5 种获取方式
  • Playwright 常用命令、参数详解及使用示例
  • 精益数据分析(88/126):从营收平衡到规模化扩张——企业增长的最后一道关卡
  • 如何保护网络免受零日漏洞攻击?
  • php 实现基数排序
  • 编程规范Summary
  • ASP.NET Web Forms框架识别
  • 【论文精读】2024 arXiv --VEnhancer现实世界视频超分辨率(RealWorld VSR)
  • 【数据结构】——二叉树堆(下)
  • Windows系统下 NVM 安装 Node.js 及版本切换实战指南
  • 什么是 WPF 技术?什么是 WPF 样式?下载、安装、配置、基本语法简介教程
  • 云效流水线Flow使用记录
  • 论文阅读笔记——Step1X-Edit: A Practical Framework for General Image Editing
  • Oracle 正则表达式匹配(Oracle 11g)
  • Rockey Linux 安装ffmpeg
  • 抖音不获取位置会显示ip属地吗?全面解析
  • AWS EC2 实例告警的创建与删除
  • some面试题2