前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch 底层系列之分片恢复解析

Elasticsearch 底层系列之分片恢复解析

原创
作者头像
技术姐
修改2018-12-10 14:31:33
9890
修改2018-12-10 14:31:33
举报
文章被收录于专栏:时序数据库专栏

    我们是基础架构部,腾讯云 CES/CTSDB 产品后台服务的支持团队,我们拥有专业的ES开发运维能力,为大家提供稳定、高性能的服务,欢迎有需求的童鞋接入,同时也欢迎各位交流 Elasticsearch、Lucene 相关的技术!

1. 前言

    在线上生产环境中,对于大规模的ES集群出现节点故障的场景比较多,例如,网络分区、机器故障、集群压力等等,都会导致节点故障。当外在环境恢复后,节点需要重新加入集群,那么当节点重新加入集群时,由于ES的自平衡策略,需要将某些分片恢复到新加入的节点上,那么ES的分片恢复流程是如何进行的呢?遇到分片恢复的坑该如何解决呢?(这里线上用户有碰到过,当恢复的并发调得较大时,会触发es的bug导致分布式死锁)?分片恢复的完整性、一致性如何保证呢?,本文将通过ES源码一窥究竟。注:ES分片恢复的场景有多种,本文只剖析最复杂的场景--peer recovery。

2. 分片恢复总体流程

    ES副本分片恢复主要涉及恢复的目标节点和源节点,目标节点即故障恢复的节点,源节点为提供恢复的节点。目标节点向源节点发送分片恢复请求,源节点接收到请求后主要分两阶段来处理。第一阶段,对需要恢复的shard创建snapshot,然后根据请求中的metadata对比如果 syncid 相同且 doc 数量相同则跳过,否则对比shard的segment文件差异,将有差异的segment文件发送给target node。第二阶段,为了保证target node数据的完整性,需要将本地的translog发送给target node,且对接收到的translog进行回放。整体流程如下图所示。

分片恢复流程
分片恢复流程

    以上为恢复的总体流程,具体实现细节,下面将结合源码进行解析。

3. 副本分片流程

3.1. 目标节点请求恢复

    本节,我们通过源码来剖析副本分片的详细恢复流程。ES根据metadata的变化来驱动各个模块工作,副本分片恢复的起始入口为IndicesClusterStateService.createOrUpdateShards,这里首先会判断本地节点是否在routingNodes中,如果在,说明本地节点有分片创建或更新的需求,否则跳过。逻辑如下:

代码语言:txt
复制
private void createOrUpdateShards(final ClusterState state) {
    RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
    if (localRoutingNode == null) {
        return;
    }
    DiscoveryNodes nodes = state.nodes();
    RoutingTable routingTable = state.routingTable();
    for (final ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        if (failedShardsCache.containsKey(shardId) == false) {
            AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
            Shard shard = indexService.getShardOrNull(shardId.id());
            if (shard == null) { // shard不存在则需创建
                createShard(nodes, routingTable, shardRouting, state);
            } else { // 存在则更新
                updateShard(nodes, shardRouting, shard, routingTable, state);
            }
        }
    }
}

    副本分片恢复走的是createShard分支,在该方法中,首先获取shardRouting的类型,如果恢复类型为PEER,说明该分片需要从远端获取,则需要找到源节点,然后调用IndicesService.createShard:

代码语言:txt
复制
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
    DiscoveryNode sourceNode = null;
    if (shardRouting.recoverySource().getType() == Type.PEER)  {
        sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); // 如果恢复方式是peer,则会找到shard所在的源节点进行恢复
        if (sourceNode == null) {
            return;
        }
    }
        RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
        indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler);
        ... ...
}

