消息队列 CKafka 版

客户端问题

最近更新时间:2025-05-09 10:14:12

我的收藏

通用问题

Kafka Console 客户端测试时看不到数据如何处理?

消费者采用 latest 时候只会获取最后的数据,需要同时保持生产才可以看到相应数据。
改为 earliest 方式消费数据。

新接入客户端时生产或消费错误?

检查 telnet 是否联通(网络问题,是否 Kafka 和生产者在相同网络环境下)。
访问的 vip - port 是否配置正确。
topic 白名单是否开启,如果开启需要配置正确的 IP 才能访问。

客户端生产消息如何保证在同一分区是有序的?

如果 Topic 只有一个分区,那么消息会根据服务端收到的数据顺序存储,则数据就是分区有序的。
如果 Topic 有多个分区,可以在生产端指定这一类消息的 key,这类消息都用相同的 key 进行消息发送,CKafka 会根据 key 哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,此时消息就具有消息消费的顺序性了。
对于单个生产者来说,对单个分区的生产,是保持有序的。

客户端生产消息一般会与 Broker 建立多少个连接?

从单个客户端实例(new 一个 Producer 对象的角度)来看,和所有服务端建立的连接总数公式如下:总连接数 = 1~n(n 是 Broker 的数量)。
每个 Java Producer 端管理 TCP 连接的方式如下:
1. KafkaProducer 实例创建时启动 Sender 线程,从而创建与 bootstrap.servers 中所有 Broker 的 TCP 连接。
2. KafkaProducer 实例首次更新元数据信息之后,还会再次创建与集群中所有 Broker 的 TCP 连接。
3. 如果 Producer 端发送消息到某台 Broker 时发现没有与该 Broke r的 TCP 连接,那么也会立即创建连接。
4. 如果设置 Producer 端 connections.max.idle.ms 参数大于0,则步骤1中创建的 TCP 连接会被自动关闭;默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮您把该 TCP 连接关闭。如果设置该参数=-1,那么步骤1中创建的 TCP 连接将无法被关闭,从而成为“僵尸”连接。

使用客户端发送消息后,如何确定是否发送成功?

大部分客户端在发送之后,会返回 Callback 或者 Future,如果回调成功,则说明消息发送成功。
您还可以在控制台通过以下方式确认消息发送是否正常:
查看 Topic 的分区状态,可以实时看到各个分区的消息数量。
查看 Topic的 流量监控,可以看到生产消息的流量曲线。
可以通过打印 send 方法返回的 partition 和分区信息来确认是否成功:
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, messageKey, messageStr));
RecordMetadata recordMetadata = future.get();
log.info("partition: {}", recordMetadata.partition());
log.info("offset: {}", recordMetadata.offset());
如果能够打印出 partition 和 offset,则表示当前发送的消息在服务端已经被正确保存。此时可以通过消息查询的工具去查询相关位点的消息即可。 如果打印不出 partition 和 offset,则表示消息没有被服务端保存,客户端需要重试。




leader 切换是什么?

