在YARN
的原生任务监控界面中,我们经常能看到Aggregate Resource Allocation
这个指标(图中高亮选中部分),这个指标表示该任务拥有的所有container
每秒所消耗的资源(内存、CPU
)总和:
Aggregate Resource Allocation
是在org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt
类中进行计算的,主要逻辑如下:
// 资源信息更新间隔:3秒
private static final long MEM_AGGREGATE_ALLOCATION_CACHE_MSECS = 3000;
// 最后更新时间、最后更新时的每秒的内存和CPU使用量
protected long lastMemoryAggregateAllocationUpdateTime = 0;
private long lastMemorySeconds = 0;
private long lastVcoreSeconds = 0;
/**
* 返回任务拥有的所有container每秒所消耗的资源(内存、CPU)总和
* @return
*/
synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
long currentTimeMillis = System.currentTimeMillis();
// Don't walk the whole container list if the resources were computed
// recently.
// 判断是否达到更新条件:当前时间 - 最后更新时间 > 最大更新间隔(3秒)
if ((currentTimeMillis - lastMemoryAggregateAllocationUpdateTime)
> MEM_AGGREGATE_ALLOCATION_CACHE_MSECS) {
long memorySeconds = 0;
long vcoreSeconds = 0;
// 迭代所有的container,计算每个container每秒所消耗的资源(内存、CPU)
for (RMContainer rmContainer : this.liveContainers.values()) {
// 获取container的运行时间
long usedMillis = currentTimeMillis - rmContainer.getCreationTime();
// 计算container每秒所消耗的资源(内存、CPU)
Resource resource = rmContainer.getContainer().getResource();
// 汇总内存和CPU使用量
memorySeconds += resource.getMemory() * usedMillis /
DateUtils.MILLIS_PER_SECOND;
vcoreSeconds += resource.getVirtualCores() * usedMillis
/ DateUtils.MILLIS_PER_SECOND;
}
// 记录最后更新任务资源使用情况的时间、任务最后每秒使用的内存和CPU数量
lastMemoryAggregateAllocationUpdateTime = currentTimeMillis;
lastMemorySeconds = memorySeconds;
lastVcoreSeconds = vcoreSeconds;
}
return new AggregateAppResourceUsage(lastMemorySeconds, lastVcoreSeconds);
}
/**
* 返回任务使用的资源情况
* @return
*/
public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
// 返回任务所使用的资源情况:所使用的container数量、预留的container数量、当前消耗的资源、当前预留的资源、所需的总资源(当前消耗的资源+当前预留的资源)、每秒的内存和CPU使用量
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), Resources.clone(currentConsumption),
Resources.clone(currentReservation),
Resources.add(currentConsumption, currentReservation),
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
}
getResourceUsageReport
方法是一个用synchronized
关键字修饰的同步方法,被在org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler
类的getAppResourceUsageReport
方法中调用。因此,synchronized
关键字在这里起的是对象锁的作用,保证在同一时刻多个线程更新任务资源使用信息时,不会产生并发更新问题。
@Override
public ApplicationResourceUsageReport getAppResourceUsageReport(
ApplicationAttemptId appAttemptId) {
SchedulerApplicationAttempt attempt = getApplicationAttempt(appAttemptId);
if (attempt == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Request for appInfo of unknown attempt " + appAttemptId);
}
return null;
}
return attempt.getResourceUsageReport();
}