首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >无法配置spring云流Kafka Binder连接到代理

无法配置spring云流Kafka Binder连接到代理
EN

Stack Overflow用户
提问于 2022-07-11 23:25:57
回答 1查看 277关注 0票数 0

我用流桥发送卡夫卡消息

streamBridge.send("alm-foo-dev", "kafka", message)

然而,我的测试无法连接到卡夫卡经纪人。

代码语言:javascript
运行
AI代码解释
复制
2022-07-12 12:29:41.095  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.158  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.205  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.262  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.378  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.519  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.769  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.884  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:42.679  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:42.688  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:43.693  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:43.839  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:44.865  WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:44.983  WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.

java.lang.IllegalStateException: No records found for topic

    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:181)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:142)
    at com.foo.bar.baz.WebhooksFunctionTests.testPublishesHttpMessageToKafka(WebhooksFunctionTests.java:61)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)

下面是我如何配置嵌入式卡夫卡

代码语言:javascript
运行
AI代码解释
复制
@EmbeddedKafka(
        brokerProperties = {"listeners=PLAINTEXT://localhost:9952", "port=9952"},
        topics = {"alm-jira-dev"},
        partitions = 1
)

src/test/java/application.yml

代码语言:javascript
运行
AI代码解释
复制
server:
  port: 29191
logging:
  level:
    root: ERROR
    org:
      springframework.integration: DEBUG
      springframework.cloud.stream: DEBUG
      springframework.boot.autoconfigure.mongo: WARN
    com.digite.cloud: DEBUG
spring:
  application:
    name: howler
  main:
    banner-mode: off
  mongodb:
    embedded:
      version: 3.4.6
  data:
    mongodb:
      port: 29129
      host: localhost
      database: howler_db
  cloud:
    stream:
      kafka:
        default:
          producer:
            useTopicHeader: true
        binder:
          defaultBrokerPort: 9952
          autoCreateTopics: false
          producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
            max.block.ms: 100
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-28 20:26:42

这个配置对我有效(运行测试)

代码语言:javascript
运行
AI代码解释
复制
spring:
  embedded:
    kafka:
      brokers: localhost:9092
  cloud:
    stream:
      kafka:
        default:
          producer:
            useTopicHeader: true
        binder:
          autoCreateTopics: false
          producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
            max.block.ms: 100

但是,当我将应用程序部署到Kube时,还需要弄清楚配置会是什么样子。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72948508

复制
相关文章
spring-cloud-stream-binder-kafka属性配置
本文简单介绍下spring-cloud-stream-binder-kafka的一些属性配置。
code4it
2018/09/17
3.9K0
Hadoop配置datanode无法连接到master
初次在VM上配置Hadoop,开了三台虚拟机,一个作namenode,jobtracker
星哥玩云
2022/06/30
8880
Hadoop配置datanode无法连接到master
配置Oracle Gateway 12连接到SQL server 2014
最近的工作中需要基于Oracle连接到SQLserver2014,我们可以通过配置Gateway的方式来实现这个功能。这个Gateway的实质是透过dblink来实现的。即把SQLserver模拟成一个远端的Oracle实例,这个实例由Gateway来负责进行接收,转发等等。本文简要描述其配置过程。
Leshami
2018/08/13
3K0
配置Oracle Gateway 12连接到SQL server 2014
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
在这个博客系列的第1部分之后,Apache Kafka的Spring——第1部分:错误处理、消息转换和事务支持,在这里的第2部分中,我们将关注另一个增强开发者在Kafka上构建流应用程序时体验的项目:Spring Cloud Stream。
架构师研究会
2019/10/23
2.5K0
【首席架构师看Event Hub】Kafka深挖 -第2部分:Kafka和Spring Cloud Stream
Spring Cloud Stream与Kafka集成
Spring Cloud Stream是一个用于构建消息驱动的微服务的框架,它为Spring Boot应用程序提供了与消息代理集成的声明式模型。在本文中,我们将探讨如何使用Spring Cloud Stream与Kafka集成,以及如何构建一个使用Kafka作为消息代理的Spring Boot应用程序。
堕落飞鸟
2023/04/12
1.4K0
配置客户端以安全连接到Kafka集群- Kerberos
这是有关Apache Kafka安全性的简短博客文章系列的第一部分。在本文中,我们将说明如何配置客户端以使用不同的身份验证机制对集群进行身份验证。
大数据杂货铺
2021/02/07
5.9K0
microsoft edge无法连接到代理服务器(ie代理服务器错误)
电脑里有谷歌浏览器也有系统自带的Microsoft edge浏览器,谷歌浏览器可以上网,edge不能上网,出现无法连接到代理服务器的字样。
全栈程序员站长
2022/07/26
15.3K2
microsoft edge无法连接到代理服务器(ie代理服务器错误)
Druid 加载 Kafka 流数据 Supervisor 配置
https://www.ossez.com/t/druid-kafka-supervisor/13664
HoneyMoose
2021/08/10
8880
Druid 加载 Kafka 流数据 Supervisor 配置
配置客户端以安全连接到Kafka集群–LDAP
在上一篇文章《配置客户端以安全连接到Kafka集群- Kerberos》中,我们讨论了Kerberos身份验证,并说明了如何配置Kafka客户端以使用Kerberos凭据进行身份验证。在本文中,我们将研究如何配置Kafka客户端以使用LDAP(而不是Kerberos)进行身份验证。
大数据杂货铺
2021/02/07
4.8K0
spring for kafka自动配置及配置属性
本文主要列一下spring for apache kafka的一些auto config以及属性配置
code4it
2018/09/17
7900
Spring Cloud Stream核心组件Channel(二)
最后,以下是一个使用Spring Cloud Stream的input Channel来从myInputChannel读取消息的示例:
堕落飞鸟
2023/04/12
5410
w ndows无法连接到System,电脑无法连接到System Event Notification Service服务[通俗易懂]
大家好,又见面了,我是你们的朋友全栈君。 很多用户表示,在使用电脑的时候,电脑右下角任务栏提示“未能连接一个Windows服务,Windows无法连接到System Event Notificatio
全栈程序员站长
2022/08/27
4.2K0
w ndows无法连接到System,电脑无法连接到System Event Notification Service服务[通俗易懂]
Spring Cloud 系列之消息驱动 Stream
  在一个系统中我们可能包含前端页面、接口服务、大数据层,可能在接口服务中使用的是 RabbitMQ 而在大数据层中使用的是 Kafka,那么我只会 RabbitMQ 不会 Kafka 岂不是还要去学习,白天 996 晚上 007 简直要命。那么有没有一个像 JDBC 一样的能够屏蔽细节让我们可以迅速切换。   Spring Cloud Stream 是一个构建消息驱动微服务应用的框架。它基于 Spring Boot 构建独立的、生产级的 Spring 应用,并使用 Spring Integration 为消息代理提供链接。应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。 Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前只实现了 Kafka 和 RabbitMQ 的 Binder。
