前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >LocalCatalog详解之Coordinator处理流程

LocalCatalog详解之Coordinator处理流程

作者头像
skyyws
发布2022-05-20 08:44:19
2480
发布2022-05-20 08:44:19
举报
文章被收录于专栏:skyyws的技术专栏

相关背景

用过Impala的同学都知道,Impala本身引入了一个catalogd服务,来缓存hms和nn中的一些元数据,例如表的信息、文件信息、block信息等。同时,这份元数据会通过statestored广播到所有的coordinator节点(以下简称c节点),executor节点不需要缓存元数据。这种设计可以极大的提升查询性能,每次SQL解析的时候,不需要再跟hms/nn进行交互,所有的元数据操作都通过catalogd来进行,c节点只需要周期性地从statestored获取元数据信息进行同步即可。 但是这种设计也带来了一定的问题,当元数据量非常大的时候,catalogd本身就成为了瓶颈,会出现各种各样的问题。为此,社区从3.x版本就开始开发了一种新的catalog模式,称为LocalCatalog模式,也可以叫“Fetch-on-demand”。关于这个LocalCatalog模式的相关讨论,可以参考社区JIRA:IMPALA-7127,里面也有设计文档,这里就不再展开说明。本文主要会从两个常见的场景出发,结合代码,来跟大家一起学习下LocalCatalog模式下,c节点的一些处理逻辑。由于LocalCatalog模式涉及到的内容非常多,因此本文可以无法一一覆盖,敬请谅解。

FE端相关类介绍

当c节点接收到client发来的SQL请求之后,会通过JNI来调用FE端的代码,进行SQL解析,相关操作的API入口位于JniFrontend.java中,具体的执行处理逻辑位于Frontend.java中。当c节点启动的时候,系统会根据配置项来构造一个对应的Frontend实例,相关代码如下所示:

代码语言:javascript
复制
//JniFrontend.ctor
if (cfg.is_coordinator) {
  frontend_ = new Frontend(authzFactory, isBackendTest);
} else {
  frontend_ = null;
}

//Frontend.ctor
public Frontend(AuthorizationFactory authzFactory, boolean isBackendTest)
    throws ImpalaException {
  this(authzFactory, FeCatalogManager.createFromBackendConfig(), isBackendTest);
}

private Frontend(AuthorizationFactory authzFactory, FeCatalogManager catalogManager,
      boolean isBackendTest) throws ImpalaException {
  //省略主体代码
}

在Frontend的private构造函数参数中,需要传入一个FeCatalogManager变量,用于进行后续的catalog相关操作。这里的createFromBackendConfig()函数,就是根据BE端的配置项,来决定启动哪种模式:

代码语言:javascript
复制
//FeCatalogManager.java
public static FeCatalogManager createFromBackendConfig() {
  if (BackendConfig.INSTANCE.getBackendCfg().use_local_catalog) {
    return new LocalImpl();
  } else {
    return new CatalogdImpl();
  }
}

如果use_local_catalog为true,返回LocalImpl,表示启动LocalCatalog模式;否则启返回CatalogImpl,表示沿用之前的普通Catalog模式。关于FeCatalogManager的UML类图如下所示:

这个FeCatalogManager只是用来管理产生相应的FeCatalog,并不负责实际的元数据操作。FeCatalogManager通过getOrCreateCatalog()方法来构造对应的FeCatalog变量:

代码语言:javascript
复制
//FeCatalogManager.CatalogdImpl
public FeCatalog getOrCreateCatalog() {
  return catalog_.get();
}
    
//FeCatalogManager.LocalImpl
public FeCatalog getOrCreateCatalog() {
  PROVIDER.setAuthzChecker(authzChecker_);
  return new LocalCatalog(PROVIDER, DEFAULT_KUDU_MASTER_HOSTS);
}

对于CatalogdImpl,直接返回了ImpaladCatalog类型的私有成员变量,用于后续的元数据操作;而对于LocalImpl,每次都是构造了一个新的LocalCatalog,其中包含了CatalogdMetaProvider,这个类就是实际进行元数据操作的,后面我们会再详细介绍。这两个Catalog都是FeCatalog的实现,相关的UML类图如下所示:

简单介绍下图中的相关类:Catalog是一个abstract class,用于读取/更新HMS中的元数据,它的两个子类,CatalogServiceCatalog主要是在catalogd中进行各种元数据相关的操作;ImpaladCatalog是在impalad中进行各种元数据的相关操作(普通Catalog模式,即use_local_catalog为false)。LocalCatalog就是在LocalCatalog模式下,利用CatalogdMetaProvider,在impalad中进行各种的元数据操作。FeCatalog是一个接口,有一些元数据相关操作相关的api,例如getTable、getDb等。

