首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Airflow -2警报已发送on_failure

在 Apache Airflow 中,您可以配置任务在失败时发送警报。通常,您可以使用 Airflow 的 on_failure_callback 参数来实现这一点。以下是一个详细的示例,展示如何在任务失败时发送电子邮件警报。

1. 配置 SMTP 服务器

首先,确保您的 Airflow 配置文件 (airflow.cfg) 中已正确配置 SMTP 服务器。以下是一个示例配置:

代码语言:javascript
复制
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email@example.com
smtp_password = your_password
smtp_port = 587
smtp_mail_from = your_email@example.com

2. 创建自定义失败回调函数

接下来,创建一个自定义的失败回调函数,该函数将在任务失败时发送电子邮件警报。

代码语言:javascript
复制
from airflow.utils.email import send_email
from airflow.hooks.base_hook import BaseHook

def task_failure_alert(context):
    dag_id = context.get('dag').dag_id
    task_id = context.get('task').task_id
    execution_date = context.get('execution_date')
    log_url = context.get('task_instance').log_url

    subject = f"Airflow alert: {dag_id}.{task_id} Failed"
    html_content = f"""
    <h3>Task Failed</h3>
    <p><strong>Dag:</strong> {dag_id}</p>
    <p><strong>Task:</strong> {task_id}</p>
    <p><strong>Execution Time:</strong> {execution_date}</p>
    <p><strong>Log URL:</strong> <a href="{log_url}">{log_url}</a></p>
    """

    send_email('alert@example.com', subject, html_content)

3. 将失败回调函数添加到任务中

在您的 DAG 文件中,将自定义的失败回调函数添加到任务中。

代码语言:javascript
复制
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': task_failure_alert
}

dag = DAG(
    'example_failure_alert',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
)

def failing_task():
    raise ValueError("This task is supposed to fail.")

start = DummyOperator(
    task_id='start',
    dag=dag,
)

fail = PythonOperator(
    task_id='fail',
    python_callable=failing_task,
    dag=dag,
)

start >> fail

4. 验证配置

确保您的 Airflow 实例正在运行,并且 DAG 已正确加载。您可以通过 Airflow Web 界面手动触发 DAG 以验证电子邮件警报是否在任务失败时发送。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

没有搜到相关的沙龙

领券