在建立一个新 topic 时,kafka broker 集群会进行每个 partition 的 leader 分配,将当前 topic 的 partition 均匀分布在每个 broker 上。 但在使用一段时间后,可能会出现 partition 在 broker 上分配不均, 或是出现客户端在生产消费中抛出 BrokerNotAvailableErrorNotLeaderForPartitionError 等异常。 这通常都是由于 partition 发生了 leader 切换导致的,典型场景如下:
当某个 partition leader 所在的 broker 发生某些意外情况,例如网络中断,程序崩溃,机器硬件故障导致无法与 broker controller 通信时, 当前 topic partition 将会发生 leader 切换,leader 将迁移至 follower partition 上。
当 kafka 集群设置 auto.leader.rebalance.enable = true 进行自动 reBalance,或是人工增加/削减 broker 并手动触发 reBalance 时, 由于涉及到 partition 自动平衡,此时也会出现 leader 切换
当由于 broker 意外中断,导致 leader 切换时:
如果客户端设置 ack = all,并且 min.insync.replicas > 1 ,由于消息同时在 leader partition 和 follower partition 都确认,因此消息不会丢失。
如果客户端设置 ack = 1 ,此时可能会出现设置在 replica.lag.time.max.ms 时间中的消息未同步到 follower partition,可能导致消息丢失
当由于 broker 正常,手动/自动(如实例升级、单可用区切换跨可用区、实例迁移等)发起 reBalance 导致 leader 切换时,不会导致消息丢失,原因如下:
如果客户端设置 ack = all,并且 min.insync.replicas > 1 ,由于消息同时在 leader partition 和 follower partition 都确认,因此消息不会丢失。
如果客户端设置 ack = 1 ,leader 切换将会自动同步 partition 中的 offset,因此消息不会丢失。

特定客户端已知问题

Kafka Java 客户端

Producer Partitioner 不均问题

Kafka Java Producer在2.3版本之前默认使用轮询分区策略进行消息发送,在2.4版本时候,开始提供可选的分区选择器,提供RoundRobinPartitioner和UniformStickyPartitioner两种分区策略, 默认使用UniformStickyPartitioner轮询分区策略进行分区选择。 其中RoundRobinPartitioner存在未实现的方法法 onNewBatch(),可能出现部分分区进行发送的现象,从而导致不均现象,针对这类问题,可以考虑使用自定义分区器MyRoundRobinPartitioner,在初始化生产者时候指定该类即可。由于社区推荐UniformStickyPartitioner分区器作为默认分区器,但是由于Kafka开源Java SDK,版本>=2.4.1 && < 3.3.0,其中Kafka Producer默认的粘性分区策略存在数据倾斜问题,尤其linger.ms为0(短时间的的连续消息均被发送至同一个分区内),客户如果对热点数据倾斜较为敏感的场景,可以考虑Kafka Java SDK升级到3.3.0以及以上,Kafka Java SDK Producer在3.3.0开发了Strictly Uniform Sticky Partitioner对此进行优化。高版本:写入一定要指明ack,幂等,linger.ms。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.producer;

import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The "Round-Robin" partitioner - MODIFIED TO WORK PROPERLY WITH STICKY PARTITIONING (KIP-480)
* <p>
* This partitioning strategy can be used when user wants to distribute the writes to all
* partitions equally. This is the behaviour regardless of record key hash.
*/
public class MyRoundRobinPartitioner implements Partitioner {
private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinPartitioner.class);
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Queue<Integer>> topicPartitionQueueMap = new ConcurrentHashMap<>();

public void configure(Map<String, ?> configs) {
}

/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
@Override
public int partition(
String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
Integer queuedPartition = partitionQueue.poll();
if (queuedPartition != null) {
LOGGER.trace("Partition chosen from queue: {}", queuedPartition);
return queuedPartition;
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
int partition = availablePartitions.get(part).partition();
LOGGER.trace("Partition chosen: {}", partition);
return partition;
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
}

private int nextValue(String topic) {
AtomicInteger counter =
topicCounterMap.computeIfAbsent(
topic,
k -> {
return new AtomicInteger(0);
});
return counter.getAndIncrement();
}

private Queue<Integer> partitionQueueComputeIfAbsent(String topic) {
return topicPartitionQueueMap.computeIfAbsent(topic, k -> {
return new ConcurrentLinkedQueue<>();
});
}

public void close() {
}

/**
* Notifies the partitioner a new batch is about to be created. When using the sticky partitioner,
* this method can change the chosen sticky partition for the new batch.
*
* @param topic The topic name
* @param cluster The current cluster metadata
* @param prevPartition The partition previously selected for the record that triggered a new
* batch
*/
@Override
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
LOGGER.trace("New batch so enqueuing partition {} for topic {}", prevPartition, topic);
Queue<Integer> partitionQueue = partitionQueueComputeIfAbsent(topic);
partitionQueue.add(prevPartition);
}
}