C节点处理流程

上面介绍了元数据操作的相关类。接下来,我们以普通的查询流程为例,看看在LocalCatalog模式下,FE是如何进行元数据的加载和缓存的。下面就是c节点在接收到SQL解析请求之后,FE端的函数调用栈:

代码语言:javascript
复制
createExecRequest(JniFrontend.java):162
-createExecRequest(Frontend.java):1595
--getTExecRequest(Frontend.java):1625
---doCreateExecRequest(Frontend.java):1655
----loadTables(StmtMetadataLoader.java):138
-----loadTables(StmtMetadataLoader.java):162
------getMissingTables(StmtMetadataLoader.java)

Impala首先会对SQL进行parse,转换成一个AST。然后利用表的元数据进行analysis。在进行analysis阶段之前,会先检查SQL中涉及到的表,是否都已经加载。如果没有的话,需要主动加载元数据,并缓存在c节点的内存中。相关的代码如下所示:

代码语言:javascript
复制
//StmtMetadataLoader.loadTables()
FeCatalog catalog = fe_.getCatalog();
Set<TableName> missingTbls = getMissingTables(catalog, tbls);

对于LocalCatalog模式,就是通过上面提到的LocalImpl.getOrCreateCatalog方法,每次创建一个新的LocalCatalog返回。实际的元数据操作是通过LocalCatalog的私有成员变量来执行的(类型是CatalogdMetaProvider)。我们沿着getMissingTables()函数的处理逻辑继续往下看:

代码语言:javascript
复制
//StmtMetadataLoader.getMissingTables()
for (TableName tblName: tbls) {
  if (loadedOrFailedTbls_.containsKey(tblName)) continue;
  FeDb db = catalog.getDb(tblName.getDb());
  if (db == null) continue;
  dbs_.add(tblName.getDb());
  FeTable tbl = db.getTable(tblName.getTbl());
  if (tbl == null) continue;
  if (!tbl.isLoaded()) {
    missingTbls.add(tblName);
    continue;
  }
  //省略部分代码
}

这段代码的功能,就是获取SQL中没有加载的表,称之为Missing Table。对于这些表,需要先加载完元数据,才能继续后面的analysis操作。首先根据FeCatalog(这里就是LocalCatalog)和表的库名,来获取对应的FeDb。关于FeDb相关的UML类图如下所示:

FeDb就是在FE中,database的一个接口,提供了一些与database相关的api,例如getMetaStoreDb、containsTable、getTable等。在LocalCatalog模式中,FeDb实际上是就是LocalDb。接着就会根据这个LocalDb来获取对应的FeTable。与FeDb类似,FeTable就是在FE中,table的一个接口。目前Impala支持多种table,例如hdfs、kudu、hbase等。这里我们以hdfs表为例进行介绍,相关的UML类图如下所示:

这个图相对比较复杂,这里简单介绍一下,不感兴趣的同学可以先跳过。FeTable是在FE端与表进行交互的一个接口,Impala针对每种类型的表,又实现了各自的接口,例如FeFsTable(parquet、orc、text等)、FeHBaseTable、FeKuduTable等,这些都继承自FeTable。 在普通的Catalog模式下,Impala实现了一个Table,这是一个abstract class,可以理解为表作为一个catalog object的基类。因此,对于每一个具体的表,例如HdfsTable,需要继承Table,并实现FeFsTable。在LocalCatalog模式下,Impala又实现了一个LocalTable类,与Table对应。此时对于每一种具体的表,例如LocalFsTable,也需要继承LocalTable,并实现FeFsTable。对于其他类型的表,kudu、hbase等,也是如此。 除此之外,图中还有一个接口FeIncompleteTable,分别有两个实现:IncompleteTable和LocalIncompleteTable,代表的就是表加载失败之后的状态。每种具体类型的表,如果加载失败了,都会变成incomplete状态,同时还会保存加载失败的原因。 到这里,关于FeTable的介绍就差不多已经结束了。回到上面的那段代码,在LocalCatalog模式下,我们获取到的FeTable实际上就是一个LocalFsTable。接下来,我们就看一下,如何根据表名,通过LocalDb来获取对应的LocalFsTable,这里我们承接了上面的调用栈:

代码语言:javascript
复制
getMissingTables(StmtMetadataLoader.java):307
-getTable(LocalDb.java):123
--getTableIfCached(LocalDb.java):110
---loadTableNames(LocalDb.java):170
----loadTableList(CatalogdMetaProvider.java):662
-----loadWithCaching(CatalogdMetaProvider.java)