Demo_Null
2020/11/24
1.4K0
Spring Cloud 系列之消息驱动 Stream
springboot实战之stream流式消息驱动
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它可以基于Spring Boot 来创建独立的,可用于生产的Spring 应用程序。他通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现
lyb-geek
2019/08/21
4.8K0
部署和配置 Spring Cloud Data Flow
Spring Cloud Data Flow 是一个开源的数据处理管道平台,它提供了一组标准化的组件和工具,可以用于构建、部署和监控复杂的数据处理管道。
堕落飞鸟
2023/04/19
1.8K0
Spring Cloud Stream核心组件Binder(一)
Spring Cloud Stream是一个基于Spring Boot的框架,用于构建基于消息传递的微服务应用程序。其中核心组件Binder是用于处理输入和输出消息的中间件。在Spring Cloud Stream中,Binder提供了与各种消息代理(如Kafka、RabbitMQ、ActiveMQ等)的连接,同时提供了一些高级特性,如消息分区、事务性等。
堕落飞鸟
2023/04/12
5840
[菜鸟SpringCloud实战入门]第八章:通过消息总线Spring Cloud Bus实现配置文件刷新(使用Kafka)
欢迎来到菜鸟SpringCloud实战入门系列(SpringCloudForNoob),该系列通过层层递进的实战视角,来一步步学习和理解SpringCloud。
蛮三刀酱
2019/03/26
1K0
[菜鸟SpringCloud实战入门]第八章:通过消息总线Spring Cloud Bus实现配置文件刷新(使用Kafka)
Spring Cloud Stream 高级特性-消息桥接(二)
在使用消息桥接时,您需要权衡这些优缺点,并根据应用程序的需求进行相应的配置和调整。
堕落飞鸟
2023/04/13
5480
微服务架构之Spring Boot(五十七)
通过提供 spring-kafka 项目的自动配置来支持Apache Kafka。
用户1289394
2022/05/23
9530
Spring的cglib代理类无法取到被代理类的成员属性
cglib的原理是生成一个被代理类的子类进行增强, 那么为什么子类访问不到父类的属性呢
code-x
2023/04/19
1.8K0

相似问题

Kafka - spring云流

227

Kubernetes的Spring数据流-无法配置多个kafka代理

10

使用Spring云流Kafka Binder在批处理功能中发布多条消息

16

无法在spring云流kafka中设置kafka属性

10

Spring云流Kafka Binder的一个输入主题多输出主题

28
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文