private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
    DiscoveryNode sourceNode = null;
    if (!shardRouting.primary()) {
        ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
        if (primary.active()) {
            sourceNode = nodes.get(primary.currentNodeId()); // 找到primary shard所在节点
        }
    } else if (shardRouting.relocatingNodeId() != null) {
        sourceNode = nodes.get(shardRouting.relocatingNodeId()); // 找到搬迁的源节点
    } else {
         ... ...
    }
    return sourceNode;
}

    源节点的确定分两种情况,如果当前shard本身不是primary shard,则源节点为primary shard所在节点,否则,如果当前shard正在搬迁中(从其他节点搬迁到本节点),则源节点为数据搬迁的源头节点。得到源节点后调用IndicesService.createShard,在该方法中调用方法IndexShard.startRecovery开始恢复。对于恢复类型为PEER的任务,恢复动作的真正执行者为PeerRecoveryTargetService.doRecovery。在该方法中,首先获取shard的metadataSnapshot,该结构中包含shard的段信息,如syncid、checksum、doc数等,然后封装为 StartRecoveryRequest,通过RPC发送到源节点:

代码语言:txt
复制
... ...
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
... ...
// 创建recovery quest 
request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
... ...
// 向源节点发送请求,请求恢复
cancellableThreads.execute(() -> responseHolder.set(
        transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
                new FutureTransportResponseHandler<RecoveryResponse>() {
                    @Override
                    public RecoveryResponse newInstance() {
                        return new RecoveryResponse();
                    }
                }).txGet()));

    注意,请求的发送是异步的,但是这里会调用 PlainTransportFuture.txGet() 方法,等待对端的回复,否则将一直 阻塞 。至此,目标节点已将请求发送给源节点,源节点的执行逻辑随后详细分析。

3.2 源节点处理恢复请求

    源节点接收到请求后会调用恢复的入口函数recover:

代码语言:txt
复制
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
        RecoveryResponse response = recover(request);
        channel.sendResponse(response);
    }
}

    recover方法根据request得到shard并构造RecoverySourceHandler对象,然后调用handler.recoverToTarget进入恢复的执行体:

代码语言:txt
复制
public RecoveryResponse recoverToTarget() throws IOException { // 恢复分为两阶段
    try (Translog.View translogView = shard.acquireTranslogView()) { 
        final IndexCommit phase1Snapshot;
        try {
            phase1Snapshot = shard.acquireIndexCommit(false);
        } catch (Exception e) {
            IOUtils.closeWhileHandlingException(translogView);
            throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
        }
        try {
            phase1(phase1Snapshot, translogView);  // 第一阶段,比较syncid和segment,然后得出有差异的部分,主动将数据推送给请求方
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
        } finally {
            try {
                shard.releaseIndexCommit(phase1Snapshot);
            } catch (IOException ex) {
                logger.warn("releasing snapshot caused exception", ex);
            }
        }
        // engine was just started at the end of phase 1
        if (shard.state() == IndexShardState.RELOCATED) {
            throw new IndexShardRelocatedException(request.shardId());
        }
        try {
            phase2(translogView.snapshot()); // 第二阶段,发送translog
        } catch (Exception e) {
            throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
        }
        finalizeRecovery();
    }
    return response;
}

    从上面的代码可以看出,恢复主要分两个阶段,第一阶段恢复segment文件,第二阶段发送translog。这里有个关键的地方,在恢复前,首先需要获取translogView及segment snapshot,translogView的作用是保证当前时间点到恢复结束时间段的translog不被删除,segment snapshot的作用是保证当前时间点之前的segment文件不被删除。接下来看看两阶段恢复的具体执行逻辑。phase1:

代码语言:txt
复制
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
    final Store store = shard.store(); //拿到shard的存储信息
    recoverySourceMetadata = store.getMetadata(snapshot); // 拿到snapshot的metadata
    String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
            String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
            final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
            if (recoverWithSyncId) { // 如果syncid相等,再继续比较下文档数,如果都相同则不用恢复
    final long numDocsTarget = request.metadataSnapshot().getNumDocs();
    final long numDocsSource = recoverySourceMetadata.getNumDocs();
    if (numDocsTarget != numDocsSource) {
        throw new IllegalStateException("... ...");
    } 
} else {
	final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); // 找出target和source有差别的segment
	List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
	phase1Files.addAll(diff.different);
	phase1Files.addAll(diff.missing);
	... ...
	final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
        md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
	sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // 将需要恢复的文件发送到target node
	... ...
    }
    prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());

    从上面代码可以看出,phase1的具体逻辑是,首先拿到待恢复shard的metadataSnapshot从而得到recoverySourceSyncId,根据request拿到recoveryTargetSyncId,比较两边的syncid,如果相同再比较源和目标的文档数,如果也相同,说明在当前提交点之前源和目标的shard对应的segments都相同,因此不用恢复segment文件。如果两边的syncid不同,说明segment文件有差异,则需要找出所有有差异的文件进行恢复。通过比较recoverySourceMetadata和recoveryTargetSnapshot的差异性,可以找出所有有差别的segment文件。这块逻辑如下:

