前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >YARN Fair Scheduler批量分配功能调优总结

YARN Fair Scheduler批量分配功能调优总结

作者头像
九州暮云
发布2020-02-18 13:31:36
2.6K1
发布2020-02-18 13:31:36
举报
文章被收录于专栏:九州牧云

背景

YARN调度效率不高,队列资源充足,集群中正在运行的任务数量少,资源利用率低的情况下有一部分任务排队,等待分配资源时间长。

注:我们的集群使用的是Apache Hadoop 2.6.3,以下内容以该版本为准。

解决方案

YARN调度时机和批量分配

出现这种情况,我们可以启用YARN批量分配功能提高单个节点资源的利用率,提升YARN调度效率。在了解批量分配之前,我们先简单了解一下调度被触发的时机。

一次调度是如何被触发的呢?这就涉及到两种调度时机,即心跳调度和持续调度。两种触发机制不同的地方有两个:

  • 调度时机:心跳调度仅仅发生在收到了某个NodeManager的心跳信息的情况下,持续调度则不依赖与NodeManager的心跳通信,是连续发生的,当心跳到来,会将调度结果直接返回给NodeManager。我们通过yarn.scheduler.fair.continuous-scheduling-enabled来配置是否打开连续调度功能,默认情况下该功能关闭
  • 调度范围:心跳调度机制下,当收到某个节点的心跳,就对这个节点且仅仅对这个节点进行一次调度,即谁的心跳到来就触发对谁的调度,而持续调度的每一轮,是会遍历当前集群的所有节点,每个节点依次进行一次调度,保证一轮下来每一个节点都被公平地调度一次

我们的集群没有启用连续调度,所以这里我们只关注心跳调度。YARNNodeManager会通过心跳的方式定期向ResourceManager汇报自身状态,心跳时间间隔通过yarn.nodemanager.heartbeat.interval-ms配置,默认值 1000ms。当NodeManagerResourceManager汇报了自身资源情况(比如,当前可用资源,正在使用的资源,已经释放的资源),这个RPC会触发ResourceManager调用nodeUpdate()方法,这个方法为这个节点进行一次资源调度。当节点资源没有被任务所预留且不开启批量分配的情况下,节点经过一次资源调度过程就结束资源分配了,这在节点资源充足,任务量多的情况下,会使集群产生资源碎片增多、节点资源利用不充分、调度效率不高等问题。因此,我们需要启用批量分配功能。

YARN为我们提供了以下几个批量分配参数:

  • yarn.scheduler.fair.assignmultiple:是否启用批量分配功能。当一个节点出现大量资源时,可以一次分配完成,也可以多次分配完成。默认情况下,该参数值为false
  • yarn.scheduler.fair.max.assign:如果开启批量分配功能,可指定一次分配的container数目。默认情况下,该参数值为-1,表示不限制
  • yarn.scheduler.fair.dynamic.max.assign:开启批量分配资源后,一次可以分配未分配资源的一半给container。默认情况下,该参数值为true。该参数Hadoop 2.8.0版本及以上版本才有

调优过程

yarn.scheduler.fair.assignmultiple参数设置为true,开启批量分配以后,在监控系统上发现,调度效率提升明显,集群资源利用率明显提升,但是经常会看到以下情况:

  • 少部分节点上运行的container数量比较多,资源被耗尽,大部分节点资源利用率为0或不高
  • 集群不繁忙的情况下,经常有预留资源的情况出现,尽管预留时间很短,预留资源的任务一直等待其他任务释放被预留资源节点上的资源,直接影响了集群的调度效率和任务的执行效率

于是,我们根据生产环境的NodeManager节点配置(170G内存、32个虚拟CPU)和任务资源配置,指定每次分配的container数量,调整yarn.scheduler.fair.max.assign的值为5。调整以后,解决了上述只开启批量分配不指定批量分配数量产生的问题。

Hadoop 2.8.0版本之后又提供了yarn.scheduler.fair.dynamic.max.assign参数,弥补了开启批量分配参数和指定批量分配数量后,部分节点资源被耗尽,导致节点负载太高的问题。无论是节点资源利用率不高还是节点太高,都会影响调度效率,所以,取一个中间值达到调度平衡就可以,这需要我们在不断的调优过程中做一个权衡。

源码解析

无论是心跳调度还是持续调度,他们对某个节点进行一次调度的算法和原理是公用的,都是通过synchronized void attemptScheduling(FSSchedulerNode node)来在某个节点上进行一次调度,方法的参数代表了准备进行资源分配的节点。批量分配参数就是在该方法中起的作用,以下是org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler类中批量分配的相关逻辑:

代码语言:javascript
复制
private synchronized void attemptScheduling(FSSchedulerNode node) {
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    return;
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations

  // 获取这个节点上预留的application
  FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
  if (reservedAppSchedulable != null) { //如果这个节点上已经有reservation
    Priority reservedPriority = node.getReservedContainer().getReservedPriority();
    
    /**
     * 之前有预留,不满足条件则释放
     * 结合本地松弛特性,判断这个预留资源的应用是否有任何一个请求在这个节点上、在节点所在的机架上、在任意机器上运行:
     * 1、如果该application在任意机器上或者该节点所在机架上的container需求已经为0,则释放预留的资源
     * 2、如果该application在该节点没有container的需求,则释放预留的资源
     * 3、如果该application在该节点有container的需求,但是该节点总资源量小于container需要的资源量,则释放预留的资源
     */
    if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
      // Don't hold the reservation if app can no longer use it
      LOG.info("Releasing reservation that cannot be satisfied for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node " + node);
      //如果这个被预留的container已经不符合运行条件,就没有必要保持预留了,直接取消预留,让出资源
      reservedAppSchedulable.unreserve(reservedPriority, node);
      reservedAppSchedulable = null;
    } else {
      // Reservation exists; try to fulfill the reservation
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to fulfill reservation for application "
            + reservedAppSchedulable.getApplicationAttemptId()
            + " on node: " + node);
      }
      //对这个已经进行了reservation对节点进行节点分配,当然,有可能资源还是不足,因此还将处于预定状态
      node.getReservedAppSchedulable().assignReservedContainer(node);
    }
  }
   /**
    * 这个节点还没有进行reservation,则尝试进行container分配 
    */
  if (reservedAppSchedulable == null) { 
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    while (node.getReservedContainer() == null) { //如果这个节点没有预留的container信息,那么,就尝试给这个节点分配container
      boolean assignedContainer = false;
      //尝试进行container的分配,并判断是否完全没有分配到并且也没有reserve成功
      //如果分配到了资源,或者预留到了资源,总之不是none
      if (!queueMgr.getRootQueue().assignContainer(node).equals(
          Resources.none())) {
        assignedContainers++;
        assignedContainer = true;
      }
      // 如果分配container失败,则结束本次分配
      if (!assignedContainer) { break; }
      // 如果没有开启批量分配特性,则结束本次分配
      if (!assignMultiple) { break; }
      // 如果开启了批量分配特性,且已分配的container数量大于每次最多分配的container数量,则结束本次分配,否则一直尝试分配,一直分配可能会出现在该节点上预留资源
      if ((assignedContainers >= maxAssign) && (maxAssign > 0)) { break; }
    }
  }
  // 更新队列的metric信息
  updateRootQueueMetrics();
}

参考资料

Yarn资源请求处理和资源分配原理解析

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 解决方案
    • YARN调度时机和批量分配
      • 调优过程
      • 源码解析
      • 参考资料
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档