探究Elasticsearch7.10.2 节点之间的故障探测以及熔断故障是怎么做的,思考生产上的最佳实践。
服务端故障场景:
从服务端如何应对这些场景以及客户端如何应对这些场景。
Elasticsearch 故障监测官方文档地址:
https://www.elastic.co/guide/en/elasticsearch/reference/7.10/cluster-fault-detection.html
leader check 和 follower check 实际上都是线程,由 same 线程池执行,same 线程池是一种DIRECTl类型的线程池,当某个任务不需要在独立的线程执行,又想被线程池管理时,于是诞生了这种特殊类型的线程池:
在调用者线程中执行任务,这个same线程池是对用户不可见的,所以通过_cat/thread_pool看不到这个线程池。
map.put(Names.SAME, ThreadPoolType.DIRECT);
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
关于DirectExecutorService的关键定义:
@Override
public void execute(Runnable command) {
command.run(); // 直接运行,并没有实际的线程池
rethrowErrors(command);
}
Elasticsearch 允许这些检查偶尔失败或超时而不采取任何行动。只有在连续多次检查失败后,才认为节点出现故障。
cluster.fault_detection. settings 相关配置如下:
💡 注意事项:
选出的主节点检测到某个节点已断开连接,这种情况会被立即认为是故障,主节点绕过超时和重试设置值并尝试从集群中删除节点。
类似地,如果节点检测到选出的主节点已断开连接,则这种情况将被视为立即故障。节点绕过超时和重试设置并重新启动其发现阶段以尝试查找或选举新的主节点。
因为 master节点挂掉的时候,可能有两种情况, 主master挂掉和备master挂掉。如果主master挂掉就会触发选举。
所以分析一下相关选举配置如下:
在节点被判断离开集群的时候,会触发一个node-left
的状态更新任务。
this.followersChecker = new FollowersChecker(settings, transportService, this::onFollowerCheckRequest, this::removeNode,
nodeHealthService);
private void removeNode(DiscoveryNode discoveryNode, String reason) {
synchronized (mutex) {
if (mode == Mode.LEADER) {
// 提交一个node-left状态更新任务
masterService.submitStateUpdateTask("node-left",
new NodeRemovalClusterStateTaskExecutor.Task(discoveryNode, reason),
ClusterStateTaskConfig.build(Priority.IMMEDIATE),
nodeRemovalExecutor,
nodeRemovalExecutor);
}
}
}
之后就进入到状态更新阶段:org.elasticsearch.cluster.service.MasterService#runTasks:
这里对应的线程池为 org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor,于是进入到其的execute方法:
@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
boolean removed = false;
// 如果任务指定的节点存在于当前集群状态中,则从DiscoveryNodes.Builder对象中删除该节点,
// 并将removed标志设置为true。如果该任务指定的节点不存在于当前集群状态中,则记录一条调试级别的日志,表示忽略该任务。
for (final Task task : tasks) {
if (currentState.nodes().nodeExists(task.node())) {
remainingNodesBuilder.remove(task.node());
removed = true;
} else {
logger.debug("node [{}] does not exist in cluster state, ignoring", task);
}
}
// 如果没有删除任何节点,则返回当前的集群状态
if (!removed) {
// no nodes to remove, keep the current cluster state
return ClusterTasksResult.<Task>builder().successes(tasks).build(currentState);
}
// 剩余节点
final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
return getTaskClusterTasksResult(currentState, tasks, remainingNodesClusterState);
}
protected ClusterTasksResult<Task> getTaskClusterTasksResult(ClusterState currentState, List<Task> tasks,
ClusterState remainingNodesClusterState) {
ClusterState ptasksDisassociatedState = PersistentTasksCustomMetadata.disassociateDeadNodes(remainingNodesClusterState);
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
return resultBuilder.build(allocationService.disassociateDeadNodes(ptasksDisassociatedState, true, describeTasks(tasks)));
}
随后调用 org.elasticsearch.cluster.routing.allocation.AllocationService#disassociateDeadNodes(org.elasticsearch.cluster.ClusterState, boolean, java.lang.String)
private void disassociateDeadNodes(RoutingAllocation allocation) {
for (Iterator<RoutingNode> it = allocation.routingNodes().mutableIterator(); it.hasNext(); ) {
RoutingNode node = it.next();
// 如果该节点的ID存在于RoutingAllocation对象的数据节点列表中,则表示该节点是活动节点,继续遍历下一个节点
if (allocation.nodes().getDataNodes().containsKey(node.nodeId())) {
// its a live node, continue
continue;
}
// 否则,这个节点就离线。
// 对于每个已离线的节点,该方法遍历该节点上的所有分片,并为每个分片创建一个UnassignedInfo对象,表示该分片未分配的原因是节点离线。
// 然后,该方法调用failShard方法,将该分片标记为失败,并将其移动到未分配状态。该方法还会从RoutingNode列表中删除已离线的节点
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT,
Collections.emptySet());
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetadata, allocation.changes());
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
// since it relies on the fact that the RoutingNode exists in the list of nodes
it.remove();
}
}
org.elasticsearch.cluster.routing.RoutingNodes#failShard, 将分片移动到未分配状态或完全删除分片(如果是重分配relocation目标分片),具体的逻辑如下:
这里我们关注一下,怎么做的主从切换, 具体进入到如下逻辑:
// fail actual shard
if (failedShard.initializing()) {
if (failedShard.relocatingNodeId() == null) {
if (failedShard.primary()) {
// promote active replica to primary if active replica exists (only the case for shadow replicas)
unassignPrimaryAndPromoteActiveReplicaIfExists(failedShard, unassignedInfo, routingChangesObserver);
} else {
// initializing shard that is not relocation target, just move to unassigned
moveToUnassigned(failedShard, unassignedInfo);
}
}
org.elasticsearch.cluster.routing.RoutingNodes#unassignPrimaryAndPromoteActiveReplicaIfExists
private void unassignPrimaryAndPromoteActiveReplicaIfExists(ShardRouting failedShard, UnassignedInfo unassignedInfo,
RoutingChangesObserver routingChangesObserver) {
assert failedShard.primary();
// 这里会从所有副本分片中找最高版本的副本
ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId());
if (activeReplica == null) {
moveToUnassigned(failedShard, unassignedInfo);
} else {
// 将主分片移动到未分配状态,并将其降级为副本, 这里还会将主分片加入到unassignedShards列表中, 根据索引的配置会做分片恢复
movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo);
// 将活动副本升级为主分片,并通知路由变更观察器进行相应的处理
promoteReplicaToPrimary(activeReplica, routingChangesObserver);
}
}
做好路由的变更之后,master会同步集群状态给各个节点,至此就完成了主副本分片切换。
总的来说,只要探测到data-node
离线肯定就会进行主从切换,所以这部分的耗时主要在同步集群状态上,在网络等资源良好的情况下,基本是秒级的。
以 java 客户端为例,当配置多个 IP , 客户端会自动执行负载均衡, 默认是轮询存活的节点。
意味着每个新的请求都会发送到下一个 IP 地址,当所有的 IP 地址都被使用后,再从头开始。
这样可以确保所有的 Elasticsearch 节点都有均等的机会处理请求,避免了某个节点过载的问题。
当一个节点失败了,会加入到blaklist:
private void onFailure(Node node) {
while(true) {
DeadHostState previousDeadHostState =
blacklist.putIfAbsent(node.getHost(), new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER));
if (previousDeadHostState == null) {
if (logger.isDebugEnabled()) {
logger.debug("added [" + node + "] to blacklist");
}
break;
}
if (blacklist.replace(node.getHost(), previousDeadHostState,
new DeadHostState(previousDeadHostState))) {
if (logger.isDebugEnabled()) {
logger.debug("updated [" + node + "] already in blacklist");
}
break;
}
}
failureListener.onFailure(node);
}
在发送请求的时候,selectNodes 会过滤出活跃的节点(没有被列入黑名单或已经到了重试时间的节点),然后使用一个 NodeSelector 对象对这些节点进行选择。
可以看出是轮询的方式。
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
/*
* Sort the nodes into living and dead lists.
*/
List<Node> livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - blacklist.size()));
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) {
DeadHostState deadness = blacklist.get(node.getHost());
// 没有被列入黑名单或已经到了重试时间的节点 , 默认是1min重试间隔, 只有1min之后才会再次访问这个节点, 无法修改,写死的
if (deadness == null || deadness.shallBeRetried()) {
livingNodes.add(node);
} else {
deadNodes.add(new DeadNode(node, deadness));
}
}
注:这里的挂掉都是指网卡挂了,并不是被kill或者关机,这里两者有本质区别:
这里搭建 6 个节点, 其中 3 个节点为 master 角色, 3 个节点为 data 角色,使用的是虚拟机部署
第一次检测失败:2023-07-14T20:40:52
第二次检测失败:2023-07-14T20:41:03
第三次检测失败:2023-07-14T20:41:14
2023-07-14T20:41:14,427 选举结束,再发布集群状态。
值得注意的是,客户端不断使用非挂掉机器的ip 访问es, 只有选举期间是无法写入的,其他时间都可以写入,但是无法更新集群状态(比如创建索引等),因为这个时候master已经挂了。
直接看最后一次的 timeout:2023-07-14T21:04:54 ,耗时就是大概30s。
值得注意的是,客户端不断使用非挂掉机器的ip 访问es, 因为只是关闭了备master节点,所以期间集群状态一直都是green的。对客户端新的请求是没有影响的。
直接看最后一次check失败:2023-07-14T21:17:58 , 相差大概也是30s,这个时候标记这个数据节点的分片为failed(会做主从分片切换), 并计划恢复那些丢失的分片,这个时候集群为Yellow状态
由于使用的是默认配置index.unassigned.node_left.delayed_timeout 为1m, 而且索引的数据量很小,所以集群很快恢复了分片,转入到GREEN状态
这个场景其实是故障一和故障三的结合, 从服务端流程可以看出来,先进行了master重选举,再然后是处理数据节点的离线。原因很简单,因为只有active master 节点才有follower 检测,所以有这个顺序。
2023-07-14T21:34:54 选举出新 master
2023-07-14T21:35:26 探测到 data 节点
这块主要看客户端怎么做的容错,以java客户端为例,
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.2</version>
</dependency>
在服务端熔断的情况下,服务端会直接返回异常,客户端会抛出异常, 这个时候需要判断返回码429, 并判断熔断类型是否为TRANSIENT, 如果是需要不断重试, 如果是PERMANENT 则可以放弃重试。
注:这里的挂掉都是指网卡挂了,并不是被 kill 或者关机,这里两者有本质区别:
故障场景 | 客户端影响 | 大概恢复时长 | 优化手段以及对应时长 |
---|---|---|---|
场景一:单个 active master 挂掉 | 1.访问故障节点的请求都失败探测active 故障之前: 2.可对已有索引进行读写,但是无法更新集群状态,比如创建索引等选举期间:3.其他读请求可以成功进行,但是可能会读到旧数据 | 1. 取决故障节点恢复时间, 这个对客户端没影响,只要客户端能重试请求到其他存活es节点就可以2.探测master挂掉需要30s, 期间可读写索引,但是无法更新集群状态3. 3个master节点 选举只需要几s,具体看网络环境和节点个数,未选举出新的master时,只能读取 | 可以降低节点的探测超时时间,改成每次探测超时时间为3s,而不是10s,这样就可以将探测时间降低到9s |
场景二:单个备用master挂掉 | 1.访问故障节点的请求都失败 | 1. 取决故障节点恢复时间, 这个对客户端没影响,只要客户端能重试请求到其他存活es节点就可以,不会发生重选举 | |
场景三:单个data node挂掉 | 1.访问故障节点的请求都失败 2. 访问这个数据节点的分片的请求都失败 | 1. 取决故障节点恢复时间, 这个对客户端没影响,只要客户端能重试请求到其他存活es节点就可以2.需要探测数据节点掉线30s,所以30s之内的这个数据节点负责的分片都无法写入和读取。 3.主副本分片切换之后,同步集群状态大概几s,具体看网络环境和节点个数 | 可以降低节点的探测超时时间,改成每次探测超时时间为3s,而不是10s,这样就可以将探测时间降低到9s |
场景四:active master节点和data node 节点同时挂掉 | 场景一和场景三的结合,会先经历场景一,然后发生场景三 | 1.探测master挂掉需要30s, 期间可读写索引,但是无法更新集群状态,选举需要秒级时间(3个master在集群压力小的情况下,通常1s之内)2.需要探测数据节点掉线30s,所以30s之内的这个数据节点负责的分片都无法写入 | 可以降低节点的探测超时时间,改成每次探测超时时间为3s,而不是10s,这样就可以将探测时间降低到18s |
场景五:服务端熔断 | 1. 访问熔断节点的请求都失败 | 1.取决服务端内存释放情况,如果只是临时熔断,理论上几s钟就能释放一些 | 客户端应该判断服务端异常是否为熔断,如果是临时熔断应该做重试处理,理论上重试多少次都没关系,只要临时熔断,总会自动恢复 |
He Chengbo,互联网安全独角兽公司资深工程师,死磕 Elasticsearch 星球资深活跃技术专家。
在此,感谢铭毅老师提供这个宝贵的平台发表文章,也感谢您给予的指导和鼓励!
本文分享自 铭毅天下Elasticsearch 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!