Loading [MathJax]/jax/output/CommonHTML/config.js
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >问答首页 >Kafka Binder不应用序列化程序配置

Kafka Binder不应用序列化程序配置
EN

Stack Overflow用户
提问于 2022-07-11 02:16:43
回答 1查看 140关注 0票数 0

我在我的应用程序中使用流桥,因为要发送的主题是在运行时根据URL路径参数确定的;我从请求体、路径元素和调用流桥发送到一个函数,以便发布到Kafka

代码语言:javascript
运行
AI代码解释
复制
@Bean
public RouterFunction<ServerResponse> webhooks() {
    return route().POST("/webhooks/v1/{cat}/{mat}/{key}", accept(MediaType.APPLICATION_JSON), (serverRequest) -> {
        String cat = serverRequest.pathVariable("cat");
        String mat = serverRequest.pathVariable("mat");
        String key = serverRequest.pathVariable("key");
        String logPrefix = serverRequest.exchange().getLogPrefix();
        log.debug("{}Received HTTP Payload for {}:{} with key {}", logPrefix, cat, mat, key);
        return serverRequest.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
                })
                .map(payload -> MessageBuilder.withPayload(payload)
                        .setHeader(catHeader, cat)
                        .setHeader(matHeader, mat)
                        .setHeader(keyHeader, key)
                        .setHeader(KafkaHeaders.MESSAGE_KEY, "someKey")
                        .setHeader(KafkaHeaders.TOPIC, String.join("-", cat, mat, namespace))
                        .setHeader(webhookRequestId, logPrefix)
                        .build())
                .map(message -> streamBridge.send("producer", message))
                .flatMap(message -> ServerResponse.accepted().build());
    }).build();
}

@Bean
public Function<Flux<Message<?>>, Flux<Message<?>>> producer() {
    return mapFlux -> mapFlux.map(m -> MessageBuilder.withPayload(m.getPayload()).copyHeaders(m.getHeaders()).build());
}

然后将以下属性添加到应用程序yaml中

代码语言:javascript
运行
AI代码解释
复制
spring:
  main:
    banner-mode: off
  mongodb:
    embedded:
      version: 3.4.6
  data:
    mongodb:
      port: 29129
      host: localhost
      database: howler_db
  kafka:
    binder:
      brokers: localhost:9952
  cloud:
    function:
      definition: producer
    stream:
      bindings:
        producer-out-0:
          useTopicHeader: true
          producer:
            configuration:
              retry:
                topic:
                  delay: 200
              key-serializer: org.apache.kafka.common.serialization.StringSerializer
              value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
              retries: 3
              max:
                block:
                  ms: 500
              enable:
                idempotence: true
              acks: all

不过,我的测试在此例外情况下失败。

代码语言:javascript
运行
AI代码解释
复制
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert key of class java.lang.String to class org.apache.kafka.common.serialization.ByteArraySerializer specified in key.serializer
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:949) ~[kafka-clients-3.1.1.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914) ~[kafka-clients-3.1.1.jar:na]
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429) ~[spring-kafka-2.8.7.jar:2.8.7]
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:513) ~[spring-integration-kafka-5.5.13.jar:5.5.13]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1074) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.13.jar:5.5.13]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:235) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:170) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:150) ~[spring-cloud-stream-3.2.4.jar:3.2.4]
    at com.gabbar.cloud.sambha.SholayWebFunctionConfiguration.lambda$webhooks$1(ShokayWebFunctionConfiguration.java:67) ~[classes/:na]

日志显示生产者key.serializer没有设置。

代码语言:javascript
运行
AI代码解释
复制
2022-07-11 18:24:15.500  INFO 15424 --- [ctor-http-nio-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9952]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.connect.timeout.ms = null
    sasl.login.read.timeout.ms = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.login.retry.backoff.max.ms = 10000
    sasl.login.retry.backoff.ms = 100
    sasl.mechanism = GSSAPI
    sasl.oauthbearer.clock.skew.seconds = 30
    sasl.oauthbearer.expected.audience = null
    sasl.oauthbearer.expected.issuer = null
    sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
    sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
    sasl.oauthbearer.jwks.endpoint.url = null
    sasl.oauthbearer.scope.claim.name = scope
    sasl.oauthbearer.sub.claim.name = sub
    sasl.oauthbearer.token.endpoint.url = null
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 30000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-07-11 05:38:46

这就是现在对我有用的东西。

代码语言:javascript
运行
AI代码解释
复制
spring:
  embedded:
    kafka:
      brokers: localhost:9092
  cloud:
    stream:
      kafka:
        default:
          producer:
            useTopicHeader: true
        binder:
          autoCreateTopics: false
          producerProperties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: org.springframework.kafka.support.serializer.JsonSerializer
            max.block.ms: 100
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72936981

复制
相关文章
LeetCode 1893. 检查是否区域内所有整数都被覆盖(差分)
给你一个二维整数数组 ranges 和两个整数 left 和 right 。每个 ranges[i] = [starti, endi] 表示一个从 starti 到 endi 的 闭区间 。
Michael阿明
2021/09/06
4340
JavaScript 检查是否是数字
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/109551.html原文链接:https://javaforall.cn
全栈程序员站长
2022/07/05
1.1K0
JS检查是否支持Storage
查看效果:http://hovertree.com/code/html5/q69kvsi6.htm
全栈程序员站长
2022/07/15
3K0
js检查日期是否合法
检查日期是否合法 <script> function CheckDateTime(str) { var reg = /^(\d+)-(\d{1,2})-(\d{1,2}) (\d{1,2}):(\d{1,2}):(\d{1,2})$/; var r = str.match(reg); if (r == null) return false; r[2] = r[2] - 1; var d = new Date(r[
江一铭
2022/06/16
6.5K0
js检查是否是数组
该方法兼容Chrome 5, Firefox 4.0, IE 9, Opera 10.5 和 Safari 5,详细兼容性,可以查阅这篇文章。
IT工作者
2022/01/27
3.5K0
python 检查是否存在ddos攻击
import dpkt import socket import optparse
用户5760343
2019/07/31
3.8K0
python 检查是否存在ddos攻击
php-检查网站是否宕机
浏览量 2 <?php $url="http://www.google.com"; $agent = "Mozilla/4.0 (compatible; MSIE 5.01; Windows NT
kdyonly
2023/03/03
1.7K0
用python实现选择截图区域
一直想用python实现一个类似QQ截图的功能,但不直接截图,而是返回截图的区域,以下是代码
py3study
2020/01/10
3.8K0
是时候检查一下使用索引的姿势是否正确了!
索引,可以有效提高我们的数据库搜索效率,各种数据库优化八股文里都有相关的知识点可背,不过单纯的被条目其实很容易忘记。
江南一点雨
2022/01/24
7610
是时候检查一下使用索引的姿势是否正确了!
Python脚本检查TCP端口是否正常
#!/usr/bin/python import socket import re import sys def check_server(address,port): s = socket.socket() print "Attempting to connect to %s on port %s" % (address,port) try: s.connect((address,port)) print "Connected to %s on port %s" % (address,port) retu
院长技术
2021/02/19
1.7K0
检查评论者是否有Gravatar头像
今天Angel来了,同时为风风找到了一个评论方面的BUG——游客身份若使用的邮箱没有Gravatar头像就不能发评论,这对于一些有特殊需要的朋友来说可能算不上BUG,但却不是我的需求,之后发现是comments-ajax.php文件中的以下代码在起作用:
WindCoder
2018/09/20
9660
Kettle之“检查表是否存在”
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wzy0623/article/details/53884107
用户1148526
2019/05/25
3.9K0
检测系统是否为检查版本(Checked)
<<Windows internals 5th editon>> page 48.
obaby
2023/02/28
2.2K0
检测系统是否为检查版本(Checked)
检查app的activity是否在当前
APP开发中经常会遇到这种需求,需要检查当前的APP是不是可见的,比如,如果是可见的就维持一个socket长连接,如果切到后台不可见了,就断开这个连接。Android本来并不允许APP去监听home键事件,所以我们没发像iOS那样通过监听home键事件来做对应的处理。不过还好Android的Activity的生命周期给我们提供了一种解决这个问题的方法,我们只需要在每个Activity的onStart和onStop中去维持一个计数,如果这个数值大于0,就表示我们的APP还有Activity是在前台运行的,等
xiangzhihong
2018/01/30
1K0
检查Android是否具有摄像头
通常我们进行摄像头操作,如扫描二维码需要判断是否有后置摄像头(Rear camera),比如Nexus 7 一代就没有后置摄像头,这样在尝试使用的时候,我们需要进行判断进行一些提示或者处理。
技术小黑屋
2018/09/04
1.1K0
检查Linux是否被入侵的方法
一、检查系统日志 lastb //检查系统错误登陆日志,统计IP重试次数 二、检查系统用户 1、cat /etc/passwd //查看是否有异常的系统用户 2、grep "0" /etc/passwd //查看是否产生了新用户,UID和GID为0的用户 3、ls -l /etc/passwd //查看passwd的修改时间,判断是否在不知的情况下添加用户 4、awk -F : '$3==0 {print $1}' /etc/passwd //查看是否存在特权用户 5、awk -F : 'length($2
joshua317
2018/04/16
2.2K0
shell脚本 检查文件是否被修改
1.进行检测目录的添加,可以多个目录 2.添加定时任务,可以一分钟一次,有改动会输出改动情况 #!/bin/bash #2020-3-14 #监测文件是否被修改脚本 #监测目录 dir_file=(/etc) file_list=/usr/local/file_list.txt error_file=/usr/local/error_list.txt display_path() { for i in `ls` do if [[ -d $i ]];then             cd $i    
陈不成i
2021/06/23
2.3K0
RedHat 8 如何检查端口是否联通
其中可能有各种原因导致端口没有联通,通常为操作系统本身的防火墙,托管服务器中心的防火墙等。
HoneyMoose
2021/09/04
2.3K0
RedHat 8 如何检查端口是否联通
检查原生 JavaScript 函数是否被覆盖
原文链接:https://mmazzarolo.com/blog/2022-07-30-checking-if-a-javascript-native-function-was-monkey-patched/[1]
chuckQu
2022/09/20
6430
检查原生 JavaScript 函数是否被覆盖
点击加载更多

相似问题

使用特权帐户重置AD用户帐户密码

10

在Meteor帐户中重置密码后禁用登录-密码

120

用户在更改密码后将被注销。

11

编辑用户的帐户详细信息

79

Django:帐户创建后密码重置的无效令牌

216
添加站长 进交流群

领取专属 10元无门槛券

AI混元助手 在线答疑

扫码加入开发者社群
关注 腾讯云开发者公众号

洞察 腾讯核心技术

剖析业界实践案例

扫码关注腾讯云开发者公众号
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档