前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Yarn 状态机以及事件机制

Yarn 状态机以及事件机制

作者头像
zeekling
发布2023-11-29 10:47:30
3070
发布2023-11-29 10:47:30
举报
文章被收录于专栏:浪浪山下那个村

简介

Yarn采用了基于事件驱动的并发模型:

  • 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器。
  • 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。

某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。

事件处理器定义

事件处理器定义如下:

代码语言:javascript
复制
@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {

  void handle(T event);

}

只有一个handler函数,如参是事件:

中央处理器AsyncDispatcher

AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能:

  • 注册不同类型的事件,主要包含事件类型和事件处理器。
  • 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。
代码语言:javascript
复制
@Public
@Evolving
public interface Dispatcher {

  EventHandler<Event> getEventHandler();

  void register(Class<? extends Enum> eventType, EventHandler handler);

}

AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务, 是一个典型的生产者消费这模型。

代码语言:javascript
复制
public class AsyncDispatcher extends AbstractService implements Dispatcher {
 ...
}

事件处理器的注册

事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:Map<Class<? extends Enum>, EventHandler> eventDispatchers,键是事件类型,value是事件的处理器。

对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,会依次调用每个handler。

代码语言:javascript
复制
public void register(Class<? extends Enum> eventType,
      EventHandler handler) {
    /* check to see if we have a listener registered */
    EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
    LOG.info("Registering " + eventType + " for " + handler.getClass());
    if (registeredHandler == null) {
      eventDispatchers.put(eventType, handler);
    } else if (!(registeredHandler instanceof MultiListenerHandler)){
      /* for multiple listeners of an event add the multiple listener handler */
      MultiListenerHandler multiHandler = new MultiListenerHandler();
      multiHandler.addHandler(registeredHandler);
      multiHandler.addHandler(handler);
      eventDispatchers.put(eventType, multiHandler);
    } else {
      /* already a multilistener, just add to it */
      MultiListenerHandler multiHandler
      = (MultiListenerHandler) registeredHandler;
      multiHandler.addHandler(handler);
    }
  }

事件处理

AsyncDispatcher#getEventHandler()是异步派发的关键:

代码语言:javascript
复制
private final EventHandler<Event> handlerInstance = new GenericEventHandler();

// 省略.....

@Override
public EventHandler<Event> getEventHandler() {
   return handlerInstance;
}

GenericEventHandler:一个特殊的事件处理器

GenericEventHandler是一个特殊的事件处理器,用于接受各种事件。由指定线程处理接收到的事件。

代码语言:javascript
复制
public void handle(Event event) {
  if (blockNewEvents) {
    return;
  }
  drained = false;
  /* all this method does is enqueue all the events onto the queue */
  int qSize = eventQueue.size();
  if (qSize != 0 && qSize % 1000 == 0
      && lastEventQueueSizeLogged != qSize) {
    lastEventQueueSizeLogged = qSize;
    LOG.info("Size of event-queue is " + qSize);
  }
  if (qSize != 0 && qSize % detailsInterval == 0
          && lastEventDetailsQueueSizeLogged != qSize) {
    lastEventDetailsQueueSizeLogged = qSize;
    printEventQueueDetails();
    printTrigger = true;
  }
  int remCapacity = eventQueue.remainingCapacity();
  if (remCapacity < 1000) {
    LOG.warn("Very low remaining capacity in the event-queue: "
        + remCapacity);
  }
  try {
    eventQueue.put(event);
  } catch (InterruptedException e) {
    if (!stopped) {
      LOG.warn("AsyncDispatcher thread interrupted", e);
    }
    // Need to reset drained flag to true if event queue is empty,
    // otherwise dispatcher will hang on stop.
    drained = eventQueue.isEmpty();
    throw new YarnRuntimeException(e);
  }
};
  • blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。
  • eventQueue:将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。

事件处理线程

在服务启动时(serviceStart函数)创建一个线程,会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。

代码语言:javascript
复制
Runnable createThread() {
  return new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        drained = eventQueue.isEmpty();
        // 省略。。。
        Event event;
        try {
          event = eventQueue.take();
        } catch(InterruptedException ie) {
          if (!stopped) {
            LOG.warn("AsyncDispatcher thread interrupted", ie);
          }
          return;
        }
        if (event != null) {
          // 省略。。。
          dispatch(event);
          // 省略。。。
        }
      }
    }
  };
}

dispatch详解

  • 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器,调用当前处理器的handler函数。
  • 如果当前handler处理出现异常时,默认会退出RM。
代码语言:javascript
复制
protected void dispatch(Event event) {
  //all events go thru this loop
  LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
      event);

  Class<? extends Enum> type = event.getType().getDeclaringClass();

  try{
    EventHandler handler = eventDispatchers.get(type);
    if(handler != null) {
      handler.handle(event);
    } else {
      throw new Exception("No handler for registered for " + type);
    }
  } catch (Throwable t) {
    //TODO Maybe log the state of the queue
    LOG.error(FATAL, "Error in dispatcher thread", t);
    // If serviceStop is called, we should exit this thread gracefully.
    if (exitOnDispatchException
        && (ShutdownHookManager.get().isShutdownInProgress()) == false
        && stopped == false) {
      stopped = true;
      Thread shutDownThread = new Thread(createShutDownThread());
      shutDownThread.setName("AsyncDispatcher ShutDown handler");
      shutDownThread.start();
    }
  }
}

状态机

状态转换由成员变量StateMachine管理,所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。

代码语言:javascript
复制
private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.START))

     .installTopology();

Transition定义了“从一个状态转换到另一个状态”的行为,由转换操作、开始状态、事件类型、事件组成:

代码语言:javascript
复制
public interface StateMachine
                 <STATE extends Enum<STATE>,
                  EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
  public STATE getCurrentState();
  public STATE getPreviousState();
  public STATE doTransition(EVENTTYPE eventType, EVENT event)
        throws InvalidStateTransitionException;
}

ResourceManager中状态机

  • RMApp:用于维护一个Application的生命周期,实现类 - RMAppImpl
  • RMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImpl
  • RMContainer:用于维护一个已分配的资源最小单位Container的生命周期,实现类 - RMContainerImpl
  • RMNode:用于维护一个NodeManager的生命周期,实现类 - RMNodeImpl

NodeManager中状态机:

  • Application:用于维护节点上一个Application的生命周期,实现类 - ApplicationImpl
  • Container:用于维护节点上一个容器的生命周期,实现类 - ContainerImpl
  • LocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023.11.28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 事件处理器定义
  • 中央处理器AsyncDispatcher
  • 事件处理器的注册
  • 事件处理
    • GenericEventHandler:一个特殊的事件处理器
      • 事件处理线程
        • dispatch详解
    • 状态机
      • ResourceManager中状态机
      相关产品与服务
      容器服务
      腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档