Capacity 调度器的核心类是CapacityScheduler。在初始化CapacityScheduler的时候,在构造函数initAsyncSchedulingProperties,里面会初始化调度器相关。
核心类是AsyncSchedulingConfiguration,主要内容总结为:初始化异步调度器线程AsyncScheduleThread,可以初始化多个,调度支持多线程。
AsyncScheduleThread继承自Thread,核心是循环调度,调度的核心函数为schedule。
一般情况下,满足下面条件的节点不会被分配资源:
上述判断的核心实现函数为shouldSkipNodeSchedule。
资源分配方式分为:
代码主要流程如下:
int start = random.nextInt(nodeSize);
boolean printSkippedNodeLogging = isPrintSkippedNodeLogging(cs);
// Allocate containers of node [start, end)
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
}
current = 0;
// Allocate containers of node [0, start)
for (FiCaSchedulerNode node : nodes) {
if (current++ > start) {
break;
}
if (shouldSkipNodeSchedule(node, cs, printSkippedNodeLogging)) {
continue;
}
cs.allocateContainersToNode(node.getNodeID(), false);
}
int partitionSize = partitions.size();
// First randomize the start point
int start = random.nextInt(partitionSize);
// Allocate containers of partition [start, end)
for (String partition : partitions) {
if (current++ >= start) {
CandidateNodeSet<FiCaSchedulerNode> candidates =
cs.getCandidateNodeSet(partition);
if (candidates == null) {
continue;
}
cs.allocateContainersToNode(candidates, false);
}
}
current = 0;
// Allocate containers of partition [0, start)
for (String partition : partitions) {
if (current++ > start) {
break;
}
CandidateNodeSet<FiCaSchedulerNode> candidates =
cs.getCandidateNodeSet(partition);
if (candidates == null) {
continue;
}
cs.allocateContainersToNode(candidates, false);
}
资源分配的核心实现函数为allocateContainersToNode。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。