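This is the second article in the series 《ElasticSearch搜索引擎详解》 (ElasticSearch Search Engine Explained): "ElasticSearch Explained, Part 2: Reading the Source to Understand the ES Startup Process". Interested readers can subscribe to the column to be notified of new articles.
Note: this article uses ES version 6.7.0.
The previous article, "ElasticSearch Explained, Part 1: Compiling the Source and Setting Up a Local Debug Environment", covered how to compile the ES source and how to debug it locally. This article walks through the ES startup process.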
Entry point: the main method of org.elasticsearch.bootstrap.Elasticsearch
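Set the following breakpoints:
Start ES in one of the Debug modes described in the previous article ("ElasticSearch Explained, Part 1: Compiling the Source and Setting Up a Local Debug Environment"); remote Debug mode is used here.
Stepping through the Debug session shows that ES startup breaks down roughly into the following stages:
The program entry code is as follows:
The Bootstrap stage does quite a lot. Its main method is as follows: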
/**
 * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
 */
static void init(
        final boolean foreground,
        final Path pidFile,
        final boolean quiet,
        final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
    // force the class initializer for BootstrapInfo to run before
    // the security manager is installed
    BootstrapInfo.init();
    INSTANCE = new Bootstrap();
    final SecureSettings keystore = loadSecureSettings(initialEnv);
    final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
    if (Node.NODE_NAME_SETTING.exists(environment.settings())) {
        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
    }
    try {
        LogConfigurator.configure(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    if (environment.pidFile() != null) {
        try {
            PidFile.create(environment.pidFile(), true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
    }
    final boolean closeStandardStreams = (foreground == false) || quiet;
    try {
        if (closeStandardStreams) {
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            closeSystOut();
        }
        // fail if somebody replaced the lucene jars
        checkLucene();
        // install the default uncaught exception handler; must be done before security is
        // initialized as we do not want to grant the runtime permission
        // setDefaultUncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
        INSTANCE.setup(true, environment);
        try {
            // any secure settings must be read during node construction
            IOUtils.close(keystore);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        INSTANCE.start();
        if (closeStandardStreams) {
            closeSysError();
        }
    } catch (NodeValidationException | RuntimeException e) {
        // disable console logging, so user does not see the exception twice (jvm will show it already)
        final Logger rootLogger = LogManager.getRootLogger();
        final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
        if (foreground && maybeConsoleAppender != null) {
            Loggers.removeAppender(rootLogger, maybeConsoleAppender);
        }
        Logger logger = LogManager.getLogger(Bootstrap.class);
        // HACK, it sucks to do this, but we will run users out of disk space otherwise
        if (e instanceof CreationException) {
            // guice: log the shortened exc to the log file
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            PrintStream ps = null;
            try {
                ps = new PrintStream(os, false, "UTF-8");
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
            new StartupException(e).printStackTrace(ps);
            ps.flush();
            try {
                logger.error("Guice Exception: {}", os.toString("UTF-8"));
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
        } else if (e instanceof NodeValidationException) {
            logger.error("node validation exception\n{}", e.getMessage());
        } else {
            // full exception
            logger.error("Exception", e);
        }
        // re-enable it if appropriate, so they can see any logging during the shutdown process
        if (foreground && maybeConsoleAppender != null) {
            Loggers.addAppender(rootLogger, maybeConsoleAppender);
        }
        throw e;
    }
}
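The detailed flow is as follows:
Logging configuration is loaded from the log4j2.properties configuration file.
The last two steps of the second stage are both related to creating the node.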
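One of the smaller steps in init is the PidFile.create(environment.pidFile(), true) call. The sketch below is not the ES implementation; it only illustrates what such a step typically does, assuming the boolean argument means "delete the file when the JVM exits". The class PidFileSketch and its methods writePid and readPid are hypothetical names, and ProcessHandle requires Java 9 or later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class PidFileSketch {
    /** Writes the current JVM's process id to the given path and returns it. */
    static long writePid(Path path, boolean deleteOnExit) {
        try {
            Path parent = path.toAbsolutePath().getParent();
            if (parent != null && !Files.isDirectory(parent)) {
                Files.createDirectories(parent);
            }
            long pid = ProcessHandle.current().pid(); // Java 9+
            Files.write(path, Long.toString(pid).getBytes(StandardCharsets.UTF_8));
            if (deleteOnExit) {
                // Best-effort cleanup when the JVM shuts down normally.
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    try {
                        Files.deleteIfExists(path);
                    } catch (IOException ignored) {
                        // nothing useful can be done this late in shutdown
                    }
                }));
            }
            return pid;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads a pid back from a file written by writePid. */
    static long readPid(Path path) {
        try {
            String text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            return Long.parseLong(text.trim());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path pidFile = Files.createTempDirectory("es-sketch").resolve("es.pid");
        long pid = writePid(pidFile, true);
        System.out.println("wrote pid " + pid + " to " + pidFile);
    }
}
```

Writing the file early, before the node is created, means an external supervisor can find the process even if later startup steps are slow.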
The setup method is called from Bootstrap.init (the INSTANCE.setup(true, environment) call).
The setup method is as follows:
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    Settings settings = environment.settings();
    try {
        spawner.spawnNativeControllers(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    initializeNatives(
        environment.tmpFile(),
        BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
        BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
        BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
    // initialize probes before the security manager is installed
    initializeProbes();
    if (addShutdownHook) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    IOUtils.close(node, spawner);
                    LoggerContext context = (LoggerContext) LogManager.getContext(false);
                    Configurator.shutdown(context);
                } catch (IOException ex) {
                    throw new ElasticsearchException("failed to stop node", ex);
                }
            }
        });
    }
    try {
        // look for jar hell
        final Logger logger = LogManager.getLogger(JarHell.class);
        JarHell.checkJarHell(logger::debug);
    } catch (IOException | URISyntaxException e) {
        throw new BootstrapException(e);
    }
    // Log ifconfig output before SecurityManager is installed
    IfConfig.logIfNecessary();
    // install SM after natives, shutdown hooks, etc.
    try {
        Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
    } catch (IOException | NoSuchAlgorithmException e) {
        throw new BootstrapException(e);
    }
    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
            final BootstrapContext context,
            final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }
        @Override
        protected void registerDerivedNodeNameWithLogger(String nodeName) {
            LogConfigurator.setNodeName(nodeName);
        }
    };
}
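The shutdown hook registered in setup closes the node and the spawner when the JVM exits. A minimal sketch of the same idea using only the JDK is shown below. ShutdownSketch, shutdownTask and register are hypothetical names, and the "close everything, then report the first failure" behaviour is an assumption about what a helper like IOUtils.close does, not a copy of it.

```java
import java.io.Closeable;
import java.io.IOException;

final class ShutdownSketch {
    /** Builds the task a shutdown hook would run: close every resource, then report the first failure. */
    static Runnable shutdownTask(Closeable... resources) {
        return () -> {
            IOException first = null;
            for (Closeable resource : resources) {
                if (resource == null) {
                    continue;
                }
                try {
                    resource.close();
                } catch (IOException e) {
                    if (first == null) {
                        first = e;
                    } else {
                        first.addSuppressed(e);
                    }
                }
            }
            if (first != null) {
                throw new IllegalStateException("failed to stop node", first);
            }
        };
    }

    /** Registers the task with the JVM so that it runs during an orderly shutdown. */
    static Thread register(Runnable task) {
        Thread hook = new Thread(task, "shutdown-sketch");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        register(shutdownTask(
            () -> System.out.println("closing node"),
            () -> System.out.println("closing spawner")));
        System.out.println("main finished; the hook runs as the JVM exits");
    }
}
```

Separating the task from its registration keeps the closing logic callable directly, which makes it easy to exercise without actually terminating the JVM.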
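Creating a Node is a complex process. This article only outlines what it does; readers should read the source closely for the details. Part of the code is as follows: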
/**
 * Constructs a node
 *
 * @param environment the environment for this node
 * @param classpathPlugins the plugins to be loaded from the classpath
 * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
 *                                   test framework for tests that rely on being able to set private settings
 */
protected Node(
        final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    logger = LogManager.getLogger(Node.class);
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();
        /*
         * Create the node environment as soon as possible so we can
         * recover the node id which we might have to use to derive the
         * node name. And it is important to get *that* as soon as possible
         * so that log lines can contain it.
         */
        boolean nodeNameExplicitlyDefined = NODE_NAME_SETTING.exists(tmpSettings);
        try {
            Consumer<String> nodeIdConsumer = nodeNameExplicitlyDefined ?
                nodeId -> {} : nodeId -> registerDerivedNodeNameWithLogger(nodeIdToNodeName(nodeId));
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment, nodeIdConsumer);
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
            throw new IllegalStateException("Failed to create node environment", ex);
        }
        if (nodeNameExplicitlyDefined) {
            logger.info("node name [{}], node ID [{}]",
                NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId());
        } else {
            tmpSettings = Settings.builder()
                .put(tmpSettings)
                .put(NODE_NAME_SETTING.getKey(), nodeIdToNodeName(nodeEnvironment.nodeId()))
                .build();
            logger.info("node name derived from node ID [{}]; set [{}] to override",
                nodeEnvironment.nodeId(), NODE_NAME_SETTING.getKey());
        }
        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
            "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
            jvmInfo.pid(),
            Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(),
            Build.CURRENT.shortHash(),
            Build.CURRENT.date(),
            Constants.OS_NAME,
            Constants.OS_VERSION,
            Constants.OS_ARCH,
            Constants.JVM_VENDOR,
            Constants.JVM_NAME,
            Constants.JAVA_VERSION,
            Constants.JVM_VERSION);
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
        if (logger.isDebugEnabled()) {
            logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
        }
        ...
}
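The constructor excerpt declares resourcesToClose and a success flag, but the elided part hides how they are used. The sketch below shows the usual shape of that pattern under the assumption that resources acquired so far are released when construction fails. ConstructionGuardSketch and openAll are hypothetical names, and closing in reverse order is a choice made for this sketch rather than something the excerpt confirms.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ConstructionGuardSketch {
    /** Opens every resource in order; if any step throws, closes the ones already opened. */
    static List<Closeable> openAll(List<Supplier<Closeable>> factories) {
        final List<Closeable> resourcesToClose = new ArrayList<>();
        boolean success = false;
        try {
            for (Supplier<Closeable> factory : factories) {
                resourcesToClose.add(factory.get());
            }
            success = true;
            return resourcesToClose;
        } finally {
            if (success == false) {
                // Release in reverse order of acquisition, ignoring secondary failures.
                for (int i = resourcesToClose.size() - 1; i >= 0; i--) {
                    try {
                        resourcesToClose.get(i).close();
                    } catch (IOException ignored) {
                        // the original construction error is the one worth reporting
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        List<Supplier<Closeable>> factories = new ArrayList<>();
        factories.add(() -> {
            System.out.println("open node environment");
            return () -> System.out.println("close node environment");
        });
        factories.add(() -> {
            throw new IllegalStateException("plugin loading failed");
        });
        try {
            openAll(factories);
        } catch (IllegalStateException e) {
            System.out.println("construction failed: " + e.getMessage());
        }
    }
}
```

The flag is set only on the last line of the try block, so any exception thrown earlier leaves it false and triggers the cleanup in finally.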
The Node instantiation process is as follows:
if (NetworkModule.HTTP_ENABLED.get(settings)) {
    logger.debug("initializing HTTP handlers ...");
    actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
Inside new PluginsService there is the statement Set<Bundle> modules = getModuleBundles(modulesDirectory);, which loads modules and plugins. Following the code leads to org.elasticsearch.plugins.PluginsService#readPluginBundle, shown below:
// get a bundle for a single plugin dir
private static Bundle readPluginBundle(final Set<Bundle> bundles, final Path plugin, String type) throws IOException {
    LogManager.getLogger(PluginsService.class).trace("--- adding [{}] [{}]", type, plugin.toAbsolutePath());
    final PluginInfo info;
    try {
        info = PluginInfo.readFromProperties(plugin);
    } catch (final IOException e) {
        throw new IllegalStateException("Could not load plugin descriptor for " + type +
            " directory [" + plugin.getFileName() + "]", e);
    }
    final Bundle bundle = new Bundle(info, plugin);
    if (bundles.add(bundle) == false) {
        throw new IllegalStateException("duplicate " + type + ": " + info);
    }
    return bundle;
}
The call info = PluginInfo.readFromProperties(plugin); reads the descriptor of a module or plugin from the given directory. The code is as follows:
/**
 * Reads the plugin descriptor file.
 *
 * @param path the path to the root directory for the plugin
 * @return the plugin info
 * @throws IOException if an I/O exception occurred reading the plugin descriptor
 */
public static PluginInfo readFromProperties(final Path path) throws IOException {
    final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
    final Map<String, String> propsMap;
    {
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
    }
    final String name = propsMap.remove("name");
    if (name == null || name.isEmpty()) {
        throw new IllegalArgumentException(
            "property [name] is missing in [" + descriptor + "]");
    }
    final String description = propsMap.remove("description");
    if (description == null) {
        throw new IllegalArgumentException(
            "property [description] is missing for plugin [" + name + "]");
    }
    final String version = propsMap.remove("version");
    if (version == null) {
        throw new IllegalArgumentException(
            "property [version] is missing for plugin [" + name + "]");
    }
    final String esVersionString = propsMap.remove("elasticsearch.version");
    if (esVersionString == null) {
        throw new IllegalArgumentException(
            "property [elasticsearch.version] is missing for plugin [" + name + "]");
    }
    final Version esVersion = Version.fromString(esVersionString);
    final String javaVersionString = propsMap.remove("java.version");
    if (javaVersionString == null) {
        throw new IllegalArgumentException(
            "property [java.version] is missing for plugin [" + name + "]");
    }
    JarHell.checkVersionFormat(javaVersionString);
    final String classname = propsMap.remove("classname");
    if (classname == null) {
        throw new IllegalArgumentException(
            "property [classname] is missing for plugin [" + name + "]");
    }
    final String extendedString = propsMap.remove("extended.plugins");
    final List<String> extendedPlugins;
    if (extendedString == null) {
        extendedPlugins = Collections.emptyList();
    } else {
        extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
    }
    final String hasNativeControllerValue = propsMap.remove("has.native.controller");
    final boolean hasNativeController;
    if (hasNativeControllerValue == null) {
        hasNativeController = false;
    } else {
        switch (hasNativeControllerValue) {
            case "true":
                hasNativeController = true;
                break;
            case "false":
                hasNativeController = false;
                break;
            default:
                final String message = String.format(
                    Locale.ROOT,
                    "property [%s] must be [%s], [%s], or unspecified but was [%s]",
                    "has_native_controller",
                    "true",
                    "false",
                    hasNativeControllerValue);
                throw new IllegalArgumentException(message);
        }
    }
    if (esVersion.before(Version.V_6_3_0) && esVersion.onOrAfter(Version.V_6_0_0_beta2)) {
        propsMap.remove("requires.keystore");
    }
    if (propsMap.isEmpty() == false) {
        throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + propsMap.keySet());
    }
    return new PluginInfo(name, description, version, esVersion, javaVersionString,
        classname, extendedPlugins, hasNativeController);
}
The PluginInfo class has two public constants:
public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
public static final String ES_PLUGIN_POLICY = "plugin-security.policy";
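These are the names of two per-plugin files: the descriptor and the security policy. Every plugin and module is read according to the layout of plugin-descriptor.properties, which supplies the properties name, description, version, elasticsearch.version, java.version, classname, extended.plugins, and has.native.controller. The requires.keystore property is tolerated only when the descriptor's elasticsearch.version is at or after 6.0.0-beta2 and before 6.3.0; any other unrecognized property causes an IllegalArgumentException. These values are then wrapped into a PluginInfo object. The data structure finally returned to PluginsService is: Set<Bundle(PluginInfo, path)>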
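The "remove each known key, then reject whatever is left" technique used by readFromProperties can be tried in isolation with java.util.Properties. The following is a simplified sketch, not the ES code: DescriptorSketch and parse are hypothetical names, the descriptor is read from a string rather than a file, and the sample values are made up.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class DescriptorSketch {
    /** Parses descriptor text, requiring every listed key and rejecting any other key. */
    static Map<String, String> parse(String descriptorText, String... requiredKeys) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> remaining = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            remaining.put(key, props.getProperty(key));
        }
        Map<String, String> parsed = new TreeMap<>();
        for (String key : requiredKeys) {
            // remove() both fetches the value and marks the key as consumed
            String value = remaining.remove(key);
            if (value == null) {
                throw new IllegalArgumentException("property [" + key + "] is missing");
            }
            parsed.put(key, value);
        }
        if (remaining.isEmpty() == false) {
            throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + remaining.keySet());
        }
        return parsed;
    }

    public static void main(String[] args) {
        // Hypothetical descriptor content, for illustration only.
        String descriptor = "name=demo-plugin\n"
            + "description=hypothetical example\n"
            + "version=1.0.0\n";
        System.out.println(parse(descriptor, "name", "description", "version"));
    }
}
```

Because every recognized key is removed as it is read, a typo in a descriptor shows up as an unknown property instead of being silently ignored.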
ES thread pool types:
public enum ThreadPoolType {
    DIRECT("direct"),
    FIXED("fixed"),
    FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
    SCALING("scaling");
    private final String type;
    public String getType() {
        return type;
    }
    ThreadPoolType(String type) {
        this.type = type;
    }
    private static final Map<String, ThreadPoolType> TYPE_MAP;
    static {
        Map<String, ThreadPoolType> typeMap = new HashMap<>();
        for (ThreadPoolType threadPoolType : ThreadPoolType.values()) {
            typeMap.put(threadPoolType.getType(), threadPoolType);
        }
        TYPE_MAP = Collections.unmodifiableMap(typeMap);
    }
    public static ThreadPoolType fromType(String type) {
        ThreadPoolType threadPoolType = TYPE_MAP.get(type);
        if (threadPoolType == null) {
            throw new IllegalArgumentException("no ThreadPoolType for " + type);
        }
        return threadPoolType;
    }
}
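As shown above, the four types are direct, fixed, fixed_auto_queue_size, and scaling.
In this step, ThreadPool() creates many thread pools, with the following names: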
public static class Names {
    public static final String SAME = "same";
    public static final String GENERIC = "generic";
    public static final String LISTENER = "listener";
    public static final String GET = "get";
    public static final String ANALYZE = "analyze";
    public static final String INDEX = "index";
    public static final String WRITE = "write";
    public static final String SEARCH = "search";
    public static final String SEARCH_THROTTLED = "search_throttled";
    public static final String MANAGEMENT = "management";
    public static final String FLUSH = "flush";
    public static final String REFRESH = "refresh";
    public static final String WARMER = "warmer";
    public static final String SNAPSHOT = "snapshot";
    public static final String FORCE_MERGE = "force_merge";
    public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
    public static final String FETCH_SHARD_STORE = "fetch_shard_store";
}
See the official documentation for the purpose, pool type, thread count, and queue size of each thread pool.
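To get an intuition for three of the pool types (direct, fixed, scaling), they can be approximated with plain java.util.concurrent classes. This is only an analogy under stated assumptions: ES builds its own executors and queues, the fixed_auto_queue_size type is not modelled, and the class PoolTypesSketch, its method names, and the sizes used are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolTypesSketch {
    /** direct: the task runs on the thread that submits it. */
    static Executor direct() {
        return Runnable::run;
    }

    /** fixed: a constant number of threads fed by a bounded queue. */
    static ThreadPoolExecutor fixed(int size, int queueCapacity) {
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(queueCapacity));
    }

    /** scaling: grows from core to max threads under load; idle extra threads expire. */
    static ThreadPoolExecutor scaling(int core, int max, long keepAliveSeconds) {
        // A SynchronousQueue hands tasks straight to a thread, so the pool grows
        // toward max instead of queueing. Unlike ES, this sketch rejects tasks at max.
        return new ThreadPoolExecutor(core, max, keepAliveSeconds, TimeUnit.SECONDS,
            new SynchronousQueue<>());
    }

    public static void main(String[] args) throws InterruptedException {
        direct().execute(() -> System.out.println("direct ran on " + Thread.currentThread().getName()));
        ThreadPoolExecutor fixed = fixed(2, 100);
        ThreadPoolExecutor scaling = scaling(1, 4, 30);
        fixed.execute(() -> System.out.println("fixed ran on " + Thread.currentThread().getName()));
        scaling.execute(() -> System.out.println("scaling ran on " + Thread.currentThread().getName()));
        fixed.shutdown();
        scaling.shutdown();
        fixed.awaitTermination(5, TimeUnit.SECONDS);
        scaling.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```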
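The start method is likewise called from Bootstrap.init (the INSTANCE.start() call).
Once the steps above are complete, and if the service was started from a console, the startup log is printed to that console.
If you see the log line: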
[elasticsearch] [2019-04-09T20:01:12,428][INFO ][o.e.n.Node ] [node-0] starting ...
then the Node has begun starting.
Starting the Node roughly consists of starting the following services:
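Service | Description |
---|---|
IndicesService | Index management |
IndicesClusterStateService | Applies cluster state changes to the indices on the local node, keeping them in sync with the cluster |
SnapshotsService | Responsible for creating snapshots |
SnapshotShardsService | Runs on data and master nodes and controls the shards currently being snapshotted on those nodes. It is responsible for starting and stopping shard-level snapshots |
RoutingService | Listens to the cluster state; when it receives a ClusterChangedEvent it validates the cluster state, and the routing table may be updated |
SearchService | Search service |
ClusterService | Cluster management |
NodeConnectionsService | Connects to nodes after they are added to the cluster state and disconnects when they are removed. It also periodically checks that all connections are still open and restores them if needed. Note that this component is not responsible for removing nodes from the cluster when they disconnect or stop responding to pings; that is done by NodesFaultDetection. Master fault detection is done by MasterFaultDetection. |
ResourceWatcherService | Generic resource watcher service |
GatewayService | Gateway service |
Discovery | Node discovery |
TransportService | Network service for communication between nodes |
TaskResultsService | - |
HttpServerTransport | Network service for external (HTTP) clients |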
When the console prints the following line, the node has started successfully:
[elasticsearch] [2019-04-09T20:04:16,388][INFO ][o.e.n.Node ] [node-0] started
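The steps above show that starting even a single Elasticsearch node is fairly involved, and this article only lists the main startup steps. Many details remain unexplored, such as how nodes and the cluster discover and join each other, how data is synchronized between nodes, and how the cluster master is elected. Readers should dig into the source for those details.
Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, contact cloudcommunity@tencent.com for removal.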