当前位置: 首页 > news >正文

Airflow全局异常捕获实现消息通知实践

之前我们有讲到如何基于Helm部署Airflow, 为了实现定时任务异常实时捕获通知,我们改造values.yaml文件, 主要是覆盖/opt/airflow/config/airflow_local_settings.py文件

airflowVersion: 2.5.3
defaultAirflowTag: 2.5.3
config:core:dags_folder: /opt/airflow/dags/repo/dagshostname_callable: airflow.utils.net.get_host_ip_address
env: - name: "docker_conn_id"value: "docker://admin:*****@http://repo.*****.com"
airflowLocalSettings: |-import sysimport loggingimport osfrom sfxs_utils import dingutilsfrom airflow.models.dag import DAGfrom functools import wrapsdef send_dingding_alert(context=None, exc_type=None, exc_value=None, exc_traceback=None):"""Send DingTalk notification for task failures or global exceptions"""try:if context:  # Task failure scenariotask_instance = context.get('task_instance')url = task_instance.log_url.replace("http://localhost:8080", "https://airflow.******.com")message = f"【airflow】 任务{task_instance.task_id}错误,请点击{url}查看!"else:  # Global exception scenariomessage = f"【Airflow Global Error】 发生全局异常: {exc_type.__name__}: {exc_value}\n" \f"请检查 Airflow 服务器日志以获取详细信息!"print(message)token = os.getenv("DINGDING_TOKEN", "ae14f")# 自定义的发送工具类dingutils.send_text(message, token)print("success")except Exception as e:logging.error(f"Failed to send DingTalk notification: {e}")# Monkey-patch DAG to set default on_failure_callbackoriginal_init = DAG.__init__@wraps(original_init)def patched_init(self, *args, **kwargs):original_init(self, *args, **kwargs)if 'default_args' not in kwargs:kwargs['default_args'] = {}if 'on_failure_callback' not in kwargs['default_args']:kwargs['default_args']['on_failure_callback'] = send_dingding_alertself.default_args = kwargs['default_args']DAG.__init__ = patched_init# Global exception handlingoriginal_excepthook = sys.excepthookdef custom_excepthook(exc_type, exc_value, exc_traceback):send_dingding_alert(exc_type=exc_type, exc_value=exc_value, exc_traceback=exc_traceback)original_excepthook(exc_type, exc_value, exc_traceback)sys.excepthook = custom_excepthook
http://www.xdnf.cn/news/106921.html

相关文章:

  • LeetCode-46. 全排列
  • 洛谷P3196C语言题解
  • PHP CURL发送POST请求(支持HEADER参数配置)
  • Kubernetes 集群内访问外部服务的三种实践方案
  • 软件工程的13条“定律”:从Hyrum定律到康威定律,再到Zawinski定律
  • 锤子线,买入准确概率是多少
  • leetcode-数组
  • Retrofit框架分析(二):注解、反射以及动态代理,Retrofit框架动态代理的源码分析
  • bert学习
  • AIGC的伦理困境:机器生成内容是否该被监管?
  • 动态脚本引擎QLExpress,实现各种复杂的业务规则
  • 深度学习驱动的车牌识别:技术演进与未来挑战
  • 创建第一个Spring Boot项目
  • pytorch(gpu版本安装)
  • Javase 基础入门 —— 04 继承
  • 数据结构与算法学习笔记(Acwing提高课)----动态规划·数字三角形
  • openssh-10.0p1用于修复CVE-2025-26465、CVE-2025-26466
  • java springBoot 整合 扣子cozeAI 智能体 对话
  • AI 人工智能模型:从理论到实践的深度解析⚡YQW · Studio ⚡【Deepseek】【Chat GPT】
  • python函数与模块
  • PyCharm 链接 Podman Desktop 的 podman-machine-default Linux 虚拟环境
  • YOLO学习笔记 | 从YOLOv5到YOLOv11:技术演进与核心改进
  • JVM学习笔记
  • Spark论述及其作用
  • 五、实现隐藏(Hiding the Implementation)
  • 记录一次OGG进程abended,报错OGG-01431、OGG-01003、OGG-01151、OGG-01296问题的处理
  • Windows 同步技术-一次性初始化
  • Discuz!与DeepSeek的AI融合:打造智能网址导航新体验——以“虎跃办公”为例
  • 15.FineReport动态展示需要的列
  • 运维案例:让服务器稳定运行,守护业务不掉线!