发布于 2021-01-22 07:20:55
没有任何选项允许您设置从Kafka收到的消息数量的最小值。选项maxOffsetsPerTrigger允许您设置最大的消息。
如果您希望您的微批处理同时处理更多消息,您可以考虑增加触发间隔。
此外(指你提供的链接),这也是不可能设置在卡夫卡本身。您可以设置获取字节的最小数量,但不能设置最小数量的消息号。
注意,您可以通过readStream通过前缀kafka.在结构化流中设置所有Kafka选项,如卡夫卡特殊配置一节所解释的。
卡夫卡自己的配置可以通过DataStreamReader.option设置,前缀为kafka.前缀,例如,stream.option("kafka.bootstrap.servers",“主机:端口”)。
这样,您还可以使用Configuration kafka.fetch.min.bytes。但是,在loval Kafka 2.5.0安装上用Spark3.0.1测试它不会产生任何影响。当添加配置kafka.fetch.max.wait.ms时,我的测试中的获取时间确实发生了变化,但没有以可预测的方式(至少对我来说)。
查看Spark的KafkaDataConsumer的源代码,与纯KafkaConsumer相比,fetch似乎没有直接计算任何min/最大字节。
https://stackoverflow.com/questions/65840541
复制相似问题