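The Airflow docs say: You can use Jinja templating with every parameter that is marked as "templated" in the documentation.
It makes sense that Airflow automatically templates specific parameters within the Airflow world (for example, certain parameters of PythonOperator). I would like to know the best/correct way to get a non-Airflow variable templated. My specific use case is similar to: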
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from somewhere import export_votes_data, export_queries_data
from elsewhere import ApiCaucus, ApiQueries

dag = DAG('export_training_data',
          description='Export training data for all active orgs to GCS',
          schedule_interval=None,
          start_date=datetime(2018, 3, 26), catchup=False)

HOST = "http://api-00a.dev0.solvvy.co"
BUCKET = "gcs://my-bucket-name/{{ ds }}/"  # I'd like this to get templated

votes_api = ApiCaucus.get_votes_api(HOST)
queries_api = ApiQueries.get_queries_api(HOST)

export_votes = PythonOperator(task_id="export_votes", python_callable=export_votes_data,
                              op_args=[BUCKET, votes_api], dag=dag)
# export_solutions is not defined in this snippet
export_queries = PythonOperator(task_id="export_queries", python_callable=export_queries_data,
                                op_args=[BUCKET, queries_api, export_solutions.task_id], dag=dag,
                                provide_context=True)
Posted on 2018-03-29 15:32:49
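The provide_context parameter of PythonOperator passes along the arguments that are used for templating. From the documentation:
provide_context (bool): if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs corresponds exactly to what you can use in your Jinja templates. For this to work, you need to define **kwargs in your function header.
With the context provided to the callable, you can then do the interpolation inside your function: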
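def your_callable(bucket, api, **kwargs):
    # str.format fills single-brace fields such as {ds}; it reads the
    # Jinja-style {{ ds }} as escaped literal braces and leaves it unfilled.
    bucket = bucket.format(**kwargs)
    [...]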
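The brace style matters for this approach, so here is a minimal sketch of the difference. It assumes a single-brace template and a hand-built dictionary standing in for the context that Airflow would pass; export_votes_data below is only a stub and fake_context is hypothetical, neither is the asker's real code.

```python
def export_votes_data(bucket_template, api, **kwargs):
    """Stub callable: fills the bucket path from the context kwargs."""
    # str.format ignores keyword arguments the template does not name,
    # so the whole context can be passed through unchanged.
    return bucket_template.format(**kwargs)


# Hypothetical stand-in for the context passed when provide_context=True.
fake_context = {"ds": "2018-03-26", "ds_nodash": "20180326"}

# Single braces: str.format substitutes the value of ds.
rendered = export_votes_data("gcs://my-bucket-name/{ds}/", None, **fake_context)

# Jinja-style double braces: str.format treats them as escaped literal braces.
unrendered = export_votes_data("gcs://my-bucket-name/{{ ds }}/", None, **fake_context)

print(rendered)    # gcs://my-bucket-name/2018-03-26/
print(unrendered)  # gcs://my-bucket-name/{ ds }/
```

With this approach the BUCKET constant would need to be written as "gcs://my-bucket-name/{ds}/" rather than with the Jinja placeholder.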
Posted on 2018-08-07 03:08:33
In an operator's methods (execute/pre_execute/post_execute, and anywhere else you can get hold of the Airflow context):
BUCKET = "gcs://my-bucket-name/{{ ds }}/" # I'd like this to get templated
jinja_context = context['ti'].get_template_context()
rendered_content = self.render_template('', BUCKET, jinja_context)
https://stackoverflow.com/questions/49538414