我有以下代码:
def chunck_import(**kwargs):
...
logging.info('Number of pages required is: {0}'.format(num_pages))
for i in range(1, num_pages + 1):
...
parameter_where = 'where orders_id between {0} and {1}'.format(start,end)
logging.info(parameter_where)
chunck_import_op = PythonOperator(
task_id='chunck_import',
provide_context=True,
python_callable=chunck_import,
dag=dag)
start_task_op >> ... >> chunck_import_op
此操作符创建多个WHERE
语句:
INFO - From 7557920 to 7793493
INFO - Number of pages required is: 4
where orders_id between 7607920 and 7657920
where orders_id between 7657921 and 7707920
where orders_id between 7707921 and 7757920
where orders_id between 7757921 and 7793493
现在我有一个MySqlToGoogleCloudStorageOperator
,如下所示:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
...
sql = 'select * from orders {{ params.where_cluster }}',
params={'where_cluster': parameter_where},
dag=dag)
chunck_import_op
知道我需要调用MySqlToGoogleCloudStorageOperator
- num_pages
的次数,它还创建了需要作为参数- parameter_where
传递的字符串。
我的问题是如何根据MySqlToGoogleCloudStorageOperator
动态创建num_pages
并将parameter_where
传递给它。
发布于 2018-10-11 19:47:09
我将子类MySqlToGoogleCloudStorageOperator
自定义查询,并覆盖执行步骤,以根据传递给操作符的页面大小参数生成分页查询。这是一些额外的工作,但建议在这里的其他选项。
但是,您不能使用PythonOperator
或任何运算符来修改DAG (并对其进行识别和调度)。它所能做的最多就是:
MySqlToGoogleCloudStorageOperator
,并在PythonOperator
中对其调用execute。这将有效,您将在MySqlToGoogleCloudStorageOperator
的日志中看到来自PythonOperator
的日志消息。PythonOperator
或TriggerDagRunOperator
触发另一个DAG,其中只有MySqlToGoogleCloudStorageOperator
将子句作为参数传递,或者首先将其推送给XCOM。其他DAG可能会将计划设置为@None
。这将使日志的后续工作更加困难,但它可以并行运行DAG。如果是我的DAG,我认为我的方法(如果不是子类化的话)总是在1到X页中处理。让我们建议您的DAG应该处理最多X页的结果,其中X是10,例如。然后从chunck_import_op
的父级定义10个分支。您不需要chunck_import_op
或可调用的分支。
ShortCircuitOperator
开始,它使用不同的offset
参数(0到9)调用相同的可调用函数。此可调用将检查offset * page_size
是否大于end
,如果是,则返回False
,跳过下游运算符。否则,它将向xcom推送一个基于偏移量范围的有效查询,并返回True
来运行它们。MySqlToGoogleCloudStorageOperator
,其中将查询设置为{{ ti.xcom_pull('<ShortCircuitOperator_N>') }}
,其中字符串是前面ShortCircuitOperator
的名称。MySqlToGoogleCloudStorageOperator
第一次添加DummyOperator
作为所有这些MySqlToGoogleCloudStorageOperator
s的子操作符并生成trigger_rule
ALL_DONE
之后需要其他操作符,那么添加其他操作符作为该操作符的子操作符。通过这种方式,您可以在必要时运行1至10个分页查询。他们可能会并行运行,但我不认为这是一个潜在的问题,只是考虑一下。
发布于 2018-10-11 04:33:18
气流为任务(操作者)之间的通信提供了XComs机制。在您的具体场景中,chunck_import
任务可以先预计算所有where子句并将它们推入XCom;然后import_orders
任务可以拉出XCom,读取所有where子句,并根据需要使用它们。
如果这个机制不适用于您的应用程序逻辑,那么请修改您的问题并解释为什么不行。
https://stackoverflow.com/questions/52756378
复制