I read the Airflow docs for the BigQuery job operators here (https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_modules/airflow/providers/google/cloud/operators/bigquery.html#BigQueryGetDataOperator), but I couldn't find how to change the priority of a BigQuery job. How can I do that?
Posted on 2021-06-23 07:19:51
BigQueryExecuteQueryOperator has a priority parameter that can be set to INTERACTIVE or BATCH; the default is INTERACTIVE:
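from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# INSERT_ROWS_QUERY and location are assumed to be defined elsewhere in the DAG file.
execute_insert_query = BigQueryExecuteQueryOperator(
    task_id="execute_insert_query",
    sql=INSERT_ROWS_QUERY,
    use_legacy_sql=False,
    location=location,
    priority='BATCH',
)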
BigQueryInsertJobOperator does not have one. I think you can create a custom operator that inherits from BigQueryInsertJobOperator and add it by overriding the _submit_job function:
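from copy import deepcopy

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator


class MyBigQueryInsertJobOperator(BigQueryInsertJobOperator):
    def __init__(
        self,
        priority: str = 'INTERACTIVE',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.priority = priority

    def _submit_job(
        self,
        hook: BigQueryHook,
        job_id: str,
    ) -> BigQueryJob:
        # As far as I can tell, hook.insert_job() has no priority argument,
        # so set it on the query section of a copy of the job configuration.
        configuration = deepcopy(self.configuration)
        if 'query' in configuration:
            configuration['query']['priority'] = self.priority
        # Submit a new job
        job = hook.insert_job(
            configuration=configuration,
            project_id=self.project_id,
            location=self.location,
            job_id=job_id,
        )
        # Start the job and wait for it to complete and get the result.
        job.result()
        return job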
I haven't tested it, but it should work.
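For what it's worth, the BigQuery jobs API carries the priority inside the job configuration itself, under configuration.query.priority, so a plain BigQueryInsertJobOperator may be enough without subclassing. A minimal sketch of building such a configuration, where with_query_priority is a hypothetical helper (not part of Airflow) and the SQL is a placeholder:

```python
from copy import deepcopy
from typing import Any, Dict

# Priorities accepted by BigQuery for query jobs.
VALID_PRIORITIES = {"INTERACTIVE", "BATCH"}


def with_query_priority(
    configuration: Dict[str, Any],
    priority: str = "INTERACTIVE",
) -> Dict[str, Any]:
    """Return a copy of a query job configuration with query.priority set.

    Hypothetical helper for illustration only; it is not an Airflow API.
    """
    if priority not in VALID_PRIORITIES:
        raise ValueError(f"priority must be one of {sorted(VALID_PRIORITIES)}")
    if "query" not in configuration:
        raise ValueError("priority only applies to query jobs")
    updated = deepcopy(configuration)  # leave the caller's dict untouched
    updated["query"]["priority"] = priority
    return updated


base_configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}
batch_configuration = with_query_priority(base_configuration, priority="BATCH")

# The result would then be passed to the operator, e.g.
# BigQueryInsertJobOperator(task_id="...", configuration=batch_configuration)
```

The helper copies the dict rather than mutating it, so the same base configuration can be reused for tasks with different priorities.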
https://stackoverflow.com/questions/68093454