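ConsumerCoordinator extends AbstractCoordinator and, within the Kafka clients module, is its only subclass (Kafka Connect's WorkerCoordinator is another subclass elsewhere in the codebase). AbstractCoordinator defines the group-coordination logic: how a consumer interacts with one particular broker (the coordinator) in order to join a consumer group and track the group's state.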
/**
* This class manages the coordination process with the consumer coordinator.
*/
public final class ConsumerCoordinator extends AbstractCoordinator {
/**
* AbstractCoordinator implements group management for a single group member by interacting with
* (a long comment that is worth reading in full; omitted here)
* ...
*/
public abstract class AbstractCoordinator implements Closeable {
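ConsumerCoordinator is mainly responsible for communication with the consumer group's coordinator: discovering the coordinator, joining the group, fetching committed offsets, and committing offsets.
After joining the group, it also starts a HeartbeatThread that keeps sending heartbeats to the coordinator to maintain its membership.
KafkaConsumer::pollOnce always starts by calling ConsumerCoordinator::poll once.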
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
client.maybeTriggerWakeup();
long startMs = time.milliseconds();
coordinator.poll(startMs, timeout);
...
Besides handling offset commits, this poll call also contacts the coordinator and completes any pending rebalance.
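If you have read about the Kafka consumer rebalance mechanism [1], you will know that:
ensureCoordinatorReady contacts a broker to find the coordinator and establishes a connection to it.
ensureActiveGroup checks the member's own group state and talks to the coordinator to complete the group-join flow.
/**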
* Block until the coordinator for this group is known and is ready to receive requests.
*/
public synchronized void ensureCoordinatorReady() {
// Using zero as current time since timeout is effectively infinite
ensureCoordinatorReady(0, Long.MAX_VALUE);
}
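Next, read ensureCoordinatorReady(long startTimeMs, long timeoutMs). The rough logic is:
lookupCoordinator sends a find-coordinator request to some broker and waits for the response.
Once the response arrives, FindCoordinatorResponseHandler processes it and a connection to the coordinator is established.
ConsumerCoordinator and AbstractCoordinator contain a great deal of conditional logic, and they frequently modify both their own state and SubscriptionState. Trying to memorize "send request xx, then do this on a response and that on an error" is hopeless, and it does not help understanding either.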
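To keep the shape of the ensureCoordinatorReady flow in mind without memorizing every branch, here is a minimal sketch of a "look up until known or timed out" loop. It is not Kafka's code: the class CoordinatorLookupSketch, the Supplier that plays the role of lookupCoordinator, and the attempts counter are all assumptions made for illustration, and a real client would also back off between retries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical stand-in for the coordinator lookup loop; not Kafka's actual code.
final class CoordinatorLookupSketch {
    // Plays the role of lookupCoordinator: each call starts one lookup request.
    private final Supplier<CompletableFuture<String>> lookup;
    private String coordinator; // null means "coordinator unknown"
    int attempts;               // number of lookups started (for illustration only)

    CoordinatorLookupSketch(Supplier<CompletableFuture<String>> lookup) {
        this.lookup = lookup;
    }

    synchronized boolean coordinatorUnknown() {
        return coordinator == null;
    }

    /** Returns true once a coordinator is known, false if the timeout elapsed first. */
    synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
        while (coordinatorUnknown()) {
            long elapsedMs = System.currentTimeMillis() - startTimeMs;
            long remainingMs = timeoutMs - elapsedMs;
            if (remainingMs <= 0) {
                return false; // out of time
            }
            attempts++;
            try {
                // Wait for the response; on success the "handler" step records the coordinator.
                coordinator = lookup.get().get(remainingMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                return false;
            } catch (ExecutionException e) {
                // The lookup failed; loop and retry (a real client would back off first).
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    /** Start time 0 with a Long.MAX_VALUE timeout makes the remaining time effectively infinite. */
    synchronized boolean ensureCoordinatorReady() {
        return ensureCoordinatorReady(0, Long.MAX_VALUE);
    }
}
```

The no-argument overload mirrors the delegation shown earlier: because the elapsed time is subtracted from Long.MAX_VALUE, the remaining time never reaches zero in practice, so the call blocks until a lookup succeeds.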
In my view, ConsumerCoordinator and HeartbeatThread share the following traits, which may help when reading the code:
Both synchronize using synchronized (AbstractCoordinator.this).
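The locking pattern named above can be sketched as follows: an inner thread class locks on the enclosing instance with synchronized (Outer.this), which is the same monitor that the outer class's synchronized methods use. The names GroupMemberSketch, HeartbeatSketchThread, markJoined, and heartbeatsSent are hypothetical; only the pattern reflects the text.

```java
// Hypothetical names throughout; only the locking pattern mirrors the text.
final class GroupMemberSketch {
    private boolean joined; // guarded by this instance's monitor
    private int beatCount;  // guarded by this instance's monitor

    // A synchronized instance method locks on `this`.
    synchronized void markJoined() {
        joined = true;
        notifyAll(); // wake the heartbeat thread if it is waiting
    }

    synchronized int heartbeatsSent() {
        return beatCount;
    }

    // Inner (non-static) class, so it can refer to the enclosing instance.
    final class HeartbeatSketchThread extends Thread {
        private final int beats;

        HeartbeatSketchThread(int beats) {
            this.beats = beats;
        }

        @Override
        public void run() {
            try {
                for (int i = 0; i < beats; i++) {
                    // Same monitor as the synchronized methods of the outer class.
                    synchronized (GroupMemberSketch.this) {
                        while (!joined) {
                            GroupMemberSketch.this.wait();
                        }
                        beatCount++;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Because both sides share one monitor, state changes made by the caller thread (here markJoined) and by the background thread (here the counter update) never interleave, which is why it helps to look for this lock first when reading the coordinator code.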