我正在查看一个spring引导服务,它读取来自apache的消息,通过http请求来自另一个服务的消息所指示的记录,对它们进行处理,将一些数据保存到数据库中,并将结果发布到另一个主题。
这是通过
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
这是在几个服务中完成的,而且通常工作得很好。唯一的属性集是
spring.cloud.stream.binder.consumer.concurrency=20
主题本身有20个分区,应该适合。
当监视kafka的读取时,我们发现吞吐量很低,行为也很奇怪:
该应用程序一次读取多达500条消息,随后是1到2分钟的空白。在此期间,使用者反复记录它“丢失心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出一个异常,称它“未能提交,因为轮询间隔已经过去”。
我们得出的结论是,这意味着使用者获取500条消息,需要很长时间来处理所有消息,错过了时间窗口,因此无法向代理提交500条消息中的任何一条--后者重新分配分区并重新发送相同的消息。
在查看了线程和文档之后,我发现了"max.poll.records“属性,但是作为设置此属性的位置,建议相互矛盾。
有人说要把它放在下面
spring.cloud.stream.bindings.consumer.<input>.configuration
有人说
spring.cloud.stream.kafka.binders.consumer-properties
我尝试将两者都设置为1,但服务行为并没有改变。
如何正确处理使用者无法在默认设置下满足所需轮询间隔的情况?
共同-yaml:
spring.cloud.stream.default.group=${spring.application.name}
服务-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
binder:
consumer-properties:
max.poll.records: 10 # this gets used first
configuration:
max.poll.records: 40 # this get used when the first one is not present
“忽略此”始终意味着,如果没有设置其他属性,则ConsumerConfiguration对max民意测验记录保持默认值为500。
编辑:我们已经接近了:
这个问题与spring重试exponentialBackoffStrategy集有关--以及大量错误有效地停止了应用程序。
我不明白的是,我们通过向所讨论的主题发布格式错误的消息来强制200个错误,这会导致应用程序读取200,花了很长时间(用旧的重试配置),然后一次完成所有200个错误。
如果我们有
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
发布于 2019-02-28 11:06:04
它是
spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.
见文献资料..。
卡夫卡消费者地产 以下属性仅适用于卡夫卡消费者,必须以
spring.cloud.stream.kafka.bindings.<channelName>.consumer.
作为前缀 ..。 配置 使用包含一般Kafka使用者属性的键/值对映射。 默认值:空映射。 ..。
你也可以增加max.poll.interval.ms
。
编辑
我刚刚用2.1.0 described进行了测试--它的工作原理与我所描述的一样:
无设置
2019-03-01 08:47:59.560 INFO 44698 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 500
...
启动默认
spring.kafka.consumer.properties.max.poll.records=42
2019-03-01 08:49:49.197 INFO 45044 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 42
...
绑定器默认#1
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43
2019-03-01 08:52:11.469 INFO 45842 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
绑定器默认#2
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
2019-03-01 08:54:06.211 INFO 46252 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 43
...
绑定默认
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
2019-03-01 09:02:26.004 INFO 47833 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 44
...
结合特异性
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
2019-03-01 09:05:01.452 INFO 48330 --- [ main] o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
...
max.poll.records = 45
...
EDIT2
这是完整的测试应用程序。我只是在http://start.spring.io上创建了一个新的应用程序,并选择了“Kafka”和“Cloud”。
@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {
public static void main(String[] args) {
SpringApplication.run(So54932453Application.class, args).close();
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
}
}
和
spring.cloud.stream.bindings.input.group=so54932453
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45
和
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>net.gprussell</groupId>
<artifactId>so54932453</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>so54932453</name>
<description>Demo</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
</project>
https://stackoverflow.com/questions/54932453
复制