了解过Impala的同学都知道,Impala的节点分为BE和FE两个模块,分别是由C++和Java编写的。对于impalad而言,FE端主要是进行SQL的解析,具体的执行则是在BE端进行的;而对于catalogd而言,主要的元数据操作都是在FE端通过调用hms的API执行的,BE端主要是进行一些RPC通信。关于这两个模块之间是如何交互的,相关的资料比较少。因此,本文笔者就和大家一起学习下,Impala的BE和FE之间是如何通过JNI进行交互的。
如果想在C++调用Java的方法,需要先启动一个Jvm。我们这里以catalogd为例,相关的函数调用如下所示:
CatalogdMain(catalogd-main.cc):55
-InitCommonRuntime(init.cc):385
--InitLibhdfs(jni-util.cc):213
---hdfsConnect
可以看到,Impala并不是直接调用JNI的JNI_CreateJavaVM方法来创建Jvm的。而是在InitLibhdfs方法中,通过调用hdfs的API hdfsConnect方法来实现Jvm的创建,我们查看hdfs源码的部分调用栈:
//项目地址:https://github.com/apache/hadoop-hdfs
hdfsConnect(hdfs.c):171
-hdfsConnectAsUser(hdfs.c):199
--getJNIEnv(hdfsJniHelper.c):463
---JNI_CreateJavaVM
在hdfs的c++库中,通过调用JNI_CreateJavaVM方法,创建了一个新的Jvm。关于JNI_CreateJavaVM方法的官方解释如下所示:
JNI_CreateJavaVM
jint JNI_CreateJavaVM(JavaVM **p_vm, void **p_env, void *vm_args);
Loads and initializes a Java VM. The current thread becomes the main thread. Sets the env argument to the JNI interface pointer of the main thread.
Creation of multiple VMs in a single process is not supported.
可以看到,对于一个进程只能创建一个Jvm。我们沿着上面的Impala调用栈继续往下看:
InitCommonRuntime(init.cc):387
-Init(jni-util.cc):133
--GetJNIEnv(jni-util.h):267
---GetJNIEnvSlowPath(jni-util.cc):233
----FindJavaVMOrDie(jni-util.cc):223
-----JNI_GetCreatedJavaVMs
最后在Impala的FindJavaVMOrDie方法中,通过调用JNI的JNI_GetCreatedJavaVMs方法,可以获取本进程已经创建的Jvm,也就是刚刚通过hdfs的API创建的那个Jvm。当我们需要在BE端调用FE端的方法时,就可以通过这个Jvm获取相应的JNIEnv,然后调用相应的API,我们在下面章节会继续讲解具体的调用场景。
当我们使用start-impala-cluster.py启动测试集群的时候,脚本就会自动设置环境变量JAVA_TOOL_OPTIONS,在创建Jvm的时候会调用这个环境变量。如下所示:
在build_java_tool_options方法中会构造JAVA_TOOL_OPTIONS变量对应的value,默认是会设置一个DEBUG的调试端口配置,然后返回并添加到环境变量中。同时,脚本还支持参数jvm_args,可以追加一些新的Jvm配置,如下所示:
./bin/start-impala-cluster.py '--jvm_args=-Xms10g -Xmx10g'
这样我们就将Jvm的heap size设置成了10g。我们也可以在日志中,看到Jvm相关的配置打印,如下所示:
值得一提的是,除了上面提到的jvm_args可以修改Jvm的参数,还有另外一种方式也可以修改。我们在上一节中提到Jvm是通过hdfs的相关api来创建的。因此,我们还可以通过环境变量LIBHDFS_OPTS来调整Jvm的参数,例如:
export LIBHDFS_OPTS="-Xms10g -Xmx10g"
经过测试发现,当使用start-impala-cluster.py启动测试集群的时候,jvm_args参数的优先级要高于LIBHDFS_OPTS环境变量。直接设置JAVA_TOOL_OPTIONS环境变量则不生效,会在start-impala-cluster.py脚本中被覆盖掉。
上面提到,在启动Impala进程的时候,会先创建一个内嵌的Jvm,接着就可以通过这个Jvm获取相应的JNIEnv对象,来加载FE端的相关方法。以catalogd为例,相关的函数调用如下所示:
CatalogdMain(catalogd-main.cc):63
-Start(catalog-server.cc):276
--Catalog(catalog.cc):75
---LoadJniMethod(jni-util.cc)
通过LoadJniMethod方法就可以加载FE端的方法。对于catalogd而言,这些方法都位于JniCatalog.java类中,在Catalog的构造函数中进行绑定:
//catalog.cc
JniMethodDescriptor methods[] = {
{"<init>", "([B)V", &catalog_ctor_},
{"updateCatalog", "([B)[B", &update_metastore_id_},
{"execDdl", "([B)[B", &exec_ddl_id_},
{"resetMetadata", "([B)[B", &reset_metadata_id_},
//省略余下代码
方法加载完成之后,就可以在BE端通过JNI的相关接口进行调用。这里我们以常见的create table为例,这是一个DDL类型的SQL,对于DDL/DML,SQL首先会提交到coordinator节点,最终是由catalogd来执行的,我们将整个流程归纳如下:
主要分为四个步骤,结合上面的图来分别看下每一步的主要逻辑:
到这里为止,一次create table流程就完成了。可以看到,在这个过程中,coordinator和catalogd都通过JNI调用实现了BE和FE之间的交互。
上面介绍了Impala如何在BE端调用FE的方法。其实,在Impala中也存在FE端调用BE方法的场景,这里简单来看一下。在之前的文章:LocalCatalog详解之Coordinator处理流程中,我们介绍了在LocalCatalog模式下,coordinator节点主要是通过CatalogdMetaProvider类来向catalogd获取所需的元数据,即“Fetch-on-demand”,这个过程就涉及到了FE端对BE方法的调用。在调用loadWithCaching方法时,会实现一个Callable对象,重载call方法。在这个call方法中就会涉及到JNI的调用(我们在那篇文章中省略了call方法主体),相关函数调用如下所示:
loadTableList(CatalogdMetaProvider.java):667
-call(CatalogdMetaProvider.java):670
--sendRequest(CatalogdMetaProvider.java):400
---GetPartialCatalogObject(FeSupport.java):438/442
----NativeGetPartialCatalogObject(FeSupport.java):111
-----Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(fe-support.cc)
这里的call方法实际是在loadWithCaching函数中调用的,为了方便读者阅读源码,我们就按照它实现的位置进行了展示。当调用BE端的方法之后,相应的流程如下所示:
Java_org_apache_impala_service_FeSupport_NativeGetPartialCatalogObject(fe-support.cc):593
-GetPartialCatalogObject(catalog-op-executor.cc):370
--DoRpcWithRetry(client-cache.h)
---GetPartialCatalogObject(catalog-service-client-wrapper.h)
可以看到,与上面的create table类似,这里也是通过RPC,将GetPartialCatalogObject作为参数传给了DoRpcWithRetry,最终也是通过catalogd来获取元数据信息,然后返回给coordinator。关于catalogd的处理流程,不是本文关注的重点,这里不再展开说明,后续会在LocalCatalog系列的文章中再详细介绍。值得一提的是,这些在BE端会被FE调用的方法,在编译完成之后,最终都会位于be/build/latest/service/libfesupport.so这个动态库中。
到这里,关于Impala的FE和BE的交互就介绍的差不多了。总结一下,本文首先介绍了Impala是如何在c++进程中来创建Jvm的,接着又介绍了如何调整集群的Jvm参数。最后通过两个场景讲解了FE和BE之间的JNI调用。总之,在当前在大数据系统很多都是Java实现的情况下,Impala这种结合C++和Java的玩法还是比较有意思的,大家可以了解了解。