腾讯云EMR&Elasticsearch中使用ES-Hadoop之MR&Hive篇
腾讯云EMR&Elasticsearch中使用ES-Hadoop之Spark篇
ES-Hadoop 是 Elastic 官方推出的一个用于对接 Hadoop 生态的工具,使得用户可以使用 Mapreduce(MR)、Spark、Hive 等工具处理 ES 上的数据。众所周知,Hadoop 生态的长处是处理大规模数据集,但是其缺点也很明显,就是当用于交互式分析时,查询时延会比较长。而 ES 在这几个方面的能力很强,对于如 ad-hoc 查询,基本可以做到秒级。ES-Hadoop 的推出提供了一种组合两者优势的可能性。使用 ES-Hadoop,用户只需要对自己代码做出很小的改动,即可以快速处理存储在 ES 中的数据,并且能够享受到 ES 带来的加速效果。
利用ES-Hadoop 组件,可以将 ES 作为 MR/Spark/Hive 等大数据处理引擎的“数据源”,在大数据计算存储分离的架构中扮演存储的角色。这和 MySQL/PG/MongoDB 等其他数据源并无差异。但相对于其他数据源, ES 具有更灵活的全文检索能力,更快的数据选择过滤能力以及快速的UI展示报表的能力。这些能力正是分析引擎最为关键的能力之一。
下面我们将通过特定案例,介绍如何在腾讯云 EMR 和 腾讯云 Elasticsearch 中使用 ES-Hadoop。
购买腾讯云EMR,并勾选hive,spark等组件,以备使用。购买腾讯云Elasticsearch。
这里以Elasticsearch官方标准的workbench测试数据http_logs为例,包含的字段如下:
clientip "10.123.123.43"
request: "GET /images/102328s.gif HTTP/1.1"
status: 2xx
size: 80
测试数据写入方法:
pip install esrally
esrally --target-hosts=10.0.4.17:9200 --distribution-version=5.6.4 --track=http_logs --pipeline=benchmark-only --challenge=append-no-conflicts-index-only
#10.0.4.17:9200 为es的vpc内网ip
'es.nodes' = '10.0.0.17',
'es.port' = '9200',
'es.nodes.wan.only' = 'true',
'es.index.auto.create' = 'false',
'es.resource' = 'logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.index.read.missing.as.empty' = 'true',
'es.input.use.sliced.partitions' = 'false',
'es.input.max.docs.per.partition' = '100000000'
配置ES集群的节点ip,腾讯云ES给用户提供了带负载均衡功能的vpc内网vip,这里结合es.nodes.wan.only这个配置项同时使用。
配置ES集群的端口号。
设置为true,开启ES集群在云上使用vip进行连接,不进行节点嗅探。
如通过Hadoop组件向ES集群写入数据,是否自动创建不存在的index。
指定要读写的index和type
表字段与ES的索引字段的字段名映射
如操作的ES字段涉及到_id之类的内部字段,需要将这个配置开启。
配置单个partition的最大文档数。在执行hive sql的过程中,需要限制mapper的数目,否则ES会面临多个scroll切片的查询,造成CPU打满,影响集群的性能和稳定性。这里需要根据ES索引中数据的总数来灵活的设置。
更多的ES-Hadoop配置项请参考官方文档https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
su - hadoop
wget http://download.elastic.co/hadoop/elasticsearch-hadoop-5.6.4.zip
hive:-> add jar file:///home/hadoop/elasticsearch-hadoop-5.6.4/dist/elasticsearch-hadoop-hive-5.6.4.jar;
create database if not exists tmp;
drop table tmp.tmp_es;
create external table tmp.tmp_es (uid varchar(255), clientip varchar(255), request varchar(1024), status int)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = '10.0.4.17',
'es.port'='9200',
'es.index.auto.create' = 'false',
'es.resource' ='logs-201998/type',
'es.read.metadata' = 'true',
'es.mapping.names' = 'uid:_metadata._id,clientip:clientip,request:request,status,status',
'es.nodes.wan.only'='true',
'es.index.read.missing.as.empty'='true',
'es.input.use.sliced.partitions'='false',
'es.input.max.docs.per.partition'='100000000'
);
这里以官方测试数据http_logs为例,使用logs-201998这个index,映射了4个字段_id, clientip, request, status
。因为索引文档总量为100w+,设置单partition最大文档数为100000000, 期望mapper数保持在5个以内。
select * from tmp.tmp_es limit 10;
# 写入外部表
insert into tmp.tmp_es values ('sfasfsdf', '10.0.0.11', 'sdfsfa', 200);
# 将hive内部表中的数据导入至ES外部表
drop table tmp.tmp_hive;
create table tmp.tmp_hive (uid varchar(255), clientip varchar(255), request varchar(1024), status int);
insert into tmp.tmp_hive select * from tmp.tmp_es;
如一些较复杂的分析任务,不适合使用hive sql完成。下面这个例子演示了如何通过MR任务,读取HDFS上的JSON文件,并写入ES集群。
我们新增一条http log,clientip设置为特殊的255.255.255.255。写入到http_log.json,并上传至HDFS的/es-hadoop
目录。
{"@timestamp": 895435190,"clientip": "255.255.255.255","request": "GET /images/102328s.gif HTTP/1.1","status": 200,"size": 802}
编译打包如下代码为esmr-1.0-SNAPSHOT.jar,编写Mapper,读取HDFS上的json文件,并在map阶段通过EsOutputFormat
写入。
es.input.json
为true,将源文件按json来解析。完整工程代码可以在这里下载
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WriteToEsWithMR extends Configured implements Tool {
public static class EsMapper extends Mapper<Object, Text, NullWritable, Text> {
private Text doc = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
if (value.getLength() > 0) {
doc.set(value);
System.out.println(value);
context.write(NullWritable.get(), doc);
}
}
}
public int run(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);
conf.set("es.nodes", "10.0.4.17:9200");
conf.set("es.nodes.wan.only", "true");
conf.set("es.resource", "logs-201998/type");
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
job.setJarByClass(WriteToEsWithMR.class);
job.setMapperClass(EsMapper.class);
FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new WriteToEsWithMR(), args);
System.exit(ret);
}
}
hadoop jar esmr-1.0-SNAPSHOT.jar /es-hadoop
通过API或kibana查询clientip为255.255.255.255的记录
GET logs-201998/_search
{
"query": {
"match": {
"clientip":"255.255.255.255"
}
}
}
本文以腾讯云上的EMR和Elasticsearch为例,介绍了如何通过ES强大的ES-Hadoop组件,在hive和MR上进行数据的查询和写入。可以方便用户将Elasticsearch与Hadoop生态组件结合起来,提供更灵活的分析能力。下一篇将为大家介绍ES-Hadoop之Spark篇的内容,将为大家进一步介绍在spark中如果读取和写入ES数据,敬请期待。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。