前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ContainerManager详解

ContainerManager详解

原创
作者头像
zeekling
发布2024-08-07 00:38:41
1920
发布2024-08-07 00:38:41
举报
文章被收录于专栏:浪浪山下那个村

简介

ContainerManager主要负责NM中管理所有Container生命周期,其主要包含启动Container、恢复Container、停止Container等功能。

主要功能由ContainerManagerImpl类实现,具体代码可以参考当前类。

初始化

初始化主要分为两部分:

ContainerManagerImpl实例的构造函数和serviceInit函数。

构造函数

当前函数为构造函数,主要初始化必须要的一些变量等。

  • dispatcher : 事件的中央调度器,主要用于管理单个Container的生命周期。在当前函数里面会通过dispatcher.register()注册支持的事件。
  • containersLauncher: 主要用于启动Container,可以通过配置项yarn.nodemanager.containers-launcher.class指定。默认为containersLauncher.class

serviceInit函数

主要是服务启动时的初始化函数,ContainerManager在NodeManager内部属于一个服务。所以初始化的时候会调用这个函数初始化一些服务相关的东西。

在这个函数里面总结下来主要做了几件事:

  • 继续初始化一些事件,主要包含LogHandlerEventType、SharedCacheUploadEventType。
  • 初始化AMRMProxyService。
  • 从本地的LevelDB恢复Container信息。

恢复当前NodeManager的所有作业信息

第一步:恢复Application 信息

首先是从LevelDB里面加载Application信息。循环加载。

代码语言:java
复制
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
 try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
          appsState.getIterator()) {
   while (rasIterator.hasNext()) {
     ContainerManagerApplicationProto proto = rasIterator.next();
     LOG.debug("Recovering application with state: {}", proto);
     recoverApplication(proto);
   }
 }

加载Application的时候会将Application的上下文信息从LevelDB里面读出来,通过上下文信息等初始化新的ApplicationImpl,并且触发ApplicationInitEvent事件。

会根据当前作业上下文中实际的状态等信息跳转到实际的状态。

代码语言:java
复制
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
    appId, creds, context, p.getAppLogAggregationInitedTime());
context.getApplications().put(appId, app);
metrics.runningApplication();
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
第二步:恢复所有的Container信息

第二步是从LevelDB里面加载Container信息。循环加载。

代码语言:java
复制
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
          stateStore.getContainerStateIterator()) {
   while (rcsIterator.hasNext()) {
     RecoveredContainerState rcs = rcsIterator.next();
     LOG.debug("Recovering container with state: {}", rcs);
     recoverContainer(rcs);
   }
 }

recoverContainer函数用于恢复单个Container信息。对于已经存在的Application对应的Container会通过LevelDB里面加载到的信息初始化Container对象,

将其加到所有Container的列表里面并且触发ApplicationContainerInitEvent,后续会根据实际状态信息跳转到指定状态继续处理。

代码语言:java
复制
Container container = new ContainerImpl(getConfig(), dispatcher,
    launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container);
containerScheduler.recoverActiveContainer(container, rcs);
app.handle(new ApplicationContainerInitEvent(container));

如果发现作业的状态为KILL状态,则会为当前Container重新触发KILL事件,保证Container已经停止。

对于Application找不见的Container,认为作业已经结束了,直接标记为已经完成。

在恢复完成之后会触发事件: ContainerSchedulerEventType.RECOVERY_COMPLETED

此状态会重新拉起所有的Container。

启动Containers

获取NMToken

在Container启动之前需要获取NMToken,可以通过下面命令获取,一般情况下获取第一个NMTokenIdentifier类型的Token。

代码语言:java
复制
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();

开始启动Container

启动之前需要做的就是初始化ContainerImpl信息,方便后续启动Container。

代码语言:java
复制
Container container =
    new ContainerImpl(getConfig(), this.dispatcher,
        launchContext, credentials, metrics, containerTokenIdentifier,
        context, containerStartTime);

如果是第一次启动(满足条件:!context.getApplications().containsKey(applicationID)),也就是AM,会通过下面命令触发作业的启动:

代码语言:java
复制
context.getNMStateStore().storeApplication(applicationID,
    buildAppProto(applicationID, user, credentials, appAcls,
        logAggregationContext, flowContext));
dispatcher.getEventHandler().handle(new ApplicationInitEvent(
    applicationID, appAcls, logAggregationContext));

满足下面条件则是恢复Application:

containerTokenIdentifier.getContainerType() == ContainerType.APPLICATION_MASTER && context.getApplications().containsKey(applicationID))

启动Container主要是触发Container的Init事件:

代码语言:java
复制
this.context.getNMStateStore().storeContainer(containerId,
    containerTokenIdentifier.getVersion(), containerStartTime, request);
dispatcher.getEventHandler().handle(
  new ApplicationContainerInitEvent(container));

停止Container

获取NMToken

在Container停止之前需要获取NMToken,可以通过下面命令获取,一般情况下获取第一个NMTokenIdentifier类型的Token。和启动时候的类似。

代码语言:java
复制
Set<TokenIdentifier> tokenIdentifiers = remoteUgi.getTokenIdentifiers();

停止作业

停止作业的时候会优先通过context.getContainers().get(containerID)获取Container信息。

如下场景会抛出异常:

  • 对于查询不到的Container,如果不是最新停止的,则会抛出异常。
  • 对于正在恢复的Container,不会接受停止,会抛出异常。

停止作业的核心逻辑如下,核心思想是触发KILL事件。具体可以参见Container事件处理

代码语言:java
复制
context.getNMStateStore().storeContainerKilled(containerID);
container.sendKillEvent(ContainerExitStatus.KILLED_BY_APPMASTER,
    "Container killed by the ApplicationMaster.");

Container事件处理

Container启动的开始是从事件InitContainerTransition。由当前事件通过状态机转到其他状态。

pic
pic

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 初始化
    • 构造函数
      • serviceInit函数
        • 恢复当前NodeManager的所有作业信息
    • 启动Containers
      • 获取NMToken
        • 开始启动Container
        • 停止Container
          • 获取NMToken
            • 停止作业
            • Container事件处理
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档