In this documentation, Apache Beam recommends the deadletter pattern when writing to BigQuery. This pattern lets you fetch the rows that failed to be written from the transform's output under the 'FailedRows' tag.
However, when I try to use it:
WriteToBigQuery(
    table=self.bigquery_table_name,
    schema={"fields": self.bigquery_table_schema},
    method=WriteToBigQuery.Method.FILE_LOADS,
    temp_file_format=FileFormat.AVRO,
)
A schema mismatch in one of my elements causes the following exception:
Error message from worker: Traceback (most recent call last):
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1334, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "/my_code/apache_beam/io/gcp/bigquery_file_loads.py", line 258, in process
    writer.write(row)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1635, in write
    ex, self._avro_writer.schema, row)).with_traceback(tb)
  File "/my_code/apache_beam/io/gcp/bigquery_tools.py", line 1630, in write
    self._avro_writer.write(row)
  File "fastavro/_write.pyx", line 647, in fastavro._write.Writer.write
  File "fastavro/_write.pyx", line 376, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 320, in fastavro._write.write_record
  File "fastavro/_write.pyx", line 374, in fastavro._write.write_data
  File "fastavro/_write.pyx", line 283, in fastavro._write.write_union
ValueError: Error writing row to Avro: [] (type <class 'list'>) do not match ['null', 'double'] on field safety_proxy
Schema: ...
From what I gather, the schema mismatch causes fastavro._write.Writer.write to fail and raise an exception. Instead, I would like WriteToBigQuery to apply the deadletter behavior and return my malformed rows as output tagged with FailedRows. Is there a way to achieve this?
Thanks
EDIT: adding a more detailed example of what I am trying to do:
from apache_beam import Create
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import FileFormat
from apache_beam.io.textio import WriteToText
...
valid_rows = [{"some_field_name": i} for i in range(1000000)]
invalid_rows = [{"wrong_field_name": i}]
pcoll = Create(valid_rows + invalid_rows)
# This fails because of the 1 invalid row
write_result = (
    pcoll
    | WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={
            "fields": [
                {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ]
        },
        method=WriteToBigQuery.Method.FILE_LOADS,
        temp_file_format=FileFormat.AVRO,
    )
)
# What I want is for WriteToBigQuery to partially succeed and output the failed rows.
# This is because I have pipelines that run for multiple hours and fail because of
# a small amount of malformed rows
(
    write_result[BigQueryWriteFn.FAILED_ROWS]
    | WriteToText('gs://my_failed_rows/')
)
Posted on 2022-09-05 10:08:47
You can use a dead letter queue in your pipeline instead of letting BigQuery catch the errors for you. Beam proposes a native way of handling errors and dead letter queues with TupleTags, but the code is a little verbose.
I created an open source library called Asgarde, for the Python SDK and the Java SDK, that applies this error handling with less, more concise and more expressive code:
https://github.com/tosun-si/pasgarde
(and the Java version: https://github.com/tosun-si/asgarde)
You can install it with pip:
pip install asgarde==0.16.0
import apache_beam as beam
from typing import Dict

from apache_beam import Create
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
from apache_beam.io.textio import WriteToText
from asgarde.collection_composer import CollectionComposer

def validate_row(self, row) -> Dict:
    field = row['your_field']
    if field is None or field == '':
        # You can raise your own custom exception
        raise ValueError('Bad field')
    return row
...
valid_rows = [{"some_field_name": i} for i in range(1000000)]
invalid_rows = [{"wrong_field_name": i}]
pcoll = Create(valid_rows + invalid_rows)
# Dead letter queue proposed by Asgarde: it returns the output PCollection and the Failure PCollection.
output_pcoll, failure_pcoll = (CollectionComposer.of(pcoll)
                               .map(self.validate_row))
# Good sink
(
    output_pcoll
    | WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={
            "fields": [
                {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ]
        },
        method=WriteToBigQuery.Method.FILE_LOADS
    )
)
# Bad sink: PCollection[Failure] / Failure contains the input element and
# the stack trace.
(
    failure_pcoll
    | beam.Map(lambda failure: self.your_failure_transformation(failure))
    | WriteToBigQuery(
        table=self.bigquery_table_name,
        schema=your_schema_for_failure_table,
        method=WriteToBigQuery.Method.FILE_LOADS
    )
)
The structure of the Failure object proposed by the Asgarde library:
@dataclass
class Failure:
    pipeline_step: str
    input_element: str
    exception: Exception
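As an illustration, the your_failure_transformation referenced in the bad sink above is not part of Asgarde; it is whatever mapping you need to flatten a Failure into a row matching your failure table. A minimal sketch, assuming a hypothetical failure table with three STRING columns named pipeline_step, input_element and exception:

def your_failure_transformation(self, failure):
    # Flatten the Failure object into a dict matching the (assumed) failure table schema.
    return {
        'pipeline_step': failure.pipeline_step,
        'input_element': failure.input_element,
        'exception': str(failure.exception),
    }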
In the validate_row function, you apply your validation logic and detect the bad fields. In that case you raise an exception, and Asgarde catches the error for you.
The result of the CollectionComposer flow is:
- the output PCollection of good elements, in this case I think a PCollection[Dict]
- a PCollection[Failure]
Finally, you can write to the two sinks as shown above.
You can also apply the same logic with native Beam error handling and TupleTags; I give an example of that in a project in my repository.
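As a rough illustration of what that native approach can look like in the Python SDK (this is only a sketch, not the repository example; it uses with_outputs and TaggedOutput, the Python equivalent of Java's TupleTags, and the tag and step names are just placeholders):

import apache_beam as beam

FAILURES_TAG = 'failures'

class ValidateRowFn(beam.DoFn):
    def process(self, row):
        try:
            # Same validation logic as validate_row above.
            field = row['your_field']
            if field is None or field == '':
                raise ValueError('Bad field')
            yield row
        except Exception as e:
            # Route the bad element to a side output instead of failing the bundle.
            yield beam.pvalue.TaggedOutput(
                FAILURES_TAG, {'input_element': str(row), 'exception': str(e)})

results = pcoll | 'ValidateRow' >> beam.ParDo(ValidateRowFn()).with_outputs(
    FAILURES_TAG, main='outputs')
output_pcoll = results.outputs
failure_pcoll = results[FAILURES_TAG]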
Posted on 2022-09-03 04:52:20
Let's take a step back and look at the goal and the desired outcome.
Why do you need "FILE_LOADS" as the BigQuery write method?
Are you also aware of the BigQuery Storage Write API: https://cloud.google.com/bigquery/docs/write-api?
It looks like Java supports the Storage Write API, but Python currently does not. I believe that with the Write API you can connect to BigQuery over gRPC, instead of having to serialize to Avro and then invoke the legacy batch load process.
Maybe take a look and see if that helps -- the schema still matters, but it seems Avro is incidental to your goal and is only involved because of the code path you are invoking?
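One note on that: newer Apache Beam releases also expose a Storage Write API method on WriteToBigQuery itself, implemented through a cross-language Java transform. Whether it is usable depends on your Beam version and runner, so treat this as a sketch under that assumption:

# Sketch: assumes a Beam release where Method.STORAGE_WRITE_API is available from Python
# (it is backed by a cross-language Java transform, so a Java environment is required).
write_result = (
    pcoll
    | WriteToBigQuery(
        table=self.bigquery_table_name,
        schema={
            "fields": [
                {'name': 'some_field_name', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ]
        },
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
    )
)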
https://stackoverflow.com/questions/73589662