Consumer coordinator 无法刷新重连

kafka java consumer客户端,版本>=2.6.x&&<3.2.1,在 commitOffsetsAsync 异常后,无法恢复与coordinator的连接,从而导致后续无法提交位点,该issue在3.2.1以及之后的版本修复,建议存在该问题的客户端,可要么升级到3.2.1进行解决,要么回滚到<2.6.x版本。

Consumer 粘性分配器重复消费问题

kafka java consumer客户端<2.3.0版本,如果使用Sticky assignor,会出现分区重复分配,即一个分区可能同时分配给同一个消费组的不同消费者消费,从而造成重复消费问题,如果使用Sticky assignor,建议客户调整版本到2.3.0以及以上,避免该问题。

Go sarama客户端

Go sarama 元数据刷新问题

1. Go sarama 客户端元数据更新参数由Metadata.Full 控制,设计实现上考虑方便默认为true,会更新全量Topic的元数据,对性能有一定影响,对于大量topic的情况下,可能影响客户使用kafka生产和消费的性能,建议客户修改该配置为false。
2. Go sarama 客户端默认的元数据刷新时间参数由Metadata.RefreshFrequency控制,默认值为10分钟,容易触及 broker 默认的连接空闲超时(10min)导致连接被中断,建议调整成5分钟与kafka java 标准sdk默认参数保持一致。

Go sarama Offset 提交过期问题

Go sarama客户端CommitOffset请求可以指定offset的过期时间,该参数由Retention.Minutes控制,默认值为0,该值会导致提交后位点会立马过期,建议设置改值与Ckafka位点过期时间保持一致,建议为7天。

Go sarama cluster 客户端的使用问题

目前 sarama cluster 已经不再维护,存在诸多缺陷,目前存在如下已知缺陷:消费者客户端底层代码 PartitionConsumer 会调用 FetchRequest 解包FetchResponse,此时 sarama cluster 实现没有按照标准协议实现,存在缺陷,即使服务端实际返回全量的元数据,但是客户端有概率出现解析到的元数据不全的问题,从而出现无法生产或者消费,报错特征为:response did not contain all the expected topic/partition blocks。
具体场景举例:在客户实例进行升降配操作,因为会增加节点,此时会触发客户端进行元数据更新,如果在元数据中,按照 test-0,a-0,test-1,a-1,test-2 的顺序返回 test 的3个分区,a的2个分区的全量元数据,虽然 broker 会返回全量的元数据,但是存在一定概率出现顺序有间隔,sarama cluster 的客户端解析时候会出现解析到不完整的元数据,比如只能解析到 a-1,test-2两个元数据,引发如上报错问题,即响应没有包含预期所有分区元数据,从而导致无法生产或者消费。
针对该情况,标准协议实现的客户端是不会出现的,例如 Java,confluent go,因此建议客户在升降配前,务必了解是否使用该客户端,因为升降配会导致元数据更新的场景十分频繁,如果有,则强烈建议客户升级为 confluent go 客户端再进行升降配,如果无法升级,在出现该问题后立刻进行客户端重启,则有概率恢复正常。

trpc-go 客户端

trpc-go kafka 库会出现消费卡住问题

trpc-go 消费端使用 kafka 库的默认配置情况下,Handle 是同步的嵌入到消息接收整个流程中的。这意味着,如果在 Handle 中做耗时非常长的任务,或者在 Handle 中返回错误,均会导致框架认为消息消费不成功,并一直重试,于是出现了不断重试同一个消息的情况,会导致消费者一直卡在当前这个消息的位置,因此,推荐客户在使用trpc-go时候,自行处理错误重试,无论何种情况不要将任何错误返回给框架。


如果遇到产品相关问题,您可咨询 在线客服寻求帮助。