We are the Infrastructure Department, the team that supports the backend services of Tencent Cloud CES/CTSDB. We have professional Elasticsearch development and operations expertise and provide stable, high-performance services. Anyone who needs them is welcome to get on board, and we also welcome discussions on Elasticsearch and Lucene!
In online production environments, node failures are common in large ES clusters: network partitions, machine failures, cluster overload and so on can all take a node down. Once the external condition is resolved, the node rejoins the cluster, and ES's rebalancing strategy then needs to recover certain shards onto the newly joined node. So how does ES's shard recovery flow actually work? How should the pitfalls of shard recovery be handled? (One of our online users hit this: with recovery concurrency tuned high, an ES bug was triggered that led to a distributed deadlock.) And how are the completeness and consistency of the recovered shard guaranteed? This article digs into the ES source code to find out. Note: there are several shard recovery scenarios in ES; this article only dissects the most complex one, peer recovery.
Replica shard recovery in ES mainly involves a target node and a source node: the target node is the node being recovered, and the source node is the node that provides the data. The target node sends a shard recovery request to the source node, and the source node handles it in two phases. In phase 1, a snapshot is taken of the shard to be recovered; based on the metadata in the request, if the syncids match and the doc counts are equal this phase is skipped, otherwise the segment files of the two shards are diffed and the differing segment files are sent to the target node. In phase 2, to guarantee the completeness of the data on the target node, the source node sends its local translog to the target node, which replays the received operations. The overall flow is shown in the figure below.
That is the overall recovery flow; the implementation details are analyzed below against the source code.
In this section we walk through the detailed replica shard recovery flow in the source code. ES drives each module's work through changes to the cluster metadata. The entry point of replica shard recovery is IndicesClusterStateService.createOrUpdateShards, which first checks whether the local node appears in routingNodes; if it does, the local node has shards to create or update, otherwise the method simply returns. The logic is as follows:
private void createOrUpdateShards(final ClusterState state) {
RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
if (localRoutingNode == null) {
return;
}
DiscoveryNodes nodes = state.nodes();
RoutingTable routingTable = state.routingTable();
for (final ShardRouting shardRouting : localRoutingNode) {
ShardId shardId = shardRouting.shardId();
if (failedShardsCache.containsKey(shardId) == false) {
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
Shard shard = indexService.getShardOrNull(shardId.id());
if (shard == null) { // the shard does not exist yet, create it
createShard(nodes, routingTable, shardRouting, state);
} else { // the shard exists, update it
updateShard(nodes, shardRouting, shard, routingTable, state);
}
}
}
}
Replica shard recovery goes through the createShard branch. That method first checks the recovery type on the shardRouting; if it is PEER, the shard has to be fetched from a remote node, so the source node must be located first, and then IndicesService.createShard is called:
private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {
DiscoveryNode sourceNode = null;
if (shardRouting.recoverySource().getType() == Type.PEER) {
sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting); // for a PEER recovery, find the source node that holds the shard
if (sourceNode == null) {
return;
}
}
RecoveryState recoveryState = new RecoveryState(shardRouting, nodes.getLocalNode(), sourceNode);
indicesService.createShard(shardRouting, recoveryState, recoveryTargetService, new RecoveryListener(shardRouting), repositoriesService, failedShardHandler);
... ...
}
private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();
if (primary.active()) {
sourceNode = nodes.get(primary.currentNodeId()); // the node holding the primary shard
}
} else if (shardRouting.relocatingNodeId() != null) {
sourceNode = nodes.get(shardRouting.relocatingNodeId()); // the node the shard is relocating from
} else {
... ...
}
return sourceNode;
}
The source node is determined in one of two ways: if the current shard is not a primary shard, the source node is the node holding the primary shard; otherwise, if the shard is being relocated (moved from another node to this one), the source node is the node the data is moving from. Once the source node is known, IndicesService.createShard is called, which in turn calls IndexShard.startRecovery to kick off the recovery. For a recovery of type PEER, the real work is done by PeerRecoveryTargetService.doRecovery. That method first obtains the shard's metadataSnapshot, which contains the shard's segment information such as the syncid, checksums and doc count, wraps it into a StartRecoveryRequest, and sends it to the source node via RPC:
... ...
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
... ...
// build the recovery request (StartRecoveryRequest)
request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(), clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
... ...
// send the request to the source node to start recovery
cancellableThreads.execute(() -> responseHolder.set(
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet()));
Note that although the request is sent asynchronously, PlainTransportFuture.txGet() is then called to wait for the peer's reply, blocking until it arrives. At this point the target node has sent its request to the source node; the source node's handling is analyzed in detail next.
When the source node receives the request, it calls the recovery entry function recover:
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
}
The recover method resolves the shard from the request, constructs a RecoverySourceHandler, and then calls handler.recoverToTarget to enter the recovery body:
public RecoveryResponse recoverToTarget() throws IOException { // recovery runs in two phases
try (Translog.View translogView = shard.acquireTranslogView()) {
final IndexCommit phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView); // phase 1: compare syncids and segments, work out the differences, and push the differing data to the requester
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
} catch (IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
}
// engine was just started at the end of phase 1
if (shard.state() == IndexShardState.RELOCATED) {
throw new IndexShardRelocatedException(request.shardId());
}
try {
phase2(translogView.snapshot()); // phase 2: send the translog
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery();
}
return response;
}
As the code above shows, recovery has two phases: phase 1 recovers segment files and phase 2 sends the translog. One key detail is that before recovery starts, a translogView and a segment snapshot are acquired. The translogView guarantees that translog entries written from now until recovery finishes are not deleted; the segment snapshot guarantees that the segment files existing at this point are not deleted. Now let's look at the concrete logic of the two phases. phase1:
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
final Store store = shard.store(); // get the shard's store
recoverySourceMetadata = store.getMetadata(snapshot); // get the metadata of the snapshot
String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
final boolean recoverWithSyncId = recoverySourceSyncId != null && recoverySourceSyncId.equals(recoveryTargetSyncId);
if (recoverWithSyncId) { // if the syncids match, also compare the doc counts; if those match too, no recovery is needed
final long numDocsTarget = request.metadataSnapshot().getNumDocs();
final long numDocsSource = recoverySourceMetadata.getNumDocs();
if (numDocsTarget != numDocsSource) {
throw new IllegalStateException("... ...");
}
} else {
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot()); // find the segments that differ between source and target
List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
... ...
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); // send the files that need recovery to the target node
... ...
}
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
The concrete logic of phase 1, as shown above, is: obtain the metadataSnapshot of the shard to be recovered to get recoverySourceSyncId, take recoveryTargetSyncId from the request, and compare the two syncids. If they are equal, the doc counts of source and target are compared as well; if those also match, the source and target shards have identical segments up to the current commit point, so no segment files need to be recovered. If the syncids differ, the segment files differ, and all differing files must be found and recovered. Diffing recoverySourceMetadata against recoveryTargetSnapshot yields all the differing segment files. That logic looks like this:
public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {
final List<StoreFileMetaData> identical = new ArrayList<>(); // identical files
final List<StoreFileMetaData> different = new ArrayList<>(); // differing files
final List<StoreFileMetaData> missing = new ArrayList<>(); // files missing on the target
final Map<String, List<StoreFileMetaData>> perSegment = new HashMap<>();
final List<StoreFileMetaData> perCommitStoreFiles = new ArrayList<>();
... ...
for (List<StoreFileMetaData> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {
identicalFiles.clear();
boolean consistent = true;
for (StoreFileMetaData meta : segmentFiles) {
StoreFileMetaData storeFileMetaData = recoveryTargetSnapshot.get(meta.name());
if (storeFileMetaData == null) {
consistent = false;
missing.add(meta); // the file does not exist on the target node, add it to missing
} else if (storeFileMetaData.isSame(meta) == false) {
consistent = false;
different.add(meta); // the file exists but differs, add it to different
} else {
identicalFiles.add(meta); // the file exists and is identical
}
}
if (consistent) {
identical.addAll(identicalFiles);
} else {
// make sure all files are added - this can happen if only the deletes are different
different.addAll(identicalFiles);
}
}
RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical), Collections.unmodifiableList(different), Collections.unmodifiableList(missing));
return recoveryDiff;
}
All segment files are classified into three groups here: identical, different and missing (absent on the target). The files in different and missing are the ones phase 1 sends to the target node. After the segment files have been sent, the source node sends one message telling the target node to clean up temporary files, and another telling it to open its engine and get ready to receive the translog. Note that both of these network calls also go through PlainTransportFuture.txGet() and block waiting for the peer's reply. That completes the phase 1 logic.
The logic of phase 2 is relatively simple: all translog operations from the point the translog view was created up to the current time are sent to the target node.
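To make this concrete, here is a minimal self-contained sketch of the batching idea behind this phase: operations pulled from the translog snapshot are grouped into size-limited chunks and pushed to the target over RPC. This is illustrative only, not ES source code; the names TranslogOp, TargetClient and TranslogSender are hypothetical:
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: group replayable operations into size-limited batches, as phase 2 does.
class TranslogOp {
    final byte[] payload;
    TranslogOp(byte[] payload) { this.payload = payload; }
    int estimatedSizeInBytes() { return payload.length; }
}

interface TargetClient {
    void indexTranslogOperations(List<TranslogOp> batch); // hypothetical RPC to the target node
}

class TranslogSender {
    private final int chunkSizeInBytes;
    private final TargetClient target;

    TranslogSender(int chunkSizeInBytes, TargetClient target) {
        this.chunkSizeInBytes = chunkSizeInBytes;
        this.target = target;
    }

    void send(Iterable<TranslogOp> snapshot) {
        List<TranslogOp> batch = new ArrayList<>();
        long batchSize = 0;
        for (TranslogOp op : snapshot) {
            batch.add(op);
            batchSize += op.estimatedSizeInBytes();
            if (batchSize >= chunkSizeInBytes) { // a chunk is full, push it to the target node
                target.indexTranslogOperations(new ArrayList<>(batch));
                batch.clear();
                batchSize = 0;
            }
        }
        if (!batch.isEmpty()) { // push whatever is left
            target.indexTranslogOperations(batch);
        }
    }
}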
Corresponding to phase 1 on the source node described in the previous subsection, the source node sends all differing segments to the target node, and the target node persists the received segment files to disk. Segment files are written by RecoveryTarget.writeFileChunk:
public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {
final Store store = store();
final String name = fileMetaData.name();
... ...
if (position == 0) {
indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
} else {
indexOutput = getOpenIndexOutput(name); // the file is written under a temporary name formed by adding a recovery prefix
}
... ...
while((scratch = iterator.next()) != null) {
indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // write to the temporary file
}
... ...
store.directory().sync(Collections.singleton(temporaryFileName)); // fsync the file to disk
}
With that, the target node has completed the first step of catching up on data. After receiving the segments, the target node opens the shard's engine to prepare for receiving the translog. Note that once the engine is open, the recovering shard can already serve writes and deletes (both requests replicated from the primary shard and the operations in the translog). The engine is opened as follows:
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
... ...
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final EngineConfig.OpenMode openMode;
if (indexExists == false) {
openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
} else if (skipTranslogRecovery) {
openMode = EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG;
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false); // do not GC deletes while recovering
Engine newEngine = createNewEngine(config); // create the engine
... ...
}
Once the engine is open, the operations recorded in the translog can be replayed. Replay works much like a normal write or delete: the operation type and payload are reconstructed from the translog entry, the corresponding data object is built, and the engine opened in the previous step executes the operation. The logic is as follows:
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
switch (operation.opType()) { // reconstruct the operation type and payload, and have the engine execute the corresponding action
case INDEX:
Translog.Index index = (Translog.Index) operation;
// ... build the engineIndex object from index ...
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
index(engine, engineIndex); // execute the index operation
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
// ... build the engineDelete object from delete ...
delete(engine, engineDelete); // execute the delete operation
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}
With these steps the translog replay is complete. Some finalization work follows: a refresh to make the newly replayed data visible, and re-enabling GC of deletes:
public void finalizeRecovery() {
recoveryState().setStage(RecoveryState.Stage.FINALIZE);
Engine engine = getEngine();
engine.refresh("recovery_finalization");
engine.config().setEnableGcDeletes(true);
}
At this point the two phases of replica shard recovery are done. Since the shard is still in the INITIALIZING state, the master node must be notified to start the recovered shard:
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {
@Override
public void onRecoveryDone(RecoveryState state) {
if (state.getRecoverySource().getType() == Type.SNAPSHOT) {
SnapshotRecoverySource snapshotRecoverySource = (SnapshotRecoverySource) state.getRecoverySource();
restoreService.indexShardRestoreCompleted(snapshotRecoverySource.snapshot(), shardRouting.shardId());
}
shardStateAction.shardStarted(shardRouting, "after " + state.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}
}
With that, the entire shard recovery flow is complete.
Having walked through the source code, this section answers the questions raised at the beginning of the article to deepen the understanding of shard recovery.
Once phase 1 completes, the replica shard can already handle normal writes, and these writes run in parallel with the translog replay of phase 2. If the translog replay lags behind the normal writes, old data could be written after newer data, causing an inconsistency. To guarantee consistency, when executing a write ES compares the version of the incoming operation with the version of the document already in Lucene; if the incoming version is not newer, the operation is stale and the document is not written to Lucene. The relevant code is as follows:
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnVersions(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.skipAsStale(false, index.version());
}
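As a minimal self-contained illustration of this "not-newer operations are skipped" rule (not ES code; the class and method names here are hypothetical):
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: skip an operation whose version is not newer than what is already indexed.
class VersionGuard {
    private final Map<String, Long> indexedVersions = new HashMap<>(); // docId -> version currently in the index

    /** Returns true if the operation should be applied, false if it is stale (older or equal) and must be skipped. */
    boolean shouldApply(String docId, long incomingVersion) {
        Long current = indexedVersions.get(docId);
        if (current != null && incomingVersion <= current) {
            return false; // e.g. an old translog entry replayed after a newer replicated write
        }
        indexedVersions.put(docId, incomingVersion);
        return true;
    }
}
This mirrors the OP_STALE_OR_EQUAL branch above: a replayed translog operation that is not newer than the document already in Lucene is simply skipped.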
This article analyzed the detailed flow of replica shard recovery against the ES source code and, based on that understanding, answered the questions raised at the beginning. More ES-related articles will follow; your attention and feedback are welcome.
Statement of originality: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
For infringement concerns, please contact cloudcommunity@tencent.com for removal.