代码语言:txt
复制
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
    final List<StoreFileMetaData> identical = new ArrayList<>();  // 相同的file 
    final List<StoreFileMetaData> different = new ArrayList<>();  // 不同的file
    final List<StoreFileMetaData> missing = new ArrayList<>();   // 缺失的file
    final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
    final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
    ... ...
    for (List<StoreFileMetaData> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
        identicalFiles.clear();
        boolean consistent = true;
        for (StoreFileMetaData meta : segmentFiles) {
            StoreFileMetaData storeFileMetaData = recoveryTargetSnapshot.get(meta.name());
            if (storeFileMetaData == null) {
                consistent = false;
                missing.add(meta); // 该segment在target node中不存在,则加入到missing
            } else if (storeFileMetaData.isSame(meta) == false) {
                consistent = false;
                different.add(meta); // 存在但不相同,则加入到different
            } else {
                identicalFiles.add(meta);  // 存在且相同
            }
        }
        if (consistent) {
            identical.addAll(identicalFiles);
        } else {
            // make sure all files are added - this can happen if only the deletes are different
            different.addAll(identicalFiles);
        }
    }
    RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
    return recoveryDiff;
}

    这里将所有的segment file分为三类:identical(相同)、different(不同)、missing(target缺失)。然后将different和missing的segment files作为第一阶段需要恢复的文件发送到target node。发送完segment files后,源节点还会向目标节点发送消息以通知目标节点清理临时文件,然后也会发送消息通知目标节点打开引擎准备接收translog,这里需要注意的是,这两次网络通信都会调用 PlainTransportFuture.txGet() 方法阻塞等待 对端回复。至此,第一阶段的恢复逻辑完毕。

    第二阶段的逻辑比较简单,只需将translog view到当前时间之间的所有translog发送给源节点即可。

3.3 目标节点开始恢复

  • 接收segment

    对应上一小节源节点恢复的第一阶段,源节点将所有有差异的segment发送给目标节点,目标节点接收到后会将segment文件落盘。segment files的写入函数为RecoveryTarget.writeFileChunk:

代码语言:txt
复制
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {
    final Store store = store();
    final String name = fileMetaData.name();
    ... ...
    if (position == 0) {
        indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
    } else {
        indexOutput = getOpenIndexOutput(name); // 加一层前缀,组成临时文件
    }
    ... ...
    while((scratch = iterator.next()) != null) { 
        indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // 写临时文件
    }
    ... ...
    store.directory().sync(Collections.singleton(temporaryFileName));  // 这里会调用fsync落盘
}
  • 打开引擎

    经过上面的过程,目标节点完成了追数据的第一步。接收完segment后,目标节点打开shard对应的引擎准备接收translog,注意,这里打开引擎后,正在恢复的shard便可进行写入、删除(操作包括primary shard同步的请求和translog中的操作命令)。打开引擎的逻辑如下:

代码语言:txt
复制
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
    ... ...
    recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
    final EngineConfig.OpenMode openMode;
    if (indexExists == false) {
        openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
    } else if (skipTranslogRecovery) {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
    } else {
        openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
    }
    final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
    // we disable deletes since we allow for operations to be executed against the shard while recovering
    // but we need to make sure we don't loose deletes until we are done recovering
    config.setEnableGcDeletes(false); // 恢复过程中不删除translog
    Engine newEngine = createNewEngine(config); // 创建engine
    ... ...
}
  • 接收并重放translog

    打开引擎后,便可以根据translog中的命令进行相应的回放动作,回放的逻辑和正常的写入、删除类似,这里需要根据translog还原出操作类型和操作数据,并根据操作数据构建相应的数据对象,然后再调用上一步打开的engine执行相应的操作,这块逻辑如下:

