用过Impala的同学都知道,Impala本身引入了一个catalogd服务,来缓存hms和nn中的一些元数据,例如表的信息、文件信息、block信息等。同时,这份元数据会通过statestored广播到所有的coordinator节点(以下简称c节点),executor节点不需要缓存元数据。这种设计可以极大的提升查询性能,每次SQL解析的时候,不需要再跟hms/nn进行交互,所有的元数据操作都通过catalogd来进行,c节点只需要周期性地从statestored获取元数据信息进行同步即可。 但是这种设计也带来了一定的问题,当元数据量非常大的时候,catalogd本身就成为了瓶颈,会出现各种各样的问题。为此,社区从3.x版本就开始开发了一种新的catalog模式,称为LocalCatalog模式,也可以叫“Fetch-on-demand”。关于这个LocalCatalog模式的相关讨论,可以参考社区JIRA:IMPALA-7127,里面也有设计文档,这里就不再展开说明。本文主要会从两个常见的场景出发,结合代码,来跟大家一起学习下LocalCatalog模式下,c节点的一些处理逻辑。由于LocalCatalog模式涉及到的内容非常多,因此本文可以无法一一覆盖,敬请谅解。
当c节点接收到client发来的SQL请求之后,会通过JNI来调用FE端的代码,进行SQL解析,相关操作的API入口位于JniFrontend.java中,具体的执行处理逻辑位于Frontend.java中。当c节点启动的时候,系统会根据配置项来构造一个对应的Frontend实例,相关代码如下所示:
//JniFrontend.ctor
if (cfg.is_coordinator) {
frontend_ = new Frontend(authzFactory, isBackendTest);
} else {
frontend_ = null;
}
//Frontend.ctor
public Frontend(AuthorizationFactory authzFactory, boolean isBackendTest)
throws ImpalaException {
this(authzFactory, FeCatalogManager.createFromBackendConfig(), isBackendTest);
}
private Frontend(AuthorizationFactory authzFactory, FeCatalogManager catalogManager,
boolean isBackendTest) throws ImpalaException {
//省略主体代码
}
在Frontend的private构造函数参数中,需要传入一个FeCatalogManager变量,用于进行后续的catalog相关操作。这里的createFromBackendConfig()函数,就是根据BE端的配置项,来决定启动哪种模式:
//FeCatalogManager.java
public static FeCatalogManager createFromBackendConfig() {
if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
return new LocalImpl();
} else {
return new CatalogdImpl();
}
}
如果use_local_catalog为true,返回LocalImpl,表示启动LocalCatalog模式;否则启返回CatalogImpl,表示沿用之前的普通Catalog模式。关于FeCatalogManager的UML类图如下所示:
这个FeCatalogManager只是用来管理产生相应的FeCatalog,并不负责实际的元数据操作。FeCatalogManager通过getOrCreateCatalog()方法来构造对应的FeCatalog变量:
//FeCatalogManager.CatalogdImpl
public FeCatalog getOrCreateCatalog() {
return catalog_.get();
}
//FeCatalogManager.LocalImpl
public FeCatalog getOrCreateCatalog() {
PROVIDER.setAuthzChecker(authzChecker_);
return new LocalCatalog(PROVIDER, DEFAULT_KUDU_MASTER_HOSTS);
}
对于CatalogdImpl,直接返回了ImpaladCatalog类型的私有成员变量,用于后续的元数据操作;而对于LocalImpl,每次都是构造了一个新的LocalCatalog,其中包含了CatalogdMetaProvider,这个类就是实际进行元数据操作的,后面我们会再详细介绍。这两个Catalog都是FeCatalog的实现,相关的UML类图如下所示:
简单介绍下图中的相关类:Catalog是一个abstract class,用于读取/更新HMS中的元数据,它的两个子类,CatalogServiceCatalog主要是在catalogd中进行各种元数据相关的操作;ImpaladCatalog是在impalad中进行各种元数据的相关操作(普通Catalog模式,即use_local_catalog为false)。LocalCatalog就是在LocalCatalog模式下,利用CatalogdMetaProvider,在impalad中进行各种的元数据操作。FeCatalog是一个接口,有一些元数据相关操作相关的api,例如getTable、getDb等。
上面介绍了元数据操作的相关类。接下来,我们以普通的查询流程为例,看看在LocalCatalog模式下,FE是如何进行元数据的加载和缓存的。下面就是c节点在接收到SQL解析请求之后,FE端的函数调用栈:
createExecRequest(JniFrontend.java):162
-createExecRequest(Frontend.java):1595
--getTExecRequest(Frontend.java):1625
---doCreateExecRequest(Frontend.java):1655
----loadTables(StmtMetadataLoader.java):138
-----loadTables(StmtMetadataLoader.java):162
------getMissingTables(StmtMetadataLoader.java)
Impala首先会对SQL进行parse,转换成一个AST。然后利用表的元数据进行analysis。在进行analysis阶段之前,会先检查SQL中涉及到的表,是否都已经加载。如果没有的话,需要主动加载元数据,并缓存在c节点的内存中。相关的代码如下所示:
//StmtMetadataLoader.loadTables()
FeCatalog catalog = fe_.getCatalog();
Set<TableName> missingTbls = getMissingTables(catalog, tbls);
对于LocalCatalog模式,就是通过上面提到的LocalImpl.getOrCreateCatalog方法,每次创建一个新的LocalCatalog返回。实际的元数据操作是通过LocalCatalog的私有成员变量来执行的(类型是CatalogdMetaProvider)。我们沿着getMissingTables()函数的处理逻辑继续往下看:
//StmtMetadataLoader.getMissingTables()
for (TableName tblName: tbls) {
if (loadedOrFailedTbls_.containsKey(tblName)) continue;
FeDb db = catalog.getDb(tblName.getDb());
if (db == null) continue;
dbs_.add(tblName.getDb());
FeTable tbl = db.getTable(tblName.getTbl());
if (tbl == null) continue;
if (!tbl.isLoaded()) {
missingTbls.add(tblName);
continue;
}
//省略部分代码
}
这段代码的功能,就是获取SQL中没有加载的表,称之为Missing Table。对于这些表,需要先加载完元数据,才能继续后面的analysis操作。首先根据FeCatalog(这里就是LocalCatalog)和表的库名,来获取对应的FeDb。关于FeDb相关的UML类图如下所示:
FeDb就是在FE中,database的一个接口,提供了一些与database相关的api,例如getMetaStoreDb、containsTable、getTable等。在LocalCatalog模式中,FeDb实际上是就是LocalDb。接着就会根据这个LocalDb来获取对应的FeTable。与FeDb类似,FeTable就是在FE中,table的一个接口。目前Impala支持多种table,例如hdfs、kudu、hbase等。这里我们以hdfs表为例进行介绍,相关的UML类图如下所示:
这个图相对比较复杂,这里简单介绍一下,不感兴趣的同学可以先跳过。FeTable是在FE端与表进行交互的一个接口,Impala针对每种类型的表,又实现了各自的接口,例如FeFsTable(parquet、orc、text等)、FeHBaseTable、FeKuduTable等,这些都继承自FeTable。 在普通的Catalog模式下,Impala实现了一个Table,这是一个abstract class,可以理解为表作为一个catalog object的基类。因此,对于每一个具体的表,例如HdfsTable,需要继承Table,并实现FeFsTable。在LocalCatalog模式下,Impala又实现了一个LocalTable类,与Table对应。此时对于每一种具体的表,例如LocalFsTable,也需要继承LocalTable,并实现FeFsTable。对于其他类型的表,kudu、hbase等,也是如此。 除此之外,图中还有一个接口FeIncompleteTable,分别有两个实现:IncompleteTable和LocalIncompleteTable,代表的就是表加载失败之后的状态。每种具体类型的表,如果加载失败了,都会变成incomplete状态,同时还会保存加载失败的原因。 到这里,关于FeTable的介绍就差不多已经结束了。回到上面的那段代码,在LocalCatalog模式下,我们获取到的FeTable实际上就是一个LocalFsTable。接下来,我们就看一下,如何根据表名,通过LocalDb来获取对应的LocalFsTable,这里我们承接了上面的调用栈:
getMissingTables(StmtMetadataLoader.java):307
-getTable(LocalDb.java):123
--getTableIfCached(LocalDb.java):110
---loadTableNames(LocalDb.java):170
----loadTableList(CatalogdMetaProvider.java):662
-----loadWithCaching(CatalogdMetaProvider.java)
可以看到,最终在CatalogdMetaProvider.java中,就是通过这个loadWithCaching方法来向catalogd获取相关的元数据信息。在获取database、table等信息的时候,都会用到这个方法。对于当前这种情况,我们需要获取的是某个database下的所有table列表,即loadTableList这个方法的功能。我们以这种情况为例,来简单看下loadWithCaching的处理逻辑:
public ImmutableCollection<TBriefTableMeta> loadTableList(final String dbName)
throws MetaException, UnknownDBException, TException {
ImmutableMap<String, TBriefTableMeta> res = loadWithCaching(
"table list of database " + dbName, TABLE_LIST_STATS_CATEGORY,
new DbCacheKey(dbName.toLowerCase(), DbCacheKey.DbInfoType.TABLE_LIST),
new Callable<ImmutableMap<String, TBriefTableMeta>>() {
@Override
public ImmutableMap<String, TBriefTableMeta> call() throws Exception {
//省略函数主体
}
});
return res.values();
}
private <CacheKeyType, ValueType> ValueType loadWithCaching(String itemString,
String statsCategory, CacheKeyType key,
final Callable<ValueType> loadCallable) throws TException {
//省略函数主体
}
loadWithCaching方法主要接收四个参数:
最终,这里的loadWithCaching返回的就是一个ImmutableMap<String, TBriefTableMeta>类型对象的value集合。Map中的每一条记录都代表database下的一个表,这里以tpch库为例,对应的数据如下所示:
初次获取到这些元数据信息之后,就会在CatalogdMetaProvider中的本地缓存中保存下来,如下所示:
/**
* The underlying cache.
*
* The keys in this cache are various types of objects (strings, DbCacheKey, etc).
* The values are also variant depending on the type of cache key. While any key
* is being loaded, it is a Future<T>, which gets replaced with a non-wrapped object
* once it is successfully loaded (see {@link #getIfPresent(Object)} for a convenient
* wrapper).
*
* For details of the usage of Futures within the cache, see
* {@link #loadWithCaching(String, String, Object, Callable).
*/
final Cache<Object,Object> cache_;
这里使用了guava的Cache相关类作为本地缓存,并进行相关的缓存操作。之后就可以直接通过缓存直接获取元数据,无需每次都跟catalogd进行交互。接着,在getTableIfCached方法中,先通过loadTableNames方法将所有的table都初始化为LocalIncompleteTable,然后放到了tables_中,这是一个Map<String, FeTable>类型的map。最后按照tblName进行匹配,返回对应的LocalIncompleteTable,相关代码如下所示:
//LocalDb.getTableIfCached(),删除部分无关代码
public FeTable getTableIfCached(String tblName) {
loadTableNames();
if (!tables_.containsKey(tblName)) {
return null;
}
return tables_.get(tblName);
}
//LocalDb.loadTableNames()
for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
}
tables_ = newMap;
获取到指定的FeTable之后,在getTable方法中进行判断,如果是incomplete状态,则需要进行load(这里的LocalIncompleteTable就是incomplete),相关代码如下所示:
//LocalDb.getTable(),删除部分无关代码
public FeTable getTable(String tableName) {
FeTable tbl = getTableIfCached(tblName);
if (tbl instanceof LocalIncompleteTable) {
tbl = LocalTable.load(this, tblName);
tables_.put(tblName, tbl);
}
return tbl;
}
加载完成之后,就得到了一个LocalFsTable对象,最终返回。所以,最后在getMissingTables函数中,得到的FeTable是一个LocalFsTable,这个表已经加载了,所以不是missing table。由此我们可以知道,在LocalCatalog模式下,SQL中涉及到的表,都不会是missing table(普通Catalog模式下的处理逻辑有所不同,如果是第一次访问的表,则会被当作missing tables,然后后续统一进行加载,之后再次访问的时候,就是直接返回具体的表,例如HdfsTable,感兴趣的同学可以自行阅读源码,这里不再展开)。后续就会按照正常的流程继续执行analysis操作。
在原先的普通Catalog模式下,表的元数据信息(包括分区、文件、block等)都是通过statestored的topic广播到各个c节点。即使SQL中只查询了表的一个分区,c节点的jvm也会缓存整个表的元数据信息。但是在LocalCatalog模式下,实现了分区粒度的元数据缓存,c节点只会缓存SQL查询的分区,而不是全量的元数据信息,也就是我们文章开头提到的“Fetch-on-demand”。下面我们就结合代码来看一下,LocalCatalog模式下,c节点如何进行分区粒度的元数据缓存的。 我们在上一章节中提到,获取missing table的时候,会将SQL中涉及到的表进行load操作。在进行load的时候,就会先加载表的分区信息,如下所示:
getTable(LocalDb.java):127
-load(LocalTable.java):121
--loadColumnStats(LocalFsTable.java):557
---loadPartitionValueMap(LocalFsTable.java):486
----loadPartitionSpecs(LocalFsTable.java):526
-----loadPartitionList(CatalogdMetaProvider.java):854
------loadWithCaching(CatalogdMetaProvider.java)
最终也是通过loadWithCaching方法来获取表的分区元数据信息。不过与上面的DbCacheKey不一样,这里用到了另外一种cache key:
public List<PartitionRef> loadPartitionList(final TableMetaRef table)
throws TException {
PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl) table);
return (List<PartitionRef>) loadWithCaching("partition list for " + table,
PARTITION_LIST_STATS_CATEGORY, key, new Callable<List<PartitionRef>>() {
public List<PartitionRef> call() throws Exception {
//省略函数主体
}
});
}
这里用到了PartitionListCacheKey,表示的是一个表的partition list,获取的是表的所有分区信息。但是并不是完整的分区信息,这里我们以functional_parquet.alltypes表为例,如下所示:
可以看到,这里只包含了分区id和name,没有location、file等信息。因此,我们可以理解为这里的partition list只是一个分区的概要信息。接着,在创建HdfsScanNode的时候,会去加载SQL中涉及到的分区的详细元数据信息,如下所示:
createScanNode(SingleNodePlanner.java):1820
-createHdfsScanPlan(SingleNodePlanner.java):1530
--prunePartitions(HdfsPartitionPruner.java):174
---loadPartitions(LocalFsTable.java):445
----loadPartitionsByRefs(CatalogdMetaProvider.java):904
-----loadPartitionsFromCatalogd(CatalogdMetaProvider):931
----loadPartitionsByRefs(CatalogdMetaProvider.java):908
-----storePartitionsInCache(CatalogdMetaProvider):931
这里我们只截取了部分堆栈信息。最终,就是通过loadPartitionsFromCatalogd方法向catalogd获取指定分区的元数据信息。然后构造对应的PartitionCacheKey和PartitionMetadataImpl,分别作为cache key和value,存入到本地缓存中。这里我们仍然以functional_parquet.alltypes表为例,查询分区year=2009 and month=1,最终得到的缓存就是这样的:
可以看到,cache value中已经有了location、file等具体的信息。由此可知,在LocalCatalog模式下,Impala将table与partition进行了解耦。在本地缓存中,对于指定表,保存了一个partition list的记录,即PartitionListCacheKey。同时,对于每个SQL查询的分区,也有单独的分区缓存记录,也就是PartitionCacheKey。通过这种解耦,实现了分区粒度的元数据缓存。
我们在上面介绍loadWithCaching方法的时候,提到了有两个参数,其中一个就是cache key,表示缓存是哪种类型的数据。目前Impala支持了若干种类型的数据进行本地缓存,我们在上面提到了DbCacheKey、PartitionListCacheKey和PartitionCacheKey。分别表示的是database、partition list、partition的cache key。每种cache key对应的value都是不一样的。以DbCacheKey为例,在构造DbCacheKey的时候,也分不同的类型,一共有三种:
//CatalogdMetaProvider.DbCacheKey
static enum DbInfoType {
HMS_METADATA,
TABLE_LIST,
FUNCTION_NAMES
}
HMS_METADATA是我们在获取database本身数据的时候使用的类型,这种场景对应的value是org.apache.hadoop.hive.metastore.api.database;TABLE_LIST就是我们上面在获取db的table list时的情况,value是ImmutableMap<String, TBriefTableMeta>,每一条记录表示db下面的一个表;FUNCTION_NAMES,表示获取database下的函数,value类型是List。 除了这三种cache key之外,还有TableCacheKey,在缓存一个具体的表信息时,用其作为key,此时value是TableMetaRefImpl;ColStatsCacheKey,在缓存某个表的某列统计信息数据时,作为key使用,它的value则是ColumnStatisticsObj。还有其他的一些cache key,都位于CatalogdMetaProvider中,这里就不再是一一介绍,感兴趣的同学可以自行阅读源码。 另外一个参数statsCategory,这个参数主要是用来统计在访问cache内容时的命中情况。在调用loadWithCaching方法时,最后会对当前这个cache记录的访问情况进行统计。相关函数如下所示:
//CatalogdMetaProvider.java
private void addStatsToProfile(String statsCategory, int numHits, int numMisses,
Stopwatch stopwatch) {
FrontendProfile profile = FrontendProfile.getCurrentOrNull();
if (profile == null) return;
final String prefix = CATALOG_FETCH_PREFIX + "." +
Preconditions.checkNotNull(statsCategory) + ".";
profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses);
profile.addToCounter(prefix + "Time", TUnit.TIME_MS,
stopwatch.elapsed(TimeUnit.MILLISECONDS));
profile.addToCounter(prefix + "Hits", TUnit.NONE, numHits);
profile.addToCounter(prefix + "Misses", TUnit.NONE, numMisses);
}
可以看到,总共统计三种情况:hits、misses和requests。针对本地缓存中的不同数据,profile中也会按类进行统计,目前支持如下的这些类目:
//CatalogdMetaProvider.java
private static final String CATALOG_FETCH_PREFIX = "CatalogFetch";
private static final String DB_LIST_STATS_CATEGORY = "DatabaseList";
private static final String DB_METADATA_STATS_CATEGORY = "Databases";
private static final String TABLE_LIST_STATS_CATEGORY = "TableList";
private static final String TABLE_METADATA_CACHE_CATEGORY = "Tables";
private static final String PARTITION_LIST_STATS_CATEGORY = "PartitionLists";
private static final String PARTITIONS_STATS_CATEGORY = "Partitions";
private static final String COLUMN_STATS_STATS_CATEGORY = "ColumnStats";
private static final String GLOBAL_CONFIGURATION_STATS_CATEGORY = "Config";
private static final String FUNCTION_LIST_STATS_CATEGORY = "FunctionLists";
private static final String FUNCTIONS_STATS_CATEGORY = "Functions";
private static final String RPC_STATS_CATEGORY = "RPCs";
private static final String STORAGE_METADATA_LOAD_CATEGORY = "StorageLoad";
当SQL执行完成之后,我们就可以在profile页面看到这个统计了,如下所示:
到这里,关于在LocalCatalog模式下,c节点的一些处理流程就介绍的差不多了。由于整体改动非常大,本文也只是挑选了一些场景,结合代码进行讲解。总结一下,本文主要通过两个场景:Missing Tables的获取和分区粒度的元数据缓存,讲述了LocalCatalog模式下,c节点FE端的处理逻辑。同时也简单介绍了FE端table和db相关的一些class,以及本地缓存的相关内容。由于篇幅原因,关于catalogd端的处理流程会在后续的文章中跟大家继续分享。目前,Impala最新4.0版本,对于LocalCatalog模式的支持也已经比较成熟,感兴趣的读者可以尝试一下。此外,本文是笔者基于社区4.0.0代码的分析而来,如有错误,欢迎批评指正。