Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >ES-Spark连接ES后,ES Client节点流量打满分析

ES-Spark连接ES后,ES Client节点流量打满分析

作者头像
YG
发布于 2018-05-23 09:17:54
发布于 2018-05-23 09:17:54
3.3K30
代码可运行
举报
文章被收录于专栏:YG小书屋YG小书屋
运行总次数:0
代码可运行

问题描述

前段时间用es-spark读取es数遇到了client节点流量打满的现象。es-spark配置的es.nodes是es的域名。由于其中一个client是master节点,然后普通查询变得特别慢,运行20多分钟后,主节点崩溃。

解决方法

临时解决方案:降低es-spark的并发,并重启主节点。

最终解决方案:设置es.nodes.wan.only为false,即不用域名访问。将es.nodes配置为client节点的IP。

原因分析

域名访问时必须配置参数es.nodes.wan.only为true,关于该参数的解释如下:

Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and onlyconnects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.

es.nodes.wan.only设置为true时即只通过client节点进行读取操作,因此主节点负载会特别高,性能很差。长时间运行后,java gc回收一次要几十秒,慢慢的OOM,系统崩溃。

配置es.nodes为client节点的IP后,spark只通过data节点访问ES:

es.nodes.data.only (default true) Whether to use Elasticsearch data nodes only. When enabled, elasticsearch-hadoop will route all its requests (after nodes discovery, if enabled) through the data nodes within the cluster. The purpose of this configuration setting is to avoid overwhelming non-data nodes as these tend to be "smaller" nodes. This is enabled by default.

es.nodes.data.only 默认为true,即spark所有的请求都会发到数据节点,不在通过client节点进行请求的转发,client节点只用来服务普通的查询。

源码角度分析

1、es-spark 读

其架构图如下所示:

es_spark_read.png

我们知道spark能动态的发现节点,,但当我们配置wan.only为true的时候,整个集群的节点IP中只有从域名中解析出来的IP:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private static List<String> qualifyNodes(String nodes, int defaultPort, boolean resolveHostNames)
 {
   List<String> list = StringUtils.tokenize(nodes);
   for (int i = 0; i < list.size(); i++)
   {
     String nodeIp = resolveHostNames ? resolveHostToIpIfNecessary((String)list.get(i)) : (String)list.get(i);
     list.set(i, qualifyNode(nodeIp, defaultPort));
   }
   return list;
 }

从源码角度以scroll为例:

JavaEsSpark.esJsonRDD()-->JavaEsRDD.compute()-->JavaEsRDDIterator(继承AbstractEsRDDIterator).reader$lzycompute() 在lzycompute方法中我们可以看到,执行请求的是RestService:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private ScrollQuery reader$lzycompute()
 {
   synchronized (this)
   {
     if (!this.bitmap$0)
     {
       initialized_$eq(true);
       Settings settings = this.partition.settings();

       initReader(settings, log());

       RestService.PartitionReader readr = RestService.createReader(settings, this.partition, log());this.reader =
         readr.scrollQuery();this.bitmap$0 = true;
     }
     return this.reader;
   }
 }

在createReader方法中会判断spark节点和当前请求请求的shard是否是同一个节点,如果是同一个节点,则将该IP写入Setting,用本地节点IP进行请求(执行请求的时候,从setting中读取该ip):

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if ((!SettingsUtils.hasPinnedNode(settings)) && (partition.getLocations().length > 0))
{
  String pinAddress = checkLocality(partition.getLocations(), log);
  if (pinAddress != null)
  {
    if (log.isDebugEnabled()) {
      log.debug(String.format("Partition reader instance [%s] assigned to [%s]:[%s]", new Object[] { partition, pinAddress }));
    }
    SettingsUtils.pinNode(settings, pinAddress);
  }
}

通过PartitionReader.scrollQuery()-->SearchRequestBuilder.build()-->RestRepository.scanLimit()-->ScrollQuery.hasNext()-->RestRepository.scroll()-->RestClient.execute()-->NetWorkClient.execute()-->Transport.execute()

