我有几个管道从流JSON记录中写入avro文件,但我在将它们导入BigQuery时遇到了问题,因为日期字段的logicalType没有在avro模式中定义。
考虑以下简单的PoJo:
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
使用此方法,字段将正确地保存到avro中作为长整型。但是,未在架构中设置LogicalType,这会导致在导入到BigQuery时出现问题,因为您希望它是TIMESTAMP
或DATE
,而不是long。
我希望能够像使用@AvroEncode
一样对字段进行注释。设置@LogicalType('timestamp-millis')
会很好。
有没有人做过类似的事情,或者有任何其他简单的方法来为字段指定LogicalType?
发布于 2020-08-17 16:44:03
使用gson typeAdapters反序列化传入的json解决了这个问题,如下所示:
GsonBuilder builder = new GsonBuilder();
builder.registerTypeAdapter(Sample.class, new JsonDeserializer() {
@Override
public Sample deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
try {
JsonObject jObj = json.getAsJsonObject();
return new Sample(
jObj.get("timestamp").getAsString()
);
}
catch (Exception e) {
log.error("Sample parser failed" + e.toString());
return null;
}
}
});
builder.create();
示例类,使用java.time.Instant从ISO日期字符串创建millis:
@DefaultCoder(SnappyCoder.class)
public class Sample implements Serializable {
@AvroSchema("{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}")
private long updateTime;
public Sample(String timestamp) {
this.updateTime = Instant.parse(timestamp).toEpochMilli();
}
}
发布于 2020-08-12 07:37:00
您可以尝试像在this test中那样指定@AvroSchema
,这样您的示例将如下所示
@DefaultCoder(AvroCoder.class)
public class SampleClass {
@AvroEncode(using=DateAsLongEncoding.class)
@AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
private Date updateTime;
public SampleClass() {
}
// Getters and setters
}
另外,您是否考虑过使用BigQueryIO直接编写代码?它有two write methods,一个写出文件并加载它们,另一个使用流插入API。
https://stackoverflow.com/questions/62696580
复制相似问题