最近几天做Hadoop机架感知功能时,在网上可以找到很多关于HDFS机架感知的资料,但是对于YARN机架感知的介绍却很少。这篇文章最主要就是说明机架感知功能对于YARN来说起到的作用,若有理解的偏差请指正。谢谢。
*注:代码基于3.1.1*
HDFS的默认副本数是3个,在未启用Hadoop机架感知功能时,数据的备份是随机的,有可能同一个DataNode节点有多个副本,也有可能所有的3个副本都在同一个机架,这样如果一个节点宕机或整个机架出现问题,数据就会丢失。
在开启Hadoop机架感知功能后,本地会存储一份, 同机架的某个节点存储一份,不同机架的某个节点存储一份。这样保证了如果本地数据损坏,首先会在同机架的节点上获取数据;如果整个机架出现问题,也可以保证从其他机架上获取数据。
Hadoop的机架感知功能使得HDFS在读取数据时降低了整体的带宽消耗和读取延时,也保证了数据不会轻易丢失。
Hadoop机架感知功能对于YARN最直观的表现,可以通过ResourceManager的管理界面(ResourceManagerIP:8088/cluster/nodes)查看各个NodeManager所属机架,未启用Hadoop机架感知功能时,默认的机架就为/default-rack。这个可以结合NodeManager的健康检查功能,很快的定位出现问题节点所在的物理位置。
并且Hadoop机架感知还会影响YARN中Container启动时所在节点。Container首先会选择数据所在节点启动,如果该节点资源不足,则会在与该节点同机架的节点启动。如果该机架的节点资源都不足,则在其他节点启动。
配置项 | 默认值 | 配置说明 |
---|---|---|
net.topology.node.switch.mapping.impl | org.apache.hadoop.net.ScriptBasedMapping | DNSToSwitchMapping的实现类。当配置为"org.apache.hadoop.net.ScriptBasedMapping"时,它调用“net.topology.script.file.name”中指定的脚本来解析节点所属机架。如果“net.topology.script.file.name”的值为空,所有节点的机架信息都为"/default-rack" |
net.topology.script.file.name | 解析机架拓扑关系的脚本名。例如:如果192.168.0.1所属机架为/rack1,则将“192.168.0.1”作为参数传入脚本,则会返回/rack1为输出。 |
"net.topology.node.switch.mapping.impl"配置项是拓扑解析的实现类,用户可以自己编写Java程序来解析机架信息。我们这里使用它的默认值“org.apache.hadoop.net.ScriptBasedMapping”来实现Hadoop的机架感知功能。
一个脚本的实例如下所示,里面包含了每个IP所对应的rack信息,可以将这个脚本放在/home目录下,脚本名字为 topology-rack.sh。
#!/usr/bin/python
#-\*-coding:utf-8 -\*-
import sys
rack = {
"192.168.0.1":"/room1-rack1",
"192.168.0.2":"/room1-rack1",
"192.168.0.3":"/room1-rack2",
"192.168.0.4":"/room1-rack2",
"192.168.0.5":"/room2-rack1",
"192.168.0.6":"/room2-rack1",
"192.168.0.7":"/room2-rack2"
}
if \_\_name\_\_=="\_\_main\_\_":
print "/"+rack.get(sys.argv[1],"/default-rack")
将配置项“net.topology.script.file.name”配置如下:
<property>
<name>net.topology.script.file.name</name>
<value>/home/topology-rack.sh</value>
</property>
重启HDFS和YARN之后,就可以开启Hadoop机架感知功能。
**注:如果DataNode或NodeManager扩容,要及时修改脚本文件当中的拓扑关系,否则会默认扩容节点为/default-rack机架**
YARN中,主要在RackResolver
类中处理机架信息。下面介绍该类的主要功能。
首先看其的初始化方法,RackResolver.init()
:
public synchronized static void init(Configuration conf) {
....
Class<? extends DNSToSwitchMapping> dnsToSwitchMappingClass =
conf.getClass(
CommonConfigurationKeysPublic.NET\_TOPOLOGY\_NODE\_SWITCH\_MAPPING\_IMPL\_KEY,
ScriptBasedMapping.class,
DNSToSwitchMapping.class);
try {
DNSToSwitchMapping newInstance = ReflectionUtils.newInstance(
dnsToSwitchMappingClass, conf);
dnsToSwitchMapping =
((newInstance instanceof CachedDNSToSwitchMapping) ? newInstance
: new CachedDNSToSwitchMapping(newInstance));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
初始化机架感知处理类时,会通过ScriptBasedMapping
的构造函数,初始化一些参数:
public void setConf (Configuration conf) {
super.setConf(conf);
if (conf != null) {
scriptName = conf.get(SCRIPT\_FILENAME\_KEY);
maxArgs = conf.getInt(SCRIPT\_ARG\_COUNT\_KEY, DEFAULT\_ARG\_COUNT);
} else {
scriptName = null;
maxArgs = 0;
}
}
YARN会通过RackResolver.coreResolve()
方法来获取主机名和机架信息的对应关系,具体如下:
dnsToSwitchMapping.resolve(tmpList)
获取主机与机架的信息,其具体解析过程3.2.2介绍;DEFAULT\_RACK
,如果主机解析机架信息时为空,则其机架信息为默认值;Node
。 private static Node coreResolve(String hostName) {
List <String> tmpList = Collections.singletonList(hostName);
List <String> rNameList = dnsToSwitchMapping.resolve(tmpList);
String rName = NetworkTopology.DEFAULT\_RACK;
if (rNameList == null || rNameList.get(0) == null) {
LOG.debug("Could not resolve {}. Falling back to {}", hostName,
NetworkTopology.DEFAULT\_RACK);
} else {
rName = rNameList.get(0);
LOG.debug("Resolved {} to {}", hostName, rName);
}
return new NodeBase(hostName, rName);
}
接下来对3.2.1中所提到的dnsToSwitchMapping.resolv(List<String> names)
进行说明。dnsToSwitchMapping
是CachedDNSToSwitchMapping
的一个实例,会调用CachedDNSToSwitchMapping.resolve()
方法,其说明如下:
getUncachedHosts(names)
获取没存放在缓存中的主机IP;rawMapping.resolve(uncachedHosts)
来解析未在缓存中主机的机架信息,具体解析过程3.2.3中介绍;uncachedHosts
和resolvedHosts
信息对应的存放到缓存中;***注:由第2条可知,在NodeManager扩容时,将其信息更新到脚本就可以。但是如果修改一个已存在的NodeManager的机架信息,需要重启YARN服务生效(更新缓存)。***
public List<String> resolve(List<String> names) {
names = NetUtils.normalizeHostNames(names);
...
List<String> uncachedHosts = getUncachedHosts(names);
// Resolve the uncached hosts
List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
//cache them
cacheResolvedHosts(uncachedHosts, resolvedHosts);
//now look up the entire list in the cache
return getCachedHosts(names);
}
在rawMapping.resolve(uncachedHosts)
中,实际调用的是ScriptBasedMapping#RawScriptBasedMapping.resolve()
方法,其说明如下:
runResolveCommand()
方法就是判断下传参是否符合要求,并且执行脚本,并将脚本执行结果返回),经过处理后返回。public List<String> resolve(List<String> names) {
List<String> m = new ArrayList<String>(names.size());
if (scriptName == null) {
for (String name : names) {
m.add(NetworkTopology.DEFAULT\_RACK);
}
return m;
}
...
String output = runResolveCommand(names, scriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
m.add(switchInfo);
}
...
return m;
}
在ResourceTrackerService.resolve()
、AMRMClientImpl.resolveRacks()
、RMContainerAllocator.assignMapsWithLocality()
、TaskAttemptImpl
的构造函数和TaskAttemptImpl.computeRackAndLocality()
中都调用了RackResolver.resolve()
方法来获取主机与机架的信息。
ResourceTrackerService
中,用其信息用在NodeManager
向ResourceManager
注册的过程中;AMRMClientImpl
中,其信息用在添加containerRequest
当中;RMContainerAllocator
和TaskAttemptImpl
中,其信息与资源本地性相关;HADOOP的机架感知功能,除了对HDFS的数据的副本有关之外,对YARN的NodeManager注册及Container的资源分发也有着密切的关系。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。