Fetcher是与KafkaConsumer交互的各大组件之一。在各大博客上,比如某csdn博客中提到,Fetcher的作用是:
Fetcher负责组织拉取消息的请求,以及处理返回。不过需要注意它并不做网络IO,网络IO还是由ConsumerNetworkClient完成。它其实对应生产者中的Sender。
Fetcher负责拉取什么消息?如何处理消息?它到底有什么功能,我们需要查阅源码。然而在类声明上的注释只有可怜兮兮的一句话:
所以我们要转换角度去观察。 首先,Fetcher没有继承Runnable或Thread,那么它只是一个API组件,而不是单独运行的线程。
然后要观察一个类的作用,可以从两个角度入手:
与上游组件的交互,就是指它所暴露出的public方法,因为只有public方法能被其它组件调用,这就是它提供的功能。所以我们要研究下这些public方法。 从Idea左侧栏->Structure,点击"Show non-public"按钮,隐藏非公有方法
从方法栏可以看到,Fetcher主要提供了四块功能:
fetchedRecords
可知,这些方法作用都与从服务器拉取消息有关,能够向服务器发送消息。查看Fetcher的成员变量可知,Fetcher主要与ConsumerNetworkClient组件交互。后者负责请求、响应的IO[1],那么前者就负责构造请求、处理响应。
搜索client.
查看与ConsumerNetworkClient发生交互的地方,总共有8处。
其中client.send
、client.poll(
代表发送请求、等待响应的调用。
// ConsumerNetworkClient.java
public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder)
public boolean poll(RequestFuture<?> future, long timeout)
通过对这两种方法的使用,可以向ConsumerNetworkClient发送请求,并添加处理响应的逻辑。有两种
Fetcher利用监听器的机制,添加异步响应的逻辑。
比如sendFetches
中,先调用client.send
发出请求,再调用addListener添加请求完成后的回调逻辑。
Fetcher调用client.send
发出请求,调用client.poll
等待请求完成,添加同步响应的逻辑。以getTopicMetadata为例
在sendMetadataRequest内部调用了client.send
发送请求
查看poll可知,内部会循环等待,直到请求完成。
sendFetches调用client.send
发送请求,通过addListener
设置请求完成后的逻辑。在onSuccess
中将拉取的数据,按照TopicPartition分别添加到completedFetches
public int sendFetches() {
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
...
final FetchRequest.Builder request = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
...
...
// 发送请求、设置回调逻辑
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
...
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
...
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
resp.requestHeader().apiVersion())); // 添加到completedFetches
}
...
}
@Override
public void onFailure(RuntimeException e) {
...
}
});
}
return fetchRequestMap.size();
}
sendFetches在请求完成后,通过OnSuccess执行成功逻辑
外界调用fetchedRecords来收获已经收到的消息。fetchedRecords从completedFetches取出拉取的消息,通过while循环,将消息从CompletedFetch类型转为PartitionRecords,再转为List<ConsumerRecord<K, V>>,添加到fetched中。
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
int recordsRemaining = maxPollRecords;
try {
// 通过循环完成拉取到的消息的加工,最多拉取maxPollRecords条消息
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isFetched) {
CompletedFetch completedFetch = completedFetches.peek(); // 从completedFetches查看拉取的消息
if (completedFetch == null) break; // 没有消息了,退出循环
nextInLineRecords = parseCompletedFetch(completedFetch); // 处理成PartitionRecords类型,也就是一个分区上拉到的数据
completedFetches.poll(); // 去除队头
} else {
List<ConsumerRecord<K, V>> records = fetchRecords(nextInLineRecords, recordsRemaining); // 处理成List<ConsumerRecord<K, V>>类型
TopicPartition partition = nextInLineRecords.partition;
// 将拉取到的消息放入fetched
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = fetched.get(partition);
if (currentRecords == null) {
fetched.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
} catch (KafkaException e) {
if (fetched.isEmpty())
throw e;
}
return fetched;
}
如图示:
在fetchedRecords的循环中,一条CompletedFetch的变化轨迹
Fetcher向上游提供了拉取消息、获取topic元数据、获取/刷新offset的功能,并由ConsumerNetworkClient完成请求/响应的IO操作。