我最近使用了一点气流,现在我的工具已经启动并运行,我在UI的数据分析部分查看了一些数据。做了一些图表,一些满是数据的表格。我希望能够每周在电子邮件中发送一次这些图表和数据,这样我就可以定期在全球范围内更新我不同的后台运行和任务,而不仅仅是在每次运行结束时发送一封电子邮件,说明成功或失败。
有什么方便的方法吗?或者,我是否需要使用jinja模板、电子邮件操作符和可怕的SQLAlchemy语法来构建一个自定义的守护程序来从数据库中重新提取数据?
发布于 2020-07-06 04:02:55
所以我找到了解决办法。如果有更好的方法,可以随意评论我做事的方式。我很乐意在这里
我创建了一个静态函数来从图表中恢复数据:
from airflow import settings
def get_data_profilling_data( chart_label):
"""
Get an array containing data from a data profilling chart.
Args:
chart_label: a string that is the name of a an available chart in Data Profiling section.
session(None, optional): Not in use.
Returns:
A list of tuple with first element representing the header of column.
"""
import logging
query_filters = [models.Chart.label == chart_label]
session = settings.Session()
# Query database to get a chart object.
chart_object = session.query(models.Chart).filter(*query_filters).first()
# Replace useless caracter in query and replace % by %% so it's not interpreted by SQLAlchemy
# Also add a ; at the end of the statement.
sql="{}{}".format(chart_object.sql.replace('\n', ' ').replace('\r',' ').replace('%','%%'), ';')
query = session.connection().engine.execute(sql)
#recover sql request header.
headers = tuple(query.keys())
#recover all data
data = query.fetchall()
#close database connection
#session.connection().close()
# Add headers to first position in the list
data.insert(0,headers)
return data
一旦提取出来,我就使用了jinja模板:
jinja_str = """<!DOCTYPE html>
<html>
<head>
{% if title %}
<title>{{ title }}</title>
{% else %}
<title>Airflow reports</title>
{% endif %}
</head>
<body>
<table>
{% if rows %}
{% for row in rows %}
{% if loop.index == 1 %}
<!-- table header -->
<tr>{% for elem in row %}
<th>{{ elem }}</th>
{% endfor %}</tr>
{% endif %}
{% if loop.index > 1 %}
<tr>{% for elem in row %}
<td>{{ elem }}</td>
{% endfor %}</tr>
{% endif %}
{% endfor %}
{% endif %}
</table>
</body>
</html>"""
def format_data_from_chart(chart, **kwargs):
data = DagStatic.get_data_profilling_data(chart)
report_template = Template( jinja_str )
report_html = report_template.render({"title": chart,"rows":data})
kwargs["ti"].xcom_push(key='report_html', value=report_html)
这将把用jinja模板呈现的图表的结果推入xcom。我现在可以使用电子邮件操作员发送邮件:
send_report= email_task = EmailOperator(
to='you.mail@mail.com',
task_id='send_report',
subject='Airflow HTML report start_date {{ ds }}',
html_content="{{ ti.xcom_pull(key='report_html') }}",
dag=dag)
希望这能帮到你。如果有更好的方法,可以随意评论我做事的方式。我很乐意在这里
发布于 2020-07-02 04:55:37
据我所知,没有什么“开箱即用”的东西能做你想做的事情;你可能在“我必须构建一个定制的dag…”这个词上是正确的。你问题的一部分。幸运的是,由于您在气流中,您可以利用它的代码库来帮助:
from airflow import models, settings
def python_task(**context):
chart_label = context.params["chart_label"]
query_filters = [models.Chart.label == chart_label]
session = settings.Session()
chart_object = session.query(models.Chart).filter(*query_filters).first()
[...] build and send the email [...]
您可能希望查看气流源代码,特别是电子邮件处理部分和界面呈现。请注意,如果您可以使任务相对抽象,您可以非常容易地重用它。
https://stackoverflow.com/questions/62695490
复制相似问题