其实我们看到的最终要的执行是在NetWorkClient中,他会打乱所有的数据节点,并从中选出一个节点用来通信,如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public NetworkClient(Settings settings, TransportFactory transportFactory)
{
  this.settings = settings.copy();
  this.nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
  this.transportFactory = transportFactory;

  Collections.shuffle(this.nodes);//打乱排序
  if (SettingsUtils.hasPinnedNode(settings))
  {
    String pinnedNode = SettingsUtils.getPinnedNode(settings);
    if (log.isDebugEnabled()) {
      log.debug("Opening (pinned) network client to " + pinnedNode);
    }
    this.nodes.remove(pinnedNode);
    this.nodes.add(0, pinnedNode);
  }
  selectNextNode();

  Assert.notNull(this.currentTransport, "no node information provided");
}


private boolean selectNextNode()
{
  if (this.nextClient >= this.nodes.size()) {
    return false;
  }
  if (this.currentTransport != null) {
    this.stats.nodeRetries += 1;
  }
  closeTransport();
  this.currentNode = ((String)this.nodes.get(this.nextClient++));
  SettingsUtils.pinNode(this.settings, this.currentNode);
  this.currentTransport = this.transportFactory.create(this.settings, this.currentNode);
  return true;
}
2、es-spark 写

其架构图如下所示:

es_spark_write.png

从源码角度来看: 写请求的时候,如果wan.only配置为true,则节点IP就是从域名解析出的IP中随机选择一个进行写操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (settings.getNodesWANOnly()) {
  return randomNodeWrite(settings, currentInstance, resource, log);
}

以bulk为例,其操作过程如下:

EsSpark.doSaveToEs()-->EsRDDWriter.write()-->RestService.createWriter()

在createWriter中首先随机或者按照split选择一个节点:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
int selectedNode = currentSplit < 0 ? new Random().nextInt(nodes.size()) : currentSplit % nodes.size();
SettingsUtils.pinNode(settings, (String)nodes.get(selectedNode));

最终的改变是在RestService的initSingleIndex方法中,通过根据当前的split,找到对应的shard,然后获取到shard所在的IP,写入setting中(执行请求的时候,从setting中读取该ip)。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (currentInstance <= 0) {
   currentInstance = new Random().nextInt(targetShards.size()) + 1;
 }
 int bucket = currentInstance % targetShards.size();
 ShardInfo chosenShard = (ShardInfo)orderedShards.get(bucket);
 NodeInfo targetNode = (NodeInfo)targetShards.get(chosenShard);

 SettingsUtils.pinNode(settings, targetNode.getPublishAddress());
 String node = SettingsUtils.getPinnedNode(settings);
 repository = new RestRepository(settings);

接下来就是RestRepository.tryFlush()-->RestClient.bulk()-->NetWorkClient.execute()-->Transport.execute(),这一套流程和读差不多,这里就不再介绍。

3、shard-partition 对应关系

es-spark写的话就是就是一个partition对应一个shard,这里从上述的es-spark写代码中可以看出,不再过多介绍。

es-spark读的时候是按照shard的文档数来分的:

partition=numberOfDoc(shard)/100000

100000是默认的配置,这个可通过es.input.max.docs.per.partition配置。

假设一个shard有23w条doc,10w条一个partition,则分为3个partition。读操作时shard-partition 的架构图如下所示:

partition_shard.png

从源码角度来说,如果是5.X版本,则用scrollSlice提高并发度。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
if (version.onOrAfter(EsMajorVersion.V_5_X)) {
  partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards);
} else {
  partitions = findShardPartitions(settings, mapping, nodesMap, shards);
}

在findSlicePartitions中给出了计算公式:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
for (List<Map<String, Object>> group : shards)
{
  String index = null;
  int shardId = -1;
  List<String> locationList = new ArrayList();
  for (Map<String, Object> replica : group)
  {
    ShardInfo shard = new ShardInfo(replica);
    index = shard.getIndex();
    shardId = shard.getName().intValue();
    if (nodes.containsKey(shard.getNode())) {
      locationList.add(((NodeInfo)nodes.get(shard.getNode())).getPublishAddress());
    }
  }
  String[] locations = (String[])locationList.toArray(new String[0]);
  StringBuilder indexAndType = new StringBuilder(index);
  if (StringUtils.hasLength(types))
  {
    indexAndType.append("/");
    indexAndType.append(types);
  }
  long numDocs = client.count(indexAndType.toString(), Integer.toString(shardId), query);
  int numPartitions = (int)Math.max(1L, numDocs / maxDocsPerPartition);
  for (int i = 0; i < numPartitions; i++)
  {
    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
    partitions.add(new PartitionDefinition(settings, mapping, index, shardId, slice, locations));
  }
}

public int getMaxDocsPerPartition()
{
  return Integer.parseInt(getProperty("es.input.max.docs.per.partition", Integer.toString(100000)));
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.02.08 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
3 条评论
热度
最新
分析地好详细啊
分析地好详细啊
回复回复点赞举报
若栖回复YG
流量打满是什么?
流量打满是什么?
回复回复点赞举报
姐姐回复YG
谢谢分享
谢谢分享
回复回复点赞举报
推荐阅读
编辑精选文章
换一批
自定义Spark Partitioner提升es-hadoop Bulk效率
之前写过一篇文章,如何提高ElasticSearch 索引速度。除了对ES本身的优化以外,我现在大体思路是尽量将逻辑外移到Spark上,Spark的分布式计算能力强,cpu密集型的很适合。这篇文章涉及的调整也是对SparkES 多维分析引擎设计 中提及的一个重要概念“shard to partition ,partition to shard ” 的实现。不过目前只涉及到构建索引那块。
用户2936994
2018/08/27
9070
Spark Core读取ES的分区问题分析
ES也是比较火热,在日志数据分析,规则分析等确实很方便,说实话用es stack 浪尖觉得可以解决很多公司的数据分析需求。极客时间下周一要上线新的ES课程,有需要的暂时别购买,到时候还找浪尖返现吧。
Spark学习技巧
2019/06/20
1.6K0
ES-Hadoop 实践
在大数据背景下,适用于不同场景下的框架、系统层出不穷,在批量数据计算上hadoop鲜有敌手,而在实时搜索领域es则是独孤求败,那如何能让数据同时结合两者优势呢?本文介绍的es-hadoop将做到这点。关于es-hadoop的使用在ethanbzhang之前的两篇文章《腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇》和《腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇》中已经进行了一些介绍,本文一方面是对其内容的一些补充,另一方面也是对个人实践过程的一个总结。
franyang
2019/12/03
3.5K0
ES-Hadoop 实践
Hadoop/Spark读写ES之性能调优
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2020/04/09
5.6K0
Hadoop/Spark读写ES之性能调优
Elasticsearch源码分析-写入解析
Elasticsearch(ES)是一个基于Lucene的近实时分布式存储及搜索分析系统,其应用场景广泛,可应用于日志分析、全文检索、结构化数据分析等多种场景,既可作为NoSQL数据库,也可作为搜索引擎。由于ES具备如此强悍的能力,因此吸引了很多公司争相使用,如维基百科、GitHub、Stack Overflow等。
技术姐
2018/11/07
6K0
Spark读写ES最佳实践
更换代码中公网ip为内网ip,选择maven assembly plugin进行打包,上传带依赖的jar包到EMR上,运行"ReadES"
沈小翊
2023/11/14
8160
Hive如何创建elasticsearch外部表
Elasticsearch 是一个开源的分布式搜索和分析引擎,建立在 Apache Lucene 基础上。它提供了一个可扩展的、实时的搜索和分析平台,用于处理和分析大规模的结构化和非结构化数据。 在类实时读写与全文检索上有极大的优势。
空洞的盒子
2023/11/27
1.6K1
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2018/12/29
8.8K0
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
ethanzhang
2018/12/10
5.4K1
腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
Influxdb Cluster下的数据写入
3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程涉及到将CreateShardGroup的请求发送给MetadataServer并等待本地更新到新的MetaData数据;
扫帚的影子
2018/12/12
1.2K0
Elasticsearch快照(snapshot)备份原理及分析
Snapshot是Elasticsearch提供的一种将集群数据备份至远程存储库的功能。例如将数据备份至S3,HDFS,共享文件系统等。
空洞的盒子
2024/08/02
2K3
ES系列(五):获取单条数据get处理过程实现
前面讲的都是些比较大的东西,即框架层面的东西。今天咱们来个轻松点的,只讲一个点:如题,get单条记录的es查询实现。
烂猪皮
2021/06/10
1.3K0
ES系列(五):获取单条数据get处理过程实现
ES5.6 Bulk源码解析
Bulk注册 在启动类BootStrap的start()方法中,启动了node.start()方法。在Node初始化的过程中,加载了一系列的模块和插件,其中包含ActionModel。 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSetting
YG
2018/05/23
9901
ES系列(七):多节点任务的高效分发与收集实现
我们知道,当我们对es发起search请求或其他操作时,往往都是随机选择一个coordinator发起请求。而这请求,可能是该节点能处理,也可能是该节点不能处理的,也可能是需要多节点共同处理的,可以说是情况比较复杂。
烂猪皮
2021/07/16
8960
聊聊elasticsearch的TransportProxyClient
本文主要研究一下elasticsearch的TransportProxyClient
code4it
2019/04/19
7420
聊聊elasticsearch的TransportProxyClient
ClickHouse 最近跟Es杠上了,日志场景谁更适合
点击上方蓝色字体,选择“设为星标” 回复”学习资料“获取学习宝典 文章来源:https://c1n.cn/yoNYE 目录 背景 Elasticsearch vs ClickHouse 成本分析 环境部署 总结 背景 saas 服务未来会面临数据安全、合规等问题。公司的业务需要沉淀一套私有化部署能力,帮助业务提升行业竞争力。 为了完善平台系统能力、我们需要沉淀一套数据体系帮助运营分析活动效果、提升运营能力。 然而在实际的开发过程中,如果直接部署一套大数据体系,对于使用者来说将是一笔比较大的服务器开销。
猿天地
2022/08/26
2.7K0
ClickHouse 最近跟Es杠上了,日志场景谁更适合
深入剖析Redis客户端Jedis的特性和原理
Redis作为目前通用的缓存选型,因其高性能而倍受欢迎。Redis的2.x版本仅支持单机模式,从3.0版本开始引入集群模式。
2020labs小助手
2021/11/02
1.9K0
《Elasticsearch 源码解析与优化实战》第12章:allocation模型分析
本文主要分析allocation 模块的结构和原理,然后以集群启动过程为例分析 allocation 模块的工作过程
HLee
2021/05/27
1.1K1
《Elasticsearch 源码解析与优化实战》第12章:allocation模型分析
浪尖说spark的coalesce的利弊及原理
浪尖的粉丝应该很久没见浪尖发过spark源码解读的文章,今天浪尖在这里给大家分享一篇文章,帮助大家进一步理解rdd如何在spark中被计算的,同时解释一下coalesce降低分区的原理及使用问题。
Spark学习技巧
2020/04/07
4.1K0
Spark源代码分析——谈RDD和依赖关系
为了更好地理解,这里我们使用HDFS上常见的HDFS实现:Hadoop RDD的实现。
jack.yang
2025/04/05
910
相关推荐
自定义Spark Partitioner提升es-hadoop Bulk效率
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验