代码语言:txt
复制
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
    switch (operation.opType()) { // 还原出操作类型及操作数据并调用engine执行相应的动作
        case INDEX:
            Translog.Index index = (Translog.Index) operation;           
            // ...  根据index构建engineIndex对象 ...
            maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
            index(engine, engineIndex); // 执行写入操作
            break;
        case DELETE:
            Translog.Delete delete = (Translog.Delete) operation;
            // ...  根据delete构建engineDelete对象 ...
            delete(engine, engineDelete); // 执行删除操作
            break;
        default:
            throw new IllegalStateException("No operation defined for [" + operation + "]");
    }
}

    通过上面的步骤,translog的重放完毕,此后需要做一些收尾的工作,包括,refresh让回放后的最新数据可见,打开translog gc:

代码语言:txt
复制
public void finalizeRecovery() {
    recoveryState().setStage(RecoveryState.Stage.FINALIZE);
    Engine engine = getEngine();
    engine.refresh("recovery_finalization"); 
    engine.config().setEnableGcDeletes(true);
}

    到这里,replica shard恢复的两个阶段便完成了,由于此时shard还处于INITIALIZING状态,还需通知master节点启动已恢复的shard:

代码语言:txt
复制
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
    @Override
    public void onRecoveryDone(RecoveryState state) {
        if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
            SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
            restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
        }
        shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
    }
}

    至此,shard recovery的所有流程都已完成。

4. 答疑解惑

    通过上述源码剖析后,本节将对文章开头抛出的几个问题进行答疑解惑,加深大家对分片恢复的理解。

  • 分布式死锁     通过上述源码的分析,大家注意3.1和3.2末尾处加粗的地方,可以看出,源节点和目标节点都有调用PlainTransportFuture.txGet()方法阻塞线程同步返回结果,这是导致死锁的关键点。具体问题描述及处理方法见https://cloud.tencent.com/developer/article/1370318,大家可以结合本文源码分析搞清楚死锁的原因。
  • 完整性     首先,phase1阶段,保证了存量的历史数据可以恢复到从分片。phase1阶段完成后,从分片引擎打开,可以正常处理index、delete请求,而translog覆盖完了整个phase1阶段,因此在phase1阶段中的index/delete操作都将被记录下来,在phase2阶段进行translog回放时,副本分片正常的index和delete操作和translog是并行执行的,这就保证了恢复开始之前的数据、恢复中的数据都会完整的写入到副本分片,保证了数据的完整性。如下图所示:
从分片恢复时序图
从分片恢复时序图
  • 一致性

    由于phase1阶段完成后,从分片便可正常处理写入操作,而此时从分片的写入和phase2阶段的translog回放时并行执行的,如果translog的回放慢于正常的写入操作,那么可能会导致老的数据后写入,造成数据不一致。ES为了保证数据的一致性在进行写入操作时,会比较当前写入的版本和lucene文档版本号,如果当前版本更小,说明是旧数据则不会将文档写入lucene。相关代码如下:

代码语言:txt
复制
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.skipAsStale(false, index.version());
} 

5. 小结

    本文结合ES源码详细分析了副本分片恢复的具体流程,并通过对源码的理解对文章开头提出的问题进行答疑解惑。后面,我们也将推出更多ES相关的文章,欢迎大家多多关注和交流。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 前言
  • 2. 分片恢复总体流程
  • 3. 副本分片流程
  • 3.1. 目标节点请求恢复
  • 3.2 源节点处理恢复请求
  • 3.3 目标节点开始恢复
  • 4. 答疑解惑
  • 5. 小结
相关产品与服务
时序数据库 CTSDB
腾讯云时序数据库(TencentDB for CTSDB)是一种高效、安全、易用的云上时序数据存储服务。特别适用于物联网、大数据和互联网监控等拥有海量时序数据的场景。您可以根据实际业务需求快速创建CTSDB 实例,并随着业务变化实时线性扩展实例。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档