在 Apache Airflow 中,您可以配置任务在失败时发送警报。通常,您可以使用 Airflow 的 on_failure_callback
参数来实现这一点。以下是一个详细的示例,展示如何在任务失败时发送电子邮件警报。
首先,确保您的 Airflow 配置文件 (airflow.cfg
) 中已正确配置 SMTP 服务器。以下是一个示例配置:
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@example.com
smtp_password = your_password
smtp_port = 587
smtp_mail_from = your_email@example.com
接下来,创建一个自定义的失败回调函数,该函数将在任务失败时发送电子邮件警报。
from airflow.utils.email import send_email
from airflow.hooks.base_hook import BaseHook
def task_failure_alert(context):
dag_id = context.get('dag').dag_id
task_id = context.get('task').task_id
execution_date = context.get('execution_date')
log_url = context.get('task_instance').log_url
subject = f"Airflow alert: {dag_id}.{task_id} Failed"
html_content = f"""
<h3>Task Failed</h3>
<p><strong>Dag:</strong> {dag_id}</p>
<p><strong>Task:</strong> {task_id}</p>
<p><strong>Execution Time:</strong> {execution_date}</p>
<p><strong>Log URL:</strong> <a href="{log_url}">{log_url}</a></p>
"""
send_email('alert@example.com', subject, html_content)
在您的 DAG 文件中,将自定义的失败回调函数添加到任务中。
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': task_failure_alert
}
dag = DAG(
'example_failure_alert',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2023, 1, 1),
catchup=False,
)
def failing_task():
raise ValueError("This task is supposed to fail.")
start = DummyOperator(
task_id='start',
dag=dag,
)
fail = PythonOperator(
task_id='fail',
python_callable=failing_task,
dag=dag,
)
start >> fail
确保您的 Airflow 实例正在运行,并且 DAG 已正确加载。您可以通过 Airflow Web 界面手动触发 DAG 以验证电子邮件警报是否在任务失败时发送。
领取专属 10元无门槛券
手把手带您无忧上云