可以看到,最终在CatalogdMetaProvider.java中,就是通过这个loadWithCaching方法来向catalogd获取相关的元数据信息。在获取database、table等信息的时候,都会用到这个方法。对于当前这种情况,我们需要获取的是某个database下的所有table列表,即loadTableList这个方法的功能。我们以这种情况为例,来简单看下loadWithCaching的处理逻辑:

代码语言:javascript
复制
public ImmutableCollection<TBriefTableMeta> loadTableList(final String dbName)
    throws MetaException, UnknownDBException, TException {
  ImmutableMap<String, TBriefTableMeta> res = loadWithCaching(
      "table list of database " + dbName, TABLE_LIST_STATS_CATEGORY,
      new DbCacheKey(dbName.toLowerCase(), DbCacheKey.DbInfoType.TABLE_LIST),
      new Callable<ImmutableMap<String, TBriefTableMeta>>() {
        @Override
        public ImmutableMap<String, TBriefTableMeta> call() throws Exception {
          //省略函数主体
        }
    });
  return res.values();
}

private <CacheKeyType, ValueType> ValueType loadWithCaching(String itemString,
    String statsCategory, CacheKeyType key,
    final Callable<ValueType> loadCallable) throws TException {
//省略函数主体
}

loadWithCaching方法主要接收四个参数:

  • itemString,这个主要用于日志记录的时候,说明该此次请求的场景,这里就是:"table list of database " + dbName,表示获取指定database的table list;
  • statsCategory,这个主要是在profile里面按照各种场景进行归类统计,这里就是:TABLE_LIST_STATS_CATEGORY,代表的是TableList。关于这个参数后面章节再展开介绍;
  • key,这个表示的是在本地缓存中的key,目前也有好几种类型,当前对应的是DbCacheKey这种类型的缓存,关于cache key的更多内容,也放在后面章节介绍;
  • loadCallable,是一个实现了Callable接口的对象,我们省略了call函数的主体,该函数主要就是通过rpc请求向catalogd获取对应的元数据信息,关于这块更多的内容,我们再后续文章讲catalogd的时候,再进行分析,这里暂不展开;

最终,这里的loadWithCaching返回的就是一个ImmutableMap<String, TBriefTableMeta>类型对象的value集合。Map中的每一条记录都代表database下的一个表,这里以tpch库为例,对应的数据如下所示:

初次获取到这些元数据信息之后,就会在CatalogdMetaProvider中的本地缓存中保存下来,如下所示:

代码语言:javascript
复制
/**
 * The underlying cache.
 *
 * The keys in this cache are various types of objects (strings, DbCacheKey, etc).
 * The values are also variant depending on the type of cache key. While any key
 * is being loaded, it is a Future<T>, which gets replaced with a non-wrapped object
 * once it is successfully loaded (see {@link #getIfPresent(Object)} for a convenient
 * wrapper).
 *
 * For details of the usage of Futures within the cache, see
 * {@link #loadWithCaching(String, String, Object, Callable).
 */
final Cache<Object,Object> cache_;

这里使用了guava的Cache相关类作为本地缓存,并进行相关的缓存操作。之后就可以直接通过缓存直接获取元数据,无需每次都跟catalogd进行交互。接着,在getTableIfCached方法中,先通过loadTableNames方法将所有的table都初始化为LocalIncompleteTable,然后放到了tables_中,这是一个Map<String, FeTable>类型的map。最后按照tblName进行匹配,返回对应的LocalIncompleteTable,相关代码如下所示:

代码语言:javascript
复制
//LocalDb.getTableIfCached(),删除部分无关代码
public FeTable getTableIfCached(String tblName) {
  loadTableNames();
  if (!tables_.containsKey(tblName)) {
    return null;
  }
  return tables_.get(tblName);
}
//LocalDb.loadTableNames()
for (TBriefTableMeta meta : metaProvider.loadTableList(name_)) {
  newMap.put(meta.getName(), new LocalIncompleteTable(this, meta));
}
tables_ = newMap;

获取到指定的FeTable之后,在getTable方法中进行判断,如果是incomplete状态,则需要进行load(这里的LocalIncompleteTable就是incomplete),相关代码如下所示:

代码语言:javascript
复制
//LocalDb.getTable(),删除部分无关代码
public FeTable getTable(String tableName) {
  FeTable tbl = getTableIfCached(tblName);
  if (tbl instanceof LocalIncompleteTable) {
    tbl = LocalTable.load(this, tblName);
    tables_.put(tblName, tbl);
  }
  return tbl;
}

