我想从google云存储(Bucket) CSV文件中检索数据,并将这些文件中的数据加载到bigquery表中,而不需要重复数据。目标是在性能上拥有一个性能比成本更优的代码。我的当前代码如下:
def load_data_in_BQT():
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("id", "INTEGER"),
bigquery.SchemaField("name", "STRING"),
],
# The source format defaults to CSV, so the line below is optional.
source_format=bigquery.SourceFormat.CSV,
skip_leading_rows=1,
autodetect=True,
write_disposition=bigquery.WriteDisposition.WRITE_APPEND, # (Addition of the data (possibility of having duplications)
# write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, # (Formatting of the table and insertion of the new data (Loss of the old data))
)
uri = "gs://mybucket/myfolder/myfile.csv"
load_job = self.client.load_table_from_uri(
uri, self.table_ref["object"], job_config=job_config,
)目前,我的想法是读取熊猫中的CSV文件,以获得数据,加载bigquery表的数据并将其转换为数据,对整个数据进行处理,以删除重复数据,并在最后重新插入(使用选项截断)所有已清除的数据。但是,如果我们有大量的数据,我们必须在桶中的每个新输入文件中加载这些数据,我发现这种方法是有害的。
你有什么建议吗?提前谢谢你
发布于 2022-09-20 08:36:41
您可以将merge查询与Bigquery一起使用:
https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax?hl=en#merge_statement
背后的想法是:
table
Truncate Python脚本导出为在临时表和最终表中摄取数据:如果最终表中不存在元素,则可以插入该元素,否则可以更新它。F 215
合并查询示例:
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
INSERT (product, quantity) VALUES(product, quantity)例如,像Airflow或Cloud workflow这样的协调器可以很容易地连锁这些步骤。
https://stackoverflow.com/questions/73783400
复制相似问题