我们正在使用Apache光束来处理从pubsub源到GCS接收器的具有动态文件名的流数据。我们可以写文本文件,但不能写wav文件。
我们可以写出使用StringUtf8Coder的字节数组字符串(linear16 wav编码),但使用ByteArrayEncoder时会出现编译错误
//这行得通:
pipelineBeginStage
.apply(
FileIO.<String, KamiAppData>writeDynamic()
.by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
.via(
Contextful.fn((SerializableFunction<KamiAppData, String>) input -> input.audioStream),
TextIO.sink())
.to(outputBucket)
.withNaming(url -> FileNaming.getNaming(url, "wav"))
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1));
//这会产生编译错误:
pipelineBeginStage
.apply(
FileIO.<String, KamiAppData>writeDynamic()
.by((SerializableFunction<KamiAppData, String>) input -> input.GCSurl)
.via(
Contextful.fn((SerializableFunction<KamiAppData, byte[]>) input -> input.audioStream.getBytes()),
TextIO.sink())
.to(outputBucket)
.withNaming(url -> FileNaming.getNaming(url, "wav"))
.withDestinationCoder(ByteArrayCoder.of())
.withNumShards(1));
我们得到的错误是:
cannot resolve method 'via(org.apache.beam.sdk.transforms.Contextful.Fn<InputT,OutputT>>,org.apache.beam.sdk.TextIO.Sink)'
我们如何使用Apache Beam编译音频并将其写入文件?
发布于 2019-06-24 22:51:06
Anton在上面的评论中的解释在我看来非常清楚。
我只想补充一下,为了让它与字节类型一起工作,你可能需要实现你自己的FileIO.Sink<byte[]>
类。
https://stackoverflow.com/questions/56725569
复制相似问题