本篇是《关于Kafka producer管理TCP连接的讨论》的续篇,主要讨论Kafka java consumer是如何管理TCP连接。实际上,这两篇大部分的内容是相同的,即consumer也是把TCP连接的管理交由底层的Selector类(org.apache.kafka.common.network)和NetworkClients类(org.apache.kafka.clients)来维护。我依然以“何时创建/创建多少/何时关闭/潜在问题/总结”的顺序来讨论。和上一篇一样,本文将无差别地混用名词TCP和Socket。
一、何时创建TCP连接
首先明确的是,在构建KafkaConsumer实例时是不会创建任何TCP连接的;另外在调用诸如subscribe或assign的时候也不会创建任何TCP连接。那么TCP连接是在什么时候创建的呢?严格来说有几个可能的时间点。从粗粒度层面来说,我们可以安全地认为Socket连接是在调用consumer.poll()创建的;从细粒度层面来说,TCP连接创建的时机有3个:1. 请求METADATA时;2. 进行组协调时;3. 发送数据时。
二、创建多少个TCP连接
对于每台broker而言,kafka consumer实例通常会创建3个TCP连接,第一个TCP连接是consumer请求集群元数据时创建的,之后consumer会使用这个Socket继续请求元数据以及寻找group对应的coordinator,如下列日志所示:
[2019-01-01 17:38:22,301] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending METADATA with correlation id 2 tonode -1(org.apache.kafka.clients.NetworkClient:492)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR with correlation id 0 tonode -1(org.apache.kafka.clients.NetworkClient:492)
至于这里为什么是node -1是因为首次请求元数据时尚不确定broker.id,所以只能先用-1替代。
第二个TCP连接供consumer执行组协调操作使用,这里的组协调操作包括:JOIN_GROUP(加入组)、SYNC_GROUP(等待组分配方案)、HEARTBEAT(心跳请求)、OFFSET_FETCH(获取位移)、OFFSET_COMMIT(提交位移)以及其他请求(比如LEAVE_GROUP,但本例中没有演示组成员退出的情形,故日志中没有出现这个请求类型),如下列日志所示:
[2019-01-01 17:38:22,379] TRACE [Consumer clientId=consumer-1, groupId=test] Sending JOIN_GROUP ]} with correlation id 3 tonode 2147483647(org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,382] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive fromnode 2147483647for JOIN_GROUP with correlation id 3, received ]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,386] TRACE [Consumer clientId=consumer-1, groupId=test] Sending SYNC_GROUP ]} with correlation id 5 tonode 2147483647(org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,388] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive fromnode 2147483647for SYNC_GROUP with correlation id 5, received (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,396] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_FETCH ]},]}]} with correlation id 6 tonode 2147483647(org.apache.kafka.clients.NetworkClient:492)
[2019-01-03 17:38:22,397] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive fromnode 2147483647for OFFSET_FETCH with correlation id 6, received ]},]}],error_code=0} (org.apache.kafka.clients.NetworkClient:810)
...
[2019-01-01 17:38:23,401] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_COMMIT ]},]}]} with correlation id 10 tonode 2147483647(org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,403] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive fromnode 2147483647for OFFSET_COMMIT with correlation id 10, received ]},]}]} (org.apache.kafka.clients.NetworkClient:810)
上面标红的节点ID看上去有些奇怪,实际上它是由Integer.MAX_VALUE - coordinator的broker.id计算得来的,因为我的测试环境中只有一台broker且该id是0,所以这个Socket连接的节点ID就是Integer.MAX_VALUE,即2147483647。针对这个node ID的计算方式,Kafka代码是故意为之的,目的就是要让组协调请求和真正的数据获取请求使用不同的Socket连接。
第三个Socket连接就非常好理解了,就是用于发送FETCH请求的。当consumer代码使用第一个Socket连接获取到集群元数据之后,每个broker的真实ID已经缓存在consumer本地的内存中,此时代码会使用真实的broker ID创建第三个Socket连接并用于消息获取,如下列日志所示:
[2019-01-01 17:38:23,424] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH with correlation id 11 tonode 0(org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,927] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive fromnode 0for FETCH with correlation id 11, received (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:23,928] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH with correlation id 12 tonode 0(org.apache.kafka.clients.NetworkClient:492)
...
上面标红的节点0是真实的broker.id,可见consumer是使用这个Socket进行消息获取操作的。值得一提的是,当这个Socket连接成功建立之后,第一个Socket连接就会被废弃掉,之后所有的元数据请求皆通过第三个Socket发送。
三、何时关闭TCP连接
四、可能的问题?
Consumer端和producer端的问题是一样的,即第一个Socket连接仅仅是为了首次(最多也就是几次)获取元数据之用,后面就会被废弃掉。根本的原因在于它使用了“假”的broker id去注册,当 后面consumer获取了真实的broker id之后它无法区分哪个broker id对应这个假ID,所以只能重新创建另外的Socket连接。
五、总结
最后总结一下当前的结论,针对最新版本Kafka(2.1.0)而言,Java consumer端管理TCP连接的方式是:
1. KafkaConsumer实例初始化时不会创建任何Socket连接
2. KafkaConsumer实例拿到元数据信息之后随机寻找其中一个broker去定位对应的coordinator,然后向coordinator所在broker创建第二个Socket连接。之后所有的组协调请求处理都经由该Socket
3. 步骤1中创建的TCP连接只用于首次获取元数据信息,后面会被废弃掉
5. 当前consumer判断是否存在与某broker的TCP连接依靠的是broker id,这是有问题的,依靠对可能是更好的方式
领取专属 10元无门槛券
私享最新 技术干货