一、相同任务不同参数并列执行
最近几周一直在折腾 Airflow ,本周在写一个流水线任务,分为 4 个步骤,第一步会读取数据库 db ,然后是对读取的数据根据某个数据指标进行分组处理,同一个任务接收多组数据参数并列执行任务,并发执行提高任务的执行效率,流程执行如下:
在代码上,任务函数返回一个列表 list ,下一个任务接收参数使用 expand
任务执行顺序没有变化,还是串行执行。
Airflow 的 Web 页面上的体现:
这样的话,一个人任务就对应一个 MAP INDEX。
二、任务之间实现信息共享
一个 Dag 中在可能会包含多个调度任务,这些任务之间可能需要实现信息共享,即怎么把 task A 执行得到的结果传递给 task B,让 task B 可以基于 task A 的结果进行后续操作。XCom 就是给出的答案。
XCom 是 cross-communication 的缩写。它被设计于用来在 Airflow 各个 task 间进行数据共享。XCom 的本质就是把 task 需要传递的信息以 KV 的形式存到 DB 中,而其他 task 则可以从DB中获取。由于XCom是存在DB而不是内存中,这也说明了对于已经执行完的 DAG,如果重跑其中某个 task 的话依然可以获取到同次DAG运行时其他task传递的内容。
XCom 存储的是 KV 形式的数据对,Airflow 包装了 xcom_push 和 xcom_pull 两个方法,可以方便进行存取操作。如果没有特殊的需求,我们只需关注里面的key和value 这两个参数即可。其他参数 Airflow 会根据 task 的上下文自动添加。看个 PythonOperator 的例子更能说明:
def push_data(**context):
context['ti'].xcom_push(key='test_key', value='test_val')
push_data_op = PythonOperator(
task_id = 'push_data',
python_callable = push_data,
provide_context=True,
dag = dag
)
def pull_data(**context):
test_data = context['ti'].xcom_pull(key='test_key')
pull_data_op = PythonOperator(
task_id = 'pull_data',
python_callable = pull_data,
provide_context=True,
dag = dag
)
push_data_op >> pull_data_op
上面的代码就在 push_data和 pull_data 两个任务中传递了key='test_key', value='test_val'这一条数据。更方便的是,这里的value不仅限于str类型,这就提供了更大的自由度。注意,在opreator中必须要有provide_context=True,才能在operator内部通过context['ti'](获得当前 task 的 TaskInstance ,进行XCom push/pull的相关操作。
注意:
如果 Airflow 部署在 k8s 上,就建议不要使用 xcom ,在 K8s 中运行自定义 XCom 后端会给 Airflow 部署带来更多的复杂性。可以把任务输出的结果保存到数据库 DB 中,本质上和使用 xcom 是一样的。
本文分享自 pythonista的日常 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!