我正在将数据从GCS插入到BigQuery,并且不确定如何获得数据何时插入列的当前日期。
这是我的模式:
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq_example',
bucket='cloud-samples-data',
source_objects=['SOURCE-FILE-LOCATION'],
destination_project_dataset_table='airflow_test.gcs_to_bq_table',
schema_fields=[
{'name': 'item', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'date', 'type': 'DATE', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
dag=dag)
所以,在我的模式中,我有item
和date
。
因此,当触发我的DAG将数据从GCS桶插入到BigQuery时,如何使date
列包含插入数据的当前日期?
例如,如果我今天插入它,那么date
列应该是2022-11-24
。
发布于 2022-11-24 16:23:10
可能有两种方法可以达到预期的结果,但这两种方法都不确定。
第一种方法是使用所描述的这里的默认值,并将一列添加到模式中:
schema_fields=[
{'name': 'item', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'date', 'type': 'DATE', 'mode': 'NULLABLE'},
{'name': 'load_date', 'type': 'DATE', 'default': 'CURRENT_DATE'},
]
然而,这是前GA所以不确定你是否可以使用它(而且我还没有测试抱歉)。
另一种可能是使用气流模板技术并增加另一步:
load_csv = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq_example',
bucket='cloud-samples-data',
source_objects=['SOURCE-FILE-LOCATION'],
destination_project_dataset_table='airflow_test.gcs_to_bq_table_{{ ds_nodash }}',
schema_fields=[
{'name': 'item', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'date', 'type': 'DATE', 'mode': 'NULLABLE'},
],
write_disposition='WRITE_TRUNCATE',
dag=dag)
使用此操作,您将在表中获取文件,并在表名中显示摄入日期(如果使用ts_nodash
,则使用时间戳)。然后,您可以使用BigqueryOperator
使用一些SQL将这些阶段性数据插入到目标数据中。
https://stackoverflow.com/questions/74561928
复制相似问题