首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

如何在没有shell的Kafka 0.10.x中获取当前偏移量?

在没有shell的Kafka 0.10.x中获取当前偏移量,可以通过编写Java代码来实现。以下是一个示例代码:

代码语言:java
复制
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class KafkaOffsetFetcher {
    private static final int TIMEOUT = 10000;
    private static final int BUFFER_SIZE = 64 * 1024;

    public static void main(String[] args) {
        String topic = "your_topic";
        int partition = 0;
        String broker = "your_broker";
        int port = 9092;

        long offset = getLatestOffset(topic, partition, broker, port);
        System.out.println("Latest offset: " + offset);
    }

    private static long getLatestOffset(String topic, int partition, String broker, int port) {
        SimpleConsumer consumer = new SimpleConsumer(broker, port, TIMEOUT, BUFFER_SIZE, "offsetLookup");
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, Long> requestInfo = new HashMap<>();
        requestInfo.put(topicAndPartition, kafka.api.OffsetRequest.LatestTime());
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());

        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return -1;
        }

        long[] offsets = response.offsets(topic, partition);
        consumer.close();

        if (offsets.length > 0) {
            return offsets[0];
        } else {
            return -1;
        }
    }
}

上述代码使用Kafka的Java API来获取指定主题和分区的最新偏移量。你需要替换代码中的your_topicyour_brokerport为实际的主题、Kafka broker地址和端口。

这段代码创建了一个SimpleConsumer对象,然后构建了一个OffsetRequest请求,指定获取最新的偏移量。通过调用consumer.getOffsetsBefore(request)方法来发送请求并获取响应。如果响应中存在错误,会打印错误信息并返回-1。如果响应中存在偏移量,会返回最新的偏移量。

请注意,这只是一个简单的示例代码,实际使用时可能需要根据具体情况进行适当的修改和扩展。

页面内容是否对你有帮助?
有帮助
没帮助

相关·内容

领券