首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >春云流卡夫卡-抓起大块头,错过心跳。

春云流卡夫卡-抓起大块头,错过心跳。
EN

Stack Overflow用户
提问于 2019-02-28 18:55:54
回答 1查看 5.6K关注 0票数 3

我正在查看一个spring引导服务,它读取来自apache的消息,通过http请求来自另一个服务的消息所指示的记录,对它们进行处理,将一些数据保存到数据库中,并将结果发布到另一个主题。

这是通过

代码语言:javascript
运行
复制
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)

这是在几个服务中完成的,而且通常工作得很好。唯一的属性集是

代码语言:javascript
运行
复制
spring.cloud.stream.binder.consumer.concurrency=20

主题本身有20个分区,应该适合。

当监视kafka的读取时,我们发现吞吐量很低,行为也很奇怪:

该应用程序一次读取多达500条消息,随后是1到2分钟的空白。在此期间,使用者反复记录它“丢失心跳,因为分区被重新平衡”,“重新分配分区”,有时甚至抛出一个异常,称它“未能提交,因为轮询间隔已经过去”。

我们得出的结论是,这意味着使用者获取500条消息,需要很长时间来处理所有消息,错过了时间窗口,因此无法向代理提交500条消息中的任何一条--后者重新分配分区并重新发送相同的消息。

在查看了线程和文档之后,我发现了"max.poll.records“属性,但是作为设置此属性的位置,建议相互矛盾。

有人说要把它放在下面

代码语言:javascript
运行
复制
spring.cloud.stream.bindings.consumer.<input>.configuration

有人说

代码语言:javascript
运行
复制
spring.cloud.stream.kafka.binders.consumer-properties

我尝试将两者都设置为1,但服务行为并没有改变。

如何正确处理使用者无法在默认设置下满足所需轮询间隔的情况?

共同-yaml:

代码语言:javascript
运行
复制
spring.cloud.stream.default.group=${spring.application.name}

服务-yaml

代码语言:javascript
运行
复制
spring:
  clould:
    stream:
      default:
        consumer.headerMode: embeddedHeaders
        producer.headerMode: embeddedHeaders
      bindings:
       someOutput:
         destination: outTopic
       someInput:
         destination: inTopic
           consumer:
             concurrency: 30
      kafka:
        bindings:
          consumer:
            someInput:
              configuarion:
                max.poll.records: 20 # ConsumerConfig ignores this
              consumer:
                enableDlq: true
                configuarion:
                  max.poll.records: 30 # ConsumerConfig ignores this
          someInput:
            configuarion:
              max.poll.records: 20 # ConsumerConfig ignores this
            consumer:
              enableDlq: true
              configuarion:
                max.poll.records: 30 # ConsumerConfig ignores this
        binder:
          consumer-properties:
            max.poll.records: 10 # this gets used first
          configuration:
            max.poll.records: 40 # this get used when the first one is not present

“忽略此”始终意味着,如果没有设置其他属性,则ConsumerConfiguration对max民意测验记录保持默认值为500。

编辑:我们已经接近了:

这个问题与spring重试exponentialBackoffStrategy集有关--以及大量错误有效地停止了应用程序。

我不明白的是,我们通过向所讨论的主题发布格式错误的消息来强制200个错误,这会导致应用程序读取200,花了很长时间(用旧的重试配置),然后一次完成所有200个错误。

如果我们有

代码语言:javascript
运行
复制
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-02-28 19:06:04

它是

代码语言:javascript
运行
复制
spring.cloud.stream.kafka.bindings.consumer.<input>.consumer.configuration.max.poll.records
.

文献资料..。

卡夫卡消费者地产 以下属性仅适用于卡夫卡消费者,必须以spring.cloud.stream.kafka.bindings.<channelName>.consumer.作为前缀 ..。 配置 使用包含一般Kafka使用者属性的键/值对映射。 默认值:空映射。 ..。

你也可以增加max.poll.interval.ms

编辑

我刚刚用2.1.0 described进行了测试--它的工作原理与我所描述的一样:

无设置

代码语言:javascript
运行
复制
2019-03-01 08:47:59.560  INFO 44698 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 500
    ...

启动默认

代码语言:javascript
运行
复制
spring.kafka.consumer.properties.max.poll.records=42

2019-03-01 08:49:49.197  INFO 45044 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 42
    ...

绑定器默认#1

代码语言:javascript
运行
复制
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.consumer-properties.max.poll.records=43

2019-03-01 08:52:11.469  INFO 45842 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

绑定器默认#2

代码语言:javascript
运行
复制
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43

2019-03-01 08:54:06.211  INFO 46252 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 43
    ...

绑定默认

代码语言:javascript
运行
复制
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44

2019-03-01 09:02:26.004  INFO 47833 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 44
    ...

结合特异性

代码语言:javascript
运行
复制
spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

2019-03-01 09:05:01.452  INFO 48330 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    ...
    max.poll.records = 45
    ...

EDIT2

这是完整的测试应用程序。我只是在http://start.spring.io上创建了一个新的应用程序,并选择了“Kafka”和“Cloud”。

代码语言:javascript
运行
复制
@SpringBootApplication
@EnableBinding(Sink.class)
public class So54932453Application {

    public static void main(String[] args) {
        SpringApplication.run(So54932453Application.class, args).close();
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {

    }

}

代码语言:javascript
运行
复制
spring.cloud.stream.bindings.input.group=so54932453

spring.kafka.consumer.properties.max.poll.records=42
spring.cloud.stream.kafka.binder.configuration.max.poll.records=43
spring.cloud.stream.kafka.default.consumer.configuration.max.poll.records=44
spring.cloud.stream.kafka.bindings.input.consumer.configuration.max.poll.records=45

代码语言:javascript
运行
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>net.gprussell</groupId>
    <artifactId>so54932453</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>so54932453</name>
    <description>Demo</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>
票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54932453

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档