Airflow全局异常捕获实现消息通知实践
之前我们讲过如何基于 Helm 部署 Airflow。为了实现定时任务异常的实时捕获与通知，我们需要改造 values.yaml 文件，主要是覆盖 /opt/airflow/config/airflow_local_settings.py 文件：
airflowVersion: 2.5.3
defaultAirflowTag: 2.5.3
config:
  core:
    dags_folder: /opt/airflow/dags/repo/dags
    hostname_callable: airflow.utils.net.get_host_ip_address
env:
  - name: "docker_conn_id"
    value: "docker://admin:*****@http://repo.*****.com"
airflowLocalSettings: |-
  # Content for airflow_local_settings.py: installs a default DingTalk
  # on_failure_callback on every DAG and a process-wide sys.excepthook.
  import sys
  import logging
  import os
  from sfxs_utils import dingutils
  from airflow.models.dag import DAG
  from functools import wraps


  def send_dingding_alert(context=None, exc_type=None, exc_value=None, exc_traceback=None):
      """Send a DingTalk notification for a task failure or a global exception.

      Args:
          context: Task context dict handed to ``on_failure_callback``. When
              truthy, the message links to the failed task instance's log URL.
          exc_type: Exception class; used only when ``context`` is falsy.
          exc_value: Exception instance; used only when ``context`` is falsy.
          exc_traceback: Accepted so the signature lines up with
              ``sys.excepthook``; not used in the message.

      Any error raised while building or sending the message is logged and
      swallowed, so a failing notification never propagates to the caller.
      """
      try:
          if context:  # Task failure scenario
              task_instance = context.get('task_instance')
              # Swap the local webserver base URL for the externally reachable host.
              url = task_instance.log_url.replace("http://localhost:8080", "https://airflow.******.com")
              message = f"【airflow】 任务{task_instance.task_id}错误,请点击{url}查看!"
          else:  # Global exception scenario
              message = f"【Airflow Global Error】 发生全局异常: {exc_type.__name__}: {exc_value}\n" \
                        f"请检查 Airflow 服务器日志以获取详细信息!"
          print(message)
          # NOTE(review): hard-coded fallback token; prefer injecting
          # DINGDING_TOKEN from a secret and dropping the default.
          token = os.getenv("DINGDING_TOKEN", "ae14f")
          # Project-specific DingTalk sender utility.
          dingutils.send_text(message, token)
          print("success")
      except Exception as e:
          # logging.exception keeps the traceback alongside the same message text.
          logging.exception("Failed to send DingTalk notification: %s", e)


  # Monkey-patch DAG to set a default on_failure_callback
  original_init = DAG.__init__


  @wraps(original_init)
  def patched_init(self, *args, **kwargs):
      """Run the real ``DAG.__init__``, then default ``on_failure_callback``.

      The callback is merged into a copy of whatever ``default_args`` the DAG
      holds after construction, instead of being rebuilt from ``kwargs``. This
      keeps ``default_args`` that were passed positionally, tolerates
      ``default_args=None``, and never mutates the dict object the DAG author
      passed in (which may be shared between several DAGs).
      An ``on_failure_callback`` already present in ``default_args`` wins.
      """
      original_init(self, *args, **kwargs)
      merged_args = dict(getattr(self, 'default_args', None) or {})
      merged_args.setdefault('on_failure_callback', send_dingding_alert)
      self.default_args = merged_args


  DAG.__init__ = patched_init

  # Global exception handling
  original_excepthook = sys.excepthook


  def custom_excepthook(exc_type, exc_value, exc_traceback):
      """Notify DingTalk about an uncaught exception, then run the previous hook."""
      send_dingding_alert(exc_type=exc_type, exc_value=exc_value, exc_traceback=exc_traceback)
      original_excepthook(exc_type, exc_value, exc_traceback)


  sys.excepthook = custom_excepthook