我有一个项目的烧烤,包括发送报告的csv格式的基础上,每天。这些报告是对bigquery查询的结果,然后压缩为csv并邮寄。
使用以下问题的实现来解决我的问题。How to run a BigQuery query and then send the output CSV to Google Cloud Storage in Apache Airflow?
现在,我正试图改变这个实现。
原因是:
尝试使用来自bigquery_hook的“bigquery_hook”,然后将结果通过xcom传递到另一个将负责压缩到csv的任务。由于大船的沉重,这是不可能的。
你知道怎么直接做吗?
发布于 2021-10-25 15:08:19
在气流中,使用现有的操作人员以及编写自己的操作程序都是同样容易的。这都是Python。对于外部服务,气流有两层方法--它有操作符(每个操作员只做一个操作)和钩子(这是一个超级容易使用的接口,提供API与外部服务的通信。
在您的情况下,您应该使用多个钩子来创建自己的操作符,而不是组合现有的运算符。例如,一个钩子可以将数据读取到熊猫的框架中,然后用Python代码提取数据,你可以将其附加到邮件中,然后使用util中的'send_email‘来发送电子邮件(发送电子邮件没有单独的钩子,因为发送电子邮件也是气流核心的一个标准功能)。您可以查看EmailOperator代码,了解如何使用send_email,以及如何使用BigQueryHook的BigQueryOperators。
你可以用两种方式来做:
class MyOperator(BaseOperator):
__init__.....
def execute():
bq_hook = BigQueryHook(.....)
... do stuff ...
send_email(....)
@dag(...)
def my_dag():
@task()
def read_data_and_send_email():
bq_hook = BigQueryHook(.....)
... do stuff ...
send_email(....)
任务流程是我认为更适合您的需求:请参见http://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html
https://stackoverflow.com/questions/69708255
复制相似问题