在大数据背景下,适用于不同场景下的框架、系统层出不穷,在批量数据计算上hadoop鲜有敌手,而在实时搜索领域es则是独孤求败,那如何能让数据同时结合两者优势呢?本文介绍的es-hadoop将做到这点。关于es-hadoop的使用在ethanbzhang之前的两篇文章《腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇》和《腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇》中已经进行了一些介绍,本文一方面是对其内容的一些补充,另一方面也是对个人实践过程的一个总结。
hadoop是一个优秀的批量数据处理系统,能对数据进行复杂计算,但实时的(全文)搜索却比较困难;在这两点上ES与之几乎相反:很好的支持实时(全文)搜索,但只能用聚合查询进行简单的数据计算,也无法支持大批量的数据。
ES hadoop是一个ES对接hadoop生态的工具,它允许hadoop任务(比如MR、hive、pig、spark等)与ES交互,比如让hadoop以ES作为数据源进行计算、将计算的中间或结果数据存储到ES中等,这意味着它将能够整合Elasticsearch和hadoop各自的优势。
另外,ES-hadoop还提供插件Hadoop HDFS Repository Plugin允许将ES数据备份到hdfs或从其恢复。
这部分将介绍ES-hadoop是如何将ES和hadoop的数据实体进行映射的。
在分布式系统中,扩展计算能力的一个关键因素是:并行,或者说是将一个任务划分成多个更小的任务,使他们同时运行在集群的不同节点上,读取数据的不同部分进行计算。这个概念对应hadoop是splits、对应spark是partition、对应Elasticsearch则是shards。简单来讲,更多的splits、partition或shards意味着能有更多的任务同时读数据源的不同部分并进行计算,提高了计算能力。
既然并行如此重要,那么在hadoop中使用es-hadoop与ES进行数据交互时,它仍然应该能够并行的读写数据的不同部分,否则计算能力将大大降低。
在spark、MR等系统中使用elasticsearch-hadoop从ES读取数据时,shard是一个关键的角色,因为elasticsearch-hadoop将为ES索引中的每个shard创建一个Hadoop InputSplit或Spark Partition。
大家看到这里可能会有疑问:es-hadoop是如何同时读取ES索引中不同shard数据的呢?这里也对其实现做简单的介绍。
通过文章Spark Core读取ES的分区问题分析中的源码分析了解到,当es-hadoop从ES读取索引数据时,它获取索引各个shard的信息,包括:shard id、所在节点id等,并创建对应的Spark partition;除了按shard创建parition的方式外,es-hadoop还允许按shard最大doc数创建partition(配置项:es.input.max.docs.per.partition),这意味着es-hadoop能将shard切分成更小的数据集对应于partition,这对shard容量过大的情况将非常适用。 但说到这里,其实还是没能解答疑问:如何同时获取不同shard的数据呢?通过阅读elasticsearch-hadoop源码我找到了答案:
在文件mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java:389
,createReader
方法创建了PartitionReader,用于将读取ES shards生成partition。在方法中可以看到其构造了ES 查询请求:
public static PartitionReader createReader(Settings settings, PartitionDefinition partition, Log log) {
// ...
SearchRequestBuilder requestBuilder =
new SearchRequestBuilder(clusterInfo.getMajorVersion(), includeVersion)
.resource(read)
// Overwrite the index name from the resource to be that of the concrete index in the partition definition
.indices(partition.getIndex())
.query(QueryUtils.parseQuery(settings))
.scroll(settings.getScrollKeepAlive())
.size(settings.getScrollSize())
.limit(settings.getScrollLimit())
.fields(SettingsUtils.determineSourceFields(settings))
.filters(QueryUtils.parseFilters(settings))
.shard(Integer.toString(partition.getShardId()))
.readMetadata(settings.getReadMetadata())
.local(true)
.preference(settings.getShardPreference())
.excludeSource(settings.getExcludeSource());
可以看到:
和读取类似的,es-hadoop能够将hadoop的splits或spark partition数据对应成shard并行的写入ES。
这里以一个使用spark对es索引数据进行单词计数(wordcount)的使用示例,介绍es-hadoop中spark是如何操作es数据的。示例源码位于:https://github.com/yyff/es-spark-wordcount
SparkConf sparkConf = new SparkConf().setAppName("wordcount")
.setMaster("local[*]").set("es.index.auto.create", "true")
.set("es.nodes", esHost).set("es.port", esPort).set("es.nodes.wan.only", "true")
.set("es.net.http.auth.user", "elastic")
.set("es.net.http.auth.pass", password);
详细配置见:https://www.elastic.co/guide/en/elasticsearch/hadoop/current/mapreduce.html
1、 调用JavaEsSpark.esRDD从索引查询中创建RDD
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, indexName, query).values();
2、 统计词频
JavaPairRDD<String, Integer> counts = esRDD.flatMap(doc -> {
String fieldValue = (String)doc.get(field);
if (fieldValue == null) {
fieldValue = "";
}
return Arrays.asList(fieldValue.split(" ")).iterator();
}).mapToPair(word -> new Tuple2<String, Integer>(word, 1)).reduceByKey((x, y) -> x + y);
3、 结果输出到文件和ES
counts.saveAsTextFile(outDir);
EsSpark.saveToEs(counts.rdd(), "spark-native");
4、 查看结果
1、 使用配置创建spark session
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());//adapter
SQLContext sql = new SQLContext(jsc);
2、 调用JavaEsSparkSQL.esDF从es sql查询中生成Dataset
Dataset<Row> ds = JavaEsSparkSQL.esDF(sql, indexName);
ds = ds.select(ds.col(field));
for (String cond : conditions) {
ds = ds.where(cond);
}
3、 统计词频
JavaPairRDD<String, Integer> counts = ds.javaRDD().flatMap(
row -> {
String line = row.getAs(field);
if (line == null) {
line = "";
}
return Arrays.asList(line.split(" ")).iterator();
}).mapToPair(word -> new Tuple2<String, Integer>(word, 1)).reduceByKey((x, y) -> x + y);
4、 结果输出到文件和ES,关闭session
counts.saveAsTextFile(outDir);
EsSpark.saveToEs(counts.rdd(), "spark-sql");
sparkSession.stop();
5、 查看结果
通过es-hadoop,ES可以作为MR、Hive、Spark等的数据源,这意味着什么呢?意味着对于既需要使用Spark等工具进行批量分析和计算、又需要使用ES做实时搜索的数据,比如常见的业务日志,可以只存在于ES中,而无需重复存储于HDFS等存储中,极大的节省了存储成本。
在使用方面,通过ES-hadoop的实现可以看到,ES的shard和hadoop splits、spark partition有着对应关系,因此对要用于hadoop分析的索引设置合理的分片数变得十分重要,因为这将充分利用hadoop的并行计算能力。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。