加载完成之后,就得到了一个LocalFsTable对象,最终返回。所以,最后在getMissingTables函数中,得到的FeTable是一个LocalFsTable,这个表已经加载了,所以不是missing table。由此我们可以知道,在LocalCatalog模式下,SQL中涉及到的表,都不会是missing table(普通Catalog模式下的处理逻辑有所不同,如果是第一次访问的表,则会被当作missing tables,然后后续统一进行加载,之后再次访问的时候,就是直接返回具体的表,例如HdfsTable,感兴趣的同学可以自行阅读源码,这里不再展开)。后续就会按照正常的流程继续执行analysis操作。

分区粒度的元数据缓存

在原先的普通Catalog模式下,表的元数据信息(包括分区、文件、block等)都是通过statestored的topic广播到各个c节点。即使SQL中只查询了表的一个分区,c节点的jvm也会缓存整个表的元数据信息。但是在LocalCatalog模式下,实现了分区粒度的元数据缓存,c节点只会缓存SQL查询的分区,而不是全量的元数据信息,也就是我们文章开头提到的“Fetch-on-demand”。下面我们就结合代码来看一下,LocalCatalog模式下,c节点如何进行分区粒度的元数据缓存的。 我们在上一章节中提到,获取missing table的时候,会将SQL中涉及到的表进行load操作。在进行load的时候,就会先加载表的分区信息,如下所示:

代码语言:javascript
复制
getTable(LocalDb.java):127
-load(LocalTable.java):121
--loadColumnStats(LocalFsTable.java):557
---loadPartitionValueMap(LocalFsTable.java):486
----loadPartitionSpecs(LocalFsTable.java):526
-----loadPartitionList(CatalogdMetaProvider.java):854
------loadWithCaching(CatalogdMetaProvider.java)

最终也是通过loadWithCaching方法来获取表的分区元数据信息。不过与上面的DbCacheKey不一样,这里用到了另外一种cache key:

代码语言:javascript
复制
public List<PartitionRef> loadPartitionList(final TableMetaRef table)
    throws TException {
  PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl) table);
  return (List<PartitionRef>) loadWithCaching("partition list for " + table,
      PARTITION_LIST_STATS_CATEGORY, key, new Callable<List<PartitionRef>>() {
        public List<PartitionRef> call() throws Exception {
          //省略函数主体
        }
      });
}

这里用到了PartitionListCacheKey,表示的是一个表的partition list,获取的是表的所有分区信息。但是并不是完整的分区信息,这里我们以functional_parquet.alltypes表为例,如下所示:

可以看到,这里只包含了分区id和name,没有location、file等信息。因此,我们可以理解为这里的partition list只是一个分区的概要信息。接着,在创建HdfsScanNode的时候,会去加载SQL中涉及到的分区的详细元数据信息,如下所示:

代码语言:javascript
复制
createScanNode(SingleNodePlanner.java):1820
-createHdfsScanPlan(SingleNodePlanner.java):1530
--prunePartitions(HdfsPartitionPruner.java):174
---loadPartitions(LocalFsTable.java):445
----loadPartitionsByRefs(CatalogdMetaProvider.java):904
-----loadPartitionsFromCatalogd(CatalogdMetaProvider):931
----loadPartitionsByRefs(CatalogdMetaProvider.java):908
-----storePartitionsInCache(CatalogdMetaProvider):931

这里我们只截取了部分堆栈信息。最终,就是通过loadPartitionsFromCatalogd方法向catalogd获取指定分区的元数据信息。然后构造对应的PartitionCacheKey和PartitionMetadataImpl,分别作为cache key和value,存入到本地缓存中。这里我们仍然以functional_parquet.alltypes表为例,查询分区year=2009 and month=1,最终得到的缓存就是这样的:

可以看到,cache value中已经有了location、file等具体的信息。由此可知,在LocalCatalog模式下,Impala将table与partition进行了解耦。在本地缓存中,对于指定表,保存了一个partition list的记录,即PartitionListCacheKey。同时,对于每个SQL查询的分区,也有单独的分区缓存记录,也就是PartitionCacheKey。通过这种解耦,实现了分区粒度的元数据缓存。

本地缓存相关的内容

