distributedShell是Yarn自带的应用程序,和MR类似,当前工具可以用来对Yarn进行压测。
参考命令如下:
./bin/hadoop jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar \
-jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar -shell_command \
'/bin/date' -num_containers 5
可以提交一个样例作业到Yarn上面。
当前样例的入口类是org.apache.hadoop.yarn.applications.distributedshell.Client
,在pom文件里面默认定义了当前类为主类。所以在提交的时候可以不用指定主类。
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- 省略部分参数 -->
</executions>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.yarn.applications.distributedshell.Client</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
核心流程主要包含下面3个:
其中前面两个主要在客户端,第3个主要是在yarn上面。
初始化阶段包括下面两部分:
下面是初始化Client对象的核心代码。
Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
this.conf.setBoolean(
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
this.appMasterMainClass = appMasterMainClass;
// 创建和RM的连接
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options();
// 初始化支持的参数列表
stopSignalReceived = new AtomicBoolean(false);
isRunning = new AtomicBoolean(false);
}
初始化Client,在初始化Client阶段主要是读取命令行参数。
// 初始化Client函数入口
boolean doRun = client.init(args);
首先还是建立和Yarn服务端的连接,为作业提交做准备。
isRunning.set(true);
yarnClient.start();
在连接建立之后会查询并且在控制台打印Yarn服务端的一些信息。主要包含下面内容:
yarnClient.getYarnClusterMetrics()
查询到并且显示。yarnClient.getNodeReports(NodeState.RUNNING)
查询到。yarnClient.getQueueInfo(this.amQueue)
查询到。yarnClient.getQueueAclsInfo()
查询。yarnClient.getResourceProfiles()
查询。在打印完集群信息之后才是作业提交的开始。
提交作业之前,是需要先向RM申请AppId的。AppId可以通过YarnClientApplication app = yarnClient.createApplication();
获取。作业提交信息一般都在ApplicationSubmissionContext里面,包含下面信息:
appContext.setAMContainerResourceRequests(amResourceRequests);
设置。appContext.setApplicationName(appName);
设置。appContext.setLogAggregationContext(logAggregationContext);
设置。作业真正提交的代码只有一行:
yarnClient.submitApplication(appContext);
当前样例做到了作业所需要的信息可配置。是一个比较适合开发作业的样例。
AM的核心代码是在ApplicationMaster.java里面的。在启动AM的时候会调用到当前函数的main函数。
在构造函数里面和init函数里面,主要是加载配置项以及命令行参数。真正运行的函数是run,核心在run函数里面,
首先需要创建和RM以及NM的连接。
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
startTimelineClient(conf);
在AM启动OK了第一件事就是需要去RM上面注册,证明当前AM已经启动完成了。
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
普通Container的申请是在AM里面处理的,类似下面代码,下面代码是异步申请的。
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
当Container申请好之后,可以通过下面代码获取,在样例中触发onContainerAllocated事件。
List<Container> allocated = response.getAllocatedContainers();
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
通过下面代码启动Container.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
nmClientAsync.startContainerAsync(container, ctx);
在作业结束的时候,AM需要做下面事:
nmClientAsync.stop();
try {
amRMClient.unregisterApplicationMaster(appStatus, message, null);
} catch (YarnException | IOException ex) {
LOG.error("Failed to unregister application", ex);
}
amRMClient.stop();
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有