首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如果我们没有在kafka中使用delivery回调,那么在produce之后调用poll()有什么意义吗?

在Kafka中,poll()方法主要用于消费者(Consumer)从Kafka集群中拉取消息。如果你没有使用生产者的delivery回调,那么在produce之后调用poll()对于生产者(Producer)来说并没有直接的意义。

基础概念

  1. Kafka Producer: 负责将消息发送到Kafka集群。
  2. Kafka Consumer: 负责从Kafka集群中消费消息。
  3. delivery回调: 生产者在发送消息后,可以通过设置回调函数来获取消息发送的结果,包括成功或失败的信息。

为什么调用poll()没有意义

  • 生产者(Producer): 生产者的主要任务是发送消息到Kafka集群。调用poll()方法是消费者的行为,用于从Kafka集群中拉取消息。因此,生产者在调用produce()发送消息后,调用poll()是没有意义的。
  • 消费者(Consumer): 消费者通过调用poll()方法从Kafka集群中拉取消息。如果生产者没有使用delivery回调,那么生产者在发送消息后无法直接获取消息发送的结果。

解决方案

如果你是生产者,并且希望在发送消息后获取发送结果,应该使用delivery回调。以下是一个简单的示例代码:

代码语言:txt
复制
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        } else {
            System.out.println("Message sent to topic: " + metadata.topic() + " partition: " + metadata.partition() + " offset: " + metadata.offset());
        }
    }
});

producer.close();

参考链接

如果你是消费者,并且需要从Kafka集群中拉取消息,那么调用poll()方法是正确的做法。以下是一个简单的示例代码:

代码语言:txt
复制
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

参考链接

总结来说,生产者在发送消息后调用poll()是没有意义的,应该使用delivery回调来获取消息发送的结果。消费者则需要调用poll()方法来拉取消息。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

Kafka生产者客户端几种异常Case详解

日志里面有提示你是哪个TopicPartition问题。 什么影响: 影响的是你自身的业务逻辑。 那么消息是发送成功还是失败了呢? 判断消息是否发送成不是UserCallBack决定的。...就算你这里抛异常了,那么消息该成功还是成功。 解决办法 UserCallback这个很重要,它是整个I/O线程里面的,它的性能会影响这个生产者发送消息的性能。...解决方案也没有一个统一的办法,我们只能是根据具体现象去做优尝试。 buffer.memory 尝试设置大一点。...因为用户接口是I/O线程执行的, 如果用户该回接口里面写的方法性能很差,会增加整个调用链路的时间, 链路不结束,消息了累加器的消息就一直不能释放。...-> 用户 -> 释放Batch(同时从inFlightBatches移除) Batch自从加入到 inFlightBatches 之后一直迟迟没有完成整个请求链路。