我们在上面介绍loadWithCaching方法的时候,提到了有两个参数,其中一个就是cache key,表示缓存是哪种类型的数据。目前Impala支持了若干种类型的数据进行本地缓存,我们在上面提到了DbCacheKey、PartitionListCacheKey和PartitionCacheKey。分别表示的是database、partition list、partition的cache key。每种cache key对应的value都是不一样的。以DbCacheKey为例,在构造DbCacheKey的时候,也分不同的类型,一共有三种:

代码语言:javascript
复制
//CatalogdMetaProvider.DbCacheKey
static enum DbInfoType {
  HMS_METADATA,
  TABLE_LIST,
  FUNCTION_NAMES
}

HMS_METADATA是我们在获取database本身数据的时候使用的类型,这种场景对应的value是org.apache.hadoop.hive.metastore.api.database;TABLE_LIST就是我们上面在获取db的table list时的情况,value是ImmutableMap<String, TBriefTableMeta>,每一条记录表示db下面的一个表;FUNCTION_NAMES,表示获取database下的函数,value类型是List。 除了这三种cache key之外,还有TableCacheKey,在缓存一个具体的表信息时,用其作为key,此时value是TableMetaRefImpl;ColStatsCacheKey,在缓存某个表的某列统计信息数据时,作为key使用,它的value则是ColumnStatisticsObj。还有其他的一些cache key,都位于CatalogdMetaProvider中,这里就不再是一一介绍,感兴趣的同学可以自行阅读源码。 另外一个参数statsCategory,这个参数主要是用来统计在访问cache内容时的命中情况。在调用loadWithCaching方法时,最后会对当前这个cache记录的访问情况进行统计。相关函数如下所示:

代码语言:javascript
复制
//CatalogdMetaProvider.java
private void addStatsToProfile(String statsCategory, int numHits, int numMisses,
    Stopwatch stopwatch) {
  FrontendProfile profile = FrontendProfile.getCurrentOrNull();
  if (profile == null) return;
  final String prefix = CATALOG_FETCH_PREFIX + "." +
      Preconditions.checkNotNull(statsCategory) + ".";
  profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses);
  profile.addToCounter(prefix + "Time", TUnit.TIME_MS,
      stopwatch.elapsed(TimeUnit.MILLISECONDS));
  profile.addToCounter(prefix + "Hits", TUnit.NONE, numHits);
  profile.addToCounter(prefix + "Misses", TUnit.NONE, numMisses);
}

可以看到,总共统计三种情况:hits、misses和requests。针对本地缓存中的不同数据,profile中也会按类进行统计,目前支持如下的这些类目:

代码语言:javascript
复制
//CatalogdMetaProvider.java
private static final String CATALOG_FETCH_PREFIX = "CatalogFetch";
private static final String DB_LIST_STATS_CATEGORY = "DatabaseList";
private static final String DB_METADATA_STATS_CATEGORY = "Databases";
private static final String TABLE_LIST_STATS_CATEGORY = "TableList";
private static final String TABLE_METADATA_CACHE_CATEGORY = "Tables";
private static final String PARTITION_LIST_STATS_CATEGORY = "PartitionLists";
private static final String PARTITIONS_STATS_CATEGORY = "Partitions";
private static final String COLUMN_STATS_STATS_CATEGORY = "ColumnStats";
private static final String GLOBAL_CONFIGURATION_STATS_CATEGORY = "Config";
private static final String FUNCTION_LIST_STATS_CATEGORY = "FunctionLists";
private static final String FUNCTIONS_STATS_CATEGORY = "Functions";
private static final String RPC_STATS_CATEGORY = "RPCs";
private static final String STORAGE_METADATA_LOAD_CATEGORY = "StorageLoad";

当SQL执行完成之后,我们就可以在profile页面看到这个统计了,如下所示:

总结

到这里,关于在LocalCatalog模式下,c节点的一些处理流程就介绍的差不多了。由于整体改动非常大,本文也只是挑选了一些场景,结合代码进行讲解。总结一下,本文主要通过两个场景:Missing Tables的获取和分区粒度的元数据缓存,讲述了LocalCatalog模式下,c节点FE端的处理逻辑。同时也简单介绍了FE端table和db相关的一些class,以及本地缓存的相关内容。由于篇幅原因,关于catalogd端的处理流程会在后续的文章中跟大家继续分享。目前,Impala最新4.0版本,对于LocalCatalog模式的支持也已经比较成熟,感兴趣的读者可以尝试一下。此外,本文是笔者基于社区4.0.0代码的分析而来,如有错误,欢迎批评指正。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-09-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 相关背景
  • FE端相关类介绍
  • C节点处理流程
  • 分区粒度的元数据缓存
  • 本地缓存相关的内容
  • 总结
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档