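We need to pass parameters when creating a DAG run, so that every task can read the values it needs from its task instance. The help output of trigger_dag shows the relevant option, -c/--conf: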
$ airflow trigger_dag -h
[2017-04-14 18:47:28,576] {__init__.py:57} INFO - Using executor CeleryExecutor
usage: airflow trigger_dag [-h] [-sd SUBDIR] [-r RUN_ID] [-c CONF]
                           [-e EXEC_DATE]
                           dag_id

positional arguments:
  dag_id                The id of the dag

optional arguments:
  -h, --help            show this help message and exit
  -sd SUBDIR, --subdir SUBDIR
                        File location or directory from which to look for the
                        dag
  -r RUN_ID, --run_id RUN_ID
                        Helps to identify this run
  -c CONF, --conf CONF  JSON string that gets pickled into the DagRun's conf
                        attribute
  -e EXEC_DATE, --exec_date EXEC_DATE
                        The execution date of the DAG
We pass the JSON string '{"foo":"bar"}' to the DAG run as follows:
airflow trigger_dag example_passing_params_via_test_command -c '{"foo":"bar"}'
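When the conf value is produced by a script rather than typed by hand, shell quoting is easy to get wrong. The sketch below is one possible way to assemble the command shown above. `build_trigger_command` is a hypothetical helper, not part of Airflow, and the sketch assumes Python 3 (for `shlex.quote`); it only builds the string and does not run anything.

```python
import json
import shlex


def build_trigger_command(dag_id, conf):
    """Return an `airflow trigger_dag` command line that carries conf as JSON.

    Hypothetical helper for illustration; it is not an Airflow API.
    """
    # Serialize the dict to JSON, then quote it so the shell passes it as one argument.
    conf_json = json.dumps(conf, sort_keys=True)
    return "airflow trigger_dag {} -c {}".format(
        shlex.quote(dag_id), shlex.quote(conf_json))


command = build_trigger_command(
    "example_passing_params_via_test_command", {"foo": "bar"})
print(command)

# Splitting the command the way a POSIX shell would recovers the JSON intact.
argv = shlex.split(command)
print(json.loads(argv[-1]))
```

Quoting the JSON with `shlex.quote` keeps the double quotes and braces from being interpreted by the shell, which is the same job the single quotes do in the hand-written command.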
import logging

from airflow.operators.python_operator import PythonOperator

# `dag` is assumed to be a DAG object defined earlier in the same file.


def my_py_command(ds, **kwargs):
    logging.info(kwargs)
    # Read "foo" from the conf passed via `airflow trigger_dag ... -c '{"foo":"bar"}'`
    logging.info(kwargs.get('dag_run').conf.get('foo'))
    # Print out the "foo" param passed in via
    # `airflow test example_passing_params_via_test_command run_this <date>
    # -tp '{"foo":"bar"}'`
    if kwargs["test_mode"]:
        msg = " 'foo' was passed in via test={} command : kwargs[params][foo] = {}".format(
            kwargs["test_mode"], kwargs["params"]["foo"])
        print(msg)
        logging.info(msg)
    # Print out the value of "miff", passed in below via the Python Operator
    msg = " 'miff' was passed in via task params = {}".format(kwargs["params"]["miff"])
    print(msg)
    logging.info(msg)
    return 1


my_templated_command = """
echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} "
echo " 'miff was passed in via BashOperator with value {{ params.miff }} "
"""

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=my_py_command,
    params={"miff": "agg"},
    dag=dag)
The lines that call logging are where the parameters are read.
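The chained call `kwargs.get('dag_run').conf.get('foo')` raises an exception if either `dag_run` or `conf` is None. A more defensive lookup might look like the sketch below. It rests on an assumption the text above does not confirm: that `conf` can be None when a run was not triggered with -c. `FakeDagRun` and `get_run_conf` are hypothetical names used only so that the example runs without Airflow installed.

```python
class FakeDagRun(object):
    """Hypothetical stand-in for a DagRun; only the `conf` attribute is modelled."""

    def __init__(self, conf=None):
        self.conf = conf


def get_run_conf(context, key, default=None):
    """Return context['dag_run'].conf[key], or `default` if any link is missing."""
    dag_run = context.get('dag_run')
    # getattr tolerates dag_run being None; `or {}` tolerates conf being None.
    conf = getattr(dag_run, 'conf', None) or {}
    return conf.get(key, default)


triggered = {'dag_run': FakeDagRun({'foo': 'bar'})}
without_conf = {'dag_run': FakeDagRun(None)}

print(get_run_conf(triggered, 'foo'))
print(get_run_conf(without_conf, 'foo', 'fallback'))
print(get_run_conf({}, 'foo', 'fallback'))
```

Inside a real callback the same helper would be called as `get_run_conf(kwargs, 'foo')`, since the context arrives as keyword arguments.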
Every DAG run has a context, which is passed as the context argument to all of its tasks and to every task callback.
The context looks like this:
{
u'next_execution_date': None,
u'dag_run': <DagRun example_passing_params_via_test_command @ 2017-04-14 18:28:07: manual__2017-04-14T18:28:07, externally triggered: True>,
u'tomorrow_ds_nodash': u'20170415',
u'run_id': 'manual__2017-04-14T18:28:07',
u'dag': <DAG: example_passing_params_via_test_command>,
u'prev_execution_date': None,
u'conf': <module 'airflow.configuration' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/configuration.pyc'>,
u'tables': None,
u'task_instance_key_str': u'example_passing_params_via_test_command__run_this__20170414',
u'END_DATE': '2017-04-14',
u'execution_date': datetime.datetime(2017, 4, 14, 18, 28, 7),
u'ts': '2017-04-14T18:28:07',
u'macros': <module 'airflow.macros' from '/usr/local/lib/python2.7/site-packages/airflow-1.8.0-py2.7.egg/airflow/macros/__init__.pyc'>,
u'params': {'miff': 'agg'},
u'ti': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
u'var': {u'json': None, u'value': None},
u'ds_nodash': u'20170414',
u'test_mode': False,
u'end_date': '2017-04-14',
'templates_dict': None,
u'task': <Task(PythonOperator): run_this>,
u'task_instance': <TaskInstance: example_passing_params_via_test_command.run_this 2017-04-14 18:28:07 [running]>,
u'yesterday_ds_nodash': u'20170413',
u'latest_date': '2017-04-14',
u'yesterday_ds': '2017-04-13',
u'ts_nodash': u'20170414T182807',
u'tomorrow_ds': '2017-04-15'
}
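As shown, the context contains the dag_run value.
The run's parameters are serialized with pickle and stored in the dag_run table.
The column is declared as follows: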
conf = Column(PickleType)
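The round trip implied by a pickled column can be sketched with the standard library alone. This is not Airflow's or SQLAlchemy's code. It assumes, as the -c help text suggests, that the JSON string is parsed into a Python object first and that this object is what gets pickled.

```python
import json
import pickle

# Parse the JSON string given on the command line into a Python object.
conf = json.loads('{"foo":"bar"}')

# Roughly what a pickled column does on write: object -> bytes.
stored = pickle.dumps(conf)

# And on read: bytes -> an equal object.
restored = pickle.loads(stored)

print(type(stored).__name__)
print(restored == conf)
print(restored.get('foo'))
```

Because the stored value is a pickled Python object rather than raw JSON text, a task reads it back as a dict and can call `.get('foo')` on it directly.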
When a PythonOperator executes, it passes the context to the callback through self.op_kwargs:
class PythonOperator(BaseOperator):
    template_fields = ('templates_dict',)
    template_ext = tuple()
    ui_color = '#ffefeb'

    @apply_defaults
    def __init__(
            self,
            python_callable,
            op_args=None,
            op_kwargs=None,
            provide_context=False,
            templates_dict=None,
            templates_exts=None,
            *args, **kwargs):
        super(PythonOperator, self).__init__(*args, **kwargs)
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}
        self.provide_context = provide_context
        self.templates_dict = templates_dict
        if templates_exts:
            self.template_ext = templates_exts

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            context['templates_dict'] = self.templates_dict
            self.op_kwargs = context

        return_value = self.python_callable(*self.op_args, **self.op_kwargs)
        logging.info("Done. Returned value was: " + str(return_value))
        return return_value
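Note the context argument of execute: when self.provide_context is True, the context is extended with the user-supplied op_kwargs,
and the extended context is then assigned to self.op_kwargs and passed to the callback as keyword arguments.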
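The merge that execute performs can be tried out without Airflow. The sketch below uses `MiniPythonOperator`, a hypothetical and heavily simplified stand-in that mirrors only the `provide_context` branch of the quoted source (it omits `op_args` and `templates_dict`). The context dict is invented for the example.

```python
class MiniPythonOperator(object):
    """Hypothetical stand-in mirroring only the execute() merge logic."""

    def __init__(self, python_callable, op_kwargs=None, provide_context=False):
        self.python_callable = python_callable
        self.op_kwargs = op_kwargs or {}
        self.provide_context = provide_context

    def execute(self, context):
        if self.provide_context:
            # User-supplied op_kwargs are merged into the context dict...
            context.update(self.op_kwargs)
            # ...and the merged dict becomes the keyword arguments.
            self.op_kwargs = context
        return self.python_callable(**self.op_kwargs)


def callback(ds, **kwargs):
    # `ds` is bound by name from the context; every other key lands in kwargs.
    return ds, kwargs['extra'], sorted(kwargs)


fake_context = {'ds': '2017-04-14', 'test_mode': False}
op = MiniPythonOperator(callback, op_kwargs={'extra': 42}, provide_context=True)
result = op.execute(dict(fake_context))
print(result)
```

This is why `my_py_command(ds, **kwargs)` can name `ds` as a positional parameter and still find `dag_run`, `params`, and `test_mode` in `kwargs`: every context key is passed by keyword.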
Inside the callback, the DagRun instance can then be read from the context:
kwargs.get('dag_run')
The conf attribute of the DagRun then holds the parameters as the object parsed from the JSON string (a dict in this example):
dag_run_conf = kwargs.get('dag_run').conf
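Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com to request removal.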