Kafka stream自动设置内部主题的retention.ms
和cleanup.policy
,例如。物化的KTables。但是,我注意到WindowStoreBuilder创建的日志主题上没有设置retention.ms
。
直观地说,日志主题也是内部主题,可以/应该是自动配置的。在我的例子中,商店有很短的保留时间,但默认的retention.ms
是5天。恢复本地存储需要一个小时,而我们只关心最后几分钟的数据,较旧的数据将从存储中清除。
是我做错了什么,还是我应该显式地设置这些配置项?
var retention = Duration.ofMinutes( 5 );
var storeBuilder =
new WindowStoreBuilder<>(
Stores.persistentWindowStore(
"name",
retention,
Duration.ofMinutes( 1 ),
false ),
Serdes.String( ), new JaegerSpanSerde( ), Time.SYSTEM )
.withLoggingEnabled( Map.of(
TopicConfig.RETENTION_MS_CONFIG, Long.toString( retention.toMillis( ) ),
TopicConfig.SEGMENT_MS_CONFIG, Long.toString( retention.toMillis( ) ) ) );
如果在后续版本中增加了retention
,并且changelog topic已经存在,是否会更新topic配置?
发布于 2020-07-08 21:40:23
内部主题仅由Kafka Streams在创建时进行配置。如果您在代码中更改保留时间设置,则不会更新相应的主题配置。这是一个已知的问题:https://issues.apache.org/jira/browse/KAFKA-7591
作为解决办法,您可以手动重新配置changelog主题。
https://stackoverflow.com/questions/62799766
复制