6.4K80
  • confluent-kafka-go源码分析

    confluent-kafka-go是已知的kafka 客户端中最快的,为什么呢?因为它非常轻量,通过cgo 对librdkafka做了一个封装,所以本质上运行的是一个c客户端。...但是如果我们直接用go来连接会报错 1617546888.931|FAIL|rdkafka#producer-1| [thrd:bogon:9092/0]: bogon:9092/0: Failed...1, kafka.NewProducer 2, for e := range p.Events() 协程监听生产者事件 3, p.Produce 生产消息 消费者也主要调用了三个接口 1, kafka.NewConsumer...,两种解决方法: 1,编译一个静态链接库,放在librdkafka_vendor 下面,修改bundle-import.sh 文件,编译 2,编译librdkafka成功后,在编译调用代码的时候,指定为动态加载...7,Events()仅仅是返回了事件的channel func (p *Producer) Events() chan Event { return p.events } 8,produce函数和初始化的时候注册的函数底层调用的是同一个

    1.1K10

    Kafka的消息会丢失和重复?——如何实现Kafka精确传递一次语义

    如果消息重复了呢,我们是否需要复杂的逻辑来自己处理消息重复的情况呢,这种情况恐怕相当复杂而难以处理。但是如果我们能保证消息exactly once,那么一切都容易得多。 ?...图 无人机实时监控 下面我们来简单了解一下消息传递语义,以及kafka的消息传递机制。 首先我们要了解的是message delivery semantic 也就是消息传递语义。...两次中有一次会丢失消息,或者一次会重复,那么最终的结果就是可能丢失或者重复的。...: 0:producer完全不管broker的处理结果 也就没有用了 并不能保证消息成功发送 但是这种吞吐量最高 all或者-1:leader broker会等消息写入 并且ISR都写入后 才会响应...确实在kafka 0.11.0.0版本之前producer端确实是不可能的,但是kafka 0.11.0.0版本之后kafka正式推出了idempotent producer。

    2.5K11

    Kafka 最佳实践

    Kafka 配置、监控、优化的内容,绝对是在实践总结出的精华,很大的借鉴参考意义,本文主要是根据 PPT 的内容进行翻译及适当补充。...OS 优 OS page cache:应当可以缓存所有活跃的 Segment(Kafka 中最基本的数据存储单位); fd 限制:100k+; 禁用 swapping:简单来说,swap 作用是当内存的使用达到一个临界值时就会将内存的数据移动到...Producer 的相关配置、性能优及监控 Quotas 避免被恶意 Client 攻击,保证 SLA; 设置 produce 和 fetch 请求的字节速率阈值; 可以应用在 user、client-id...Consumer 配置 fetch.min.bytes 、fetch.max.wait.ms; max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll...() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance; max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500; session.timeout.ms

    2.3K41

    多图详解kafka生产者消息发送过程

    如果链中间的拦截器(通常会修改记录)抛出异常,则链的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用调用地方 ①. 拦截器执行时机键值序列化之前 ②....那么客户端的准备条件哪些呢? 生产者客户端最开始的时候都没有跟任何Node建立连接的, 当我们尝试发送之前会去检验一下连接是否建立成功(就是当前这一步), 如果没有的话,则会去尝试建立连接。...是否能够重试判断逻辑:batch没有超过delivery.timeout.ms && 重启次数<retiries 如果是DuplicateSequenceException异常的话,那么并不会做其他的处理...(), record.key(), record.value())); 注意: 这里的并不是指的一个Batch一个,这里是一个Batch里面有多少条消息,就有多少个。...每个ProducerBatch里面都有一个对象专门保存所有消息的信息 thunks . 处理ProducerBatch返回信息的时候会遍历这个trunks, 来执行每个消息的

    55510

    使用生成器把Kafka写入速度提高1000倍

    [如果代码显示问题,请点击阅读原文] 通过本文你会知道Python里面什么时候用yield最合适。本文不会给你讲生成器是什么,所以你需要先了解Python的yield,再来看本文。...但是当年我始终想不明白,这种写法与直接调用函数什么区别,如下图所示。 ? 直到后来我需要操作Kafka的时候,我明白了使用yield的好处。...一个Kafka生产者对象展开列表,再把数据一条一条塞入Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。 ?...这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。 于是我又修改了代码。...后记 读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一事。

    1.5K20

    多图详解kafka生产者消息发送过程

    如果链中间的拦截器(通常会修改记录)抛出异常,则链的下一个拦截器将使用前一个未抛出异常的拦截器返回的记录调用调用地方 ①. 拦截器执行时机键值序列化之前 ②....那么客户端的准备条件哪些呢? 生产者客户端最开始的时候都没有跟任何Node建立连接的, 当我们尝试发送之前会去检验一下连接是否建立成功(就是当前这一步), 如果没有的话,则会去尝试建立连接。...其他异常或者没有异常则会走正常流程, 并且调用,如果有Exception也会返回。这个InterceptorCallback里面包含在拦截器和(用户自己的)。...())); 注意: 这里的并不是指的一个Batch一个,这里是一个Batch里面有多少条消息,就有多少个。...每个ProducerBatch里面都有一个对象专门保存所有消息的信息 thunks . 处理ProducerBatch返回信息的时候会遍历这个trunks, 来执行每个消息的

    1.7K30

    使用生成器把Kafka写入速度提高1000倍

    但是当年我始终想不明白,这种写法与直接调用函数什么区别,如下图所示。 [2018-04-13-21-51-37.png] 直到后来我需要操作Kafka的时候,我明白了使用yield的好处。...一个Kafka生产者对象展开列表,再把数据一条一条塞入Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。...[witoutyield2.png] 这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。 于是我又修改了代码。...这样的写法,在上面这段代码,一共1003条数据,每100条数据获取一次生产者对象,那么需要获取11次生产者对象,耗时至少为110秒。...[withyield.png] 后记 读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一事。

    92410

    【Rust日报】2023-09-13 RustRover – JetBrains 推出的独立 Rust IDE

    RustRover – JetBrains 推出的独立 Rust IDE “什么时候会有 Rust IDE?” 这是用户经常提出的问题(八年了,你知道这八年我怎么过的?)...多年来 JetBrains Rust 功能多以插件形式被支持。...Rust Kafka 客户端库 一个 librdkafka 基础上完全异步、基于 future 的 Rust Kafka 客户端库 目前提供的主要功能有: 支持自 0.8.x 以来的所有 Kafka...可定制的 rebalance,带有 rebalance 前和 rebalance 后的。 同步或异步消息生成。 可定制的偏移量提交。 创建和删除 topic 以及添加和编辑 partition。...访问生产者和消费者指标、错误和。 通过幂等和事务性生产者以及已提交读取的消费者实现一次性语义 (EOS)。

    42520

    Kafka 新版消费者 API(二):提交偏移量

    可能造成的问题:数据重复读 假设我们仍然使用默认的 5s 提交时间间隔,最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。...commitAsync()也支持broker作出响应时会执行: // 把auto.commit.offset设为false,让应用程序决定何时提交偏移量 props.put("auto.commit.offset...每次提交偏移量之后或在里提交偏移量时递增序列号。进行重试前,先检查回的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。...如果把存储到数据库和提交偏移量一个原子操作里完成,就可以避免这样的问题,但数据存到数据库,偏移量保存到kafka是无法实现原子操作的,而如果把数据存储到数据库,偏移量也存储到数据库,这样就可以利用数据库的事务来把这两个操作设为一个原子操作...* 下一次调用 poll() 就会在本次设置的偏移量上加1,开始处理没有处理过的数据 * 如果seek()发生错误,比如偏移量不存在,则会抛出异常 */ consumer.poll(0); for

    5.6K41

    【愚公系列】2023年03月 MES生产制造执行系统-004.Kafka使用

    生产者负责向Kafka发送消息,消费者负责从Kafka接收消息,主题是消息的分类,分区是主题的分片,偏移量是消息分区的位置。...Kafka四个核心的API: The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。...-消费(持续订阅) /// /// 函数,若配置为非自动提交(默认为否),则通过函数的返回值判断是否提交</param...-消费(持续订阅) /// /// 函数,若配置为非自动提交(默认为否),则通过函数的返回值判断是否提交</param...-消费(持续订阅) /// /// 函数,若配置为非自动提交(默认为否),则通过函数的返回值判断是否提交</param

    43120

    深入理解 Linux 的 epoll 机制

    epoll_ctl 的内部实现,除了把句柄结构用红黑树管理,另一个核心步骤就是设置 poll 。 思考来了:poll 什么?怎么设置?...换句话说,如果一个“文件”所在的文件系统没有实现 poll 接口,那么就用不了 epoll 机制。 第二个问题:poll 怎么设置?... epoll_ctl 下来的实现一步是调用 vfs_poll 这个里面就会有个判断,如果 fd 所在的文件系统的 file_operations 实现了 poll那么就会直接调用如果没有,...当 fd 满足可读可写的时候就会经过层层,最终调用到这个函数,把对应 fd 的结构体放入就绪队列,从而把 epoll 从 epoll_wait 出唤醒。 这个对应结构体是什么?...->poll ,把这个 fd 就绪之后路径安排好。

    8K125

    聊聊storm-kafka-client的ProcessingGuarantee

    消息,poll拉取消息之后判断如果是ProcessingGuarantee.AT_MOST_ONCE类型的,则调用kafkaConsumer.commitSync同步提交,然后返回拉取的记录(最后设置到...partitions,针对这些partitions,先pause再调用poll,最后再resume,也就是此次poll不会从pausedPartitions拉取消息 poll消息之后还有一个动作就是调用...,然后从emitted删除 KafkaSpoutRetryExponentialBackoff.schedule storm-kafka-client-1.2.2-sources.jar!...false,表示不再调度了,之后KafkaSpoutfail方法tupleListener.onMaxRetryReached方法,然后进行ack,表示不再处理了 没有超过maxRetries的话...ProcessingGuarantee.AT_MOST_ONCE是pollKafkaBroker方法里头,调用完kafkaConsumer.poll之后调用kafkaConsumer.commitSync

    1.4K20

    【云原生进阶之PaaS中间件】第三章Kafka-4.4-消费者工作流程

    3、提交偏移量 当我们调用 poll 方法的时候, broker 返回的是生产者写入 Kafka 但是还没有被消费者读取过的记录,消费者可以使用 Kafka 来追踪消息分区里的位置,我们称之为偏移量...Kafka 但是还没有被消费者读取过的记录,消费者可以使用此记录来追踪消息分区里的位置,我们称之为偏移量 。...假设我们仍然使用默认的 5s 提交时间间隔 , 最近一次提交之后的 3s 发生了再均衡,再均衡之后 , 消费者从最后一次提交的偏移量位置开始读取消息。...这个时候如果发生再均衡 , 就会出现重复消息。 commitAsync() 也支持 , broker 作出响应时会执行调经常被用于记录提交错误或生成度量指标。...现在的问题是: 如果偏移量是保存在数据库里而不是 Kafka 里 , 那么消费者得到新分区时怎么知道该从哪里开始读取 ? 这个时候可以使用 seek() 方法。

    15610

    Kafka 消费者

    对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组两个消费者,那么会是这样的 在这个场景,消费组G1和消费组G2都能收到T1主题的全量消息,逻辑意义上来说它们属于不同的应用。...另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式连接Kafka与其他系统时非常有用。...我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。...每次发起异步提交时增加此序号,并且将此时的序号作为参数传给方法;当消息提交失败时,检查参数的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经更新的位移提交了)。...另外注意的是,seek()只是指定了poll()拉取的开始位移,这并不影响Kafka中保存的提交位移(当然我们可以seek和poll之后提交位移覆盖)。

    2.3K41

    Kafka基础篇学习笔记整理

    这种情况是可能出现的,达到了retries上限或delivery.timeout.ms上限之后,消息发送重试了多次,仍然没有发送成功。...等待异常通过人为干预的方式解除之后,再重新发往kafka如果消息数据是用户网页点击量、商品阅读量这类的数据,数据量大、对于数据处理延时也没有太多的要求,甚至异常情况下出现数据丢失也不是不能容忍。...因此,如果消息多个分区具有相同的键,那么它们每个分区中都将被视为不同的消息,无法实现全局的幂等性。 ---- kafka实现事务 kafka幂等性解决的是同一个消息被发送多次,发送至同一个分区。...为了能够使生产者能够感知到消息是否真的发送成功了,两种方式: 同步发送 异步发送 + 函数 添加回函数写法如下: @Test void testAsyncWithCallBack(...如果你正在使用消息队列,那么我建议你考虑设计时考虑毒丸消息的使用。确保你的消费者能够识别和正确处理毒丸消息,并在必要时能够停止消费并退出队列。

    3.7K21

    Consumer位移管理-Kafka从入门到精通(十一)

    kafkaConsumer的poll方法在用户主线程运行,这同时也表明:消费者组的rebalance、消息获取、coordinator管理、异步任务结果的处理、位移提交等操作这些都在主线程的,因此仔细优参数至关重要...Poll使用方法 Consumer订阅topic之后通常以事件循环的方法来获取消息读取,poll方法根据当前consumer的消费位移返回消息集合。...,如果看到了这样的报错,那么说明kafkaConsumer运用在多线程,对于目前的kafka设计而言,是不被允许的。...显然,若consumer消费之前就提交位移,那么多在位移提交完成之后,消费还未消费就崩溃了,这时候consumer重启,则会从新的位移开始消费,则这个已提交的位移会丢失。...相反的,若consumer消费之后再提交,则可以实现at least once。好消息是这个出现多次处理的情况,已经kafka0.11.0.0版本得到解决。

    40120
    领券