I am using StreamBridge to send Kafka messages:
streamBridge.send("alm-foo-dev", "kafka", message)
However, my test cannot connect to the Kafka broker.
2022-07-12 12:29:41.095 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.158 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.205 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.262 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.378 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.519 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.769 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:41.884 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:42.679 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:42.688 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:43.693 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:43.839 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:44.865 WARN 79618 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
2022-07-12 12:29:44.983 WARN 79618 --- [read | consumer] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-1, groupId=consumer] Connection to node 0 (localhost/127.0.0.1:9952) could not be established. Broker may not be available.
java.lang.IllegalStateException: No records found for topic
at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:181)
at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:142)
at com.foo.bar.baz.WebhooksFunctionTests.testPublishesHttpMessageToKafka(WebhooksFunctionTests.java:61)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Here is how I configure the embedded Kafka broker:
@EmbeddedKafka(
brokerProperties = {"listeners=PLAINTEXT://localhost:9952", "port=9952"},
topics = {"alm-jira-dev"},
partitions = 1
)
And src/test/java/application.yml:
server:
  port: 29191
logging:
  level:
    root: ERROR
    org:
      springframework.integration: DEBUG
      springframework.cloud.stream: DEBUG
      springframework.boot.autoconfigure.mongo: WARN
    com.digite.cloud: DEBUG
spring:
  application:
    name: howler
  main:
    banner-mode: off
  mongodb:
    embedded:
      version: 3.4.6
  data:
    mongodb:
      port: 29129
      host: localhost
      database: howler_db
  cloud:
    stream:
      kafka:
        default:
          producer:
            useTopicHeader: true
        binder:
          defaultBrokerPort: 9952
          autoCreateTopics: false
          producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
            max.block.ms: 100
Posted on 2022-07-28 20:26:42
This configuration worked for me (when running the tests):
spring:
  embedded:
    kafka:
      brokers: localhost:9092
  cloud:
    stream:
      kafka:
        default:
          producer:
            useTopicHeader: true
        binder:
          autoCreateTopics: false
          producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
            max.block.ms: 100
However, I still need to figure out what the configuration should look like when I deploy the application to Kube.
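One possible way to make the same file work in both places is to read the broker list from the environment. The fragment below is only a sketch, not a tested deployment config: `KAFKA_BROKERS` is a hypothetical environment variable name that the Kube deployment would have to set, and `localhost:9092` is just the fallback taken from the test configuration above. `spring.cloud.stream.kafka.binder.brokers` is the binder property that lists the brokers to connect to.

```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          # KAFKA_BROKERS is an assumed (hypothetical) variable name.
          # When it is not set, the value after the first colon is used.
          brokers: ${KAFKA_BROKERS:localhost:9092}
          autoCreateTopics: false
```

In tests, the fallback could instead reference `${spring.embedded.kafka.brokers}`, the property that `@EmbeddedKafka` publishes with the address of the embedded broker, so the binder follows whatever port the embedded broker actually uses.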
https://stackoverflow.com/questions/72948508