在没有shell的Kafka 0.10.x中获取当前偏移量,可以通过编写Java代码来实现。以下是一个示例代码:
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class KafkaOffsetFetcher {
private static final int TIMEOUT = 10000;
private static final int BUFFER_SIZE = 64 * 1024;
public static void main(String[] args) {
String topic = "your_topic";
int partition = 0;
String broker = "your_broker";
int port = 9092;
long offset = getLatestOffset(topic, partition, broker, port);
System.out.println("Latest offset: " + offset);
}
private static long getLatestOffset(String topic, int partition, String broker, int port) {
SimpleConsumer consumer = new SimpleConsumer(broker, port, TIMEOUT, BUFFER_SIZE, "offsetLookup");
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, Long> requestInfo = new HashMap<>();
requestInfo.put(topicAndPartition, kafka.api.OffsetRequest.LatestTime());
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return -1;
}
long[] offsets = response.offsets(topic, partition);
consumer.close();
if (offsets.length > 0) {
return offsets[0];
} else {
return -1;
}
}
}
上述代码使用Kafka的Java API来获取指定主题和分区的最新偏移量。你需要替换代码中的your_topic
、your_broker
和port
为实际的主题、Kafka broker地址和端口。
这段代码创建了一个SimpleConsumer
对象,然后构建了一个OffsetRequest
请求,指定获取最新的偏移量。通过调用consumer.getOffsetsBefore(request)
方法来发送请求并获取响应。如果响应中存在错误,会打印错误信息并返回-1。如果响应中存在偏移量,会返回最新的偏移量。
请注意,这只是一个简单的示例代码,实际使用时可能需要根据具体情况进行适当的修改和扩展。
领取专属 10元无门槛券
手把手带您无忧上云