问题中的错误:
AttributeError: 'function' object has no attribute 'tableId'
有问题的代码:
def get_table_spec(row: Dict[str, str]):
data = json.loads(opts.data.get())
network = data['network']
gco = opts.view_as(GoogleCloudOptions)
project = gco.project
return f'{gco.project}:reports.{network}'
result = rows | 'SaveToBigQueryTable' >> beam.io.WriteToBigQuery(
table=get_table_spec,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
因此,当我使用DirectRunner时,这可以在本地运行。python -m template --runner DirectRunner --otherargs ...
没有抛出错误,数据被保存到我的BigQuery表中。
当我尝试编译模板并将其部署到存储桶python -m template --runner DataflowRunner --template_location ... --staging_location etc...
时,它抛出了上面的错误。
已在版本2.15和2.16的Apache Beam for Python上试用。
发布于 2019-10-17 05:06:19
在Python SDK的2.12.0版本中引入了在写入BigQuery时使用动态目的地的功能。这是由于标记为实验性的新BigQuery sink造成的。
要将其与最新SDK上的DataflowRunner
一起使用,您可以在python
命令中添加此标志:
--experiments use_beam_bq_sink
https://stackoverflow.com/questions/58423390
复制相似问题