前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Druid源码阅读(一):Druid Hadoop-based ingestion实现

Druid源码阅读(一):Druid Hadoop-based ingestion实现

原创
作者头像
2011aad
发布2020-04-23 00:09:36
2.3K1
发布2020-04-23 00:09:36
举报
文章被收录于专栏:大数据引擎源码阅读

一、Druid Hadoop-based ingestion简介

Apache Druid是一款开源时序OLAP数据库,支持流数据摄入和批数据摄入两种数据写入方式,其中批数据摄入又包括Native batch和Hadoop-based两种方式。根据官方文档[1],Druid推荐使用Native batch方式摄入数据,因为这种方式更灵活,且没有对外部Hadoop集群的依赖。但Hadoop-based数据摄入也有其优势: 1. 生成Segment的离线计算过程可以使用Hadoop集群的计算资源,减少了druid集群的计算压力,做到计算和存储分离;2. 与大数据处理生态对接更方便,前期的数据预处理可以使用Spark、Flink等计算引擎,将处理结果放在某个HDFS目录下即可。

Hadoop-based数据摄入一般是向Druid Overload节点提交一个Json文件,里面会定义数据源、写入的datasource、数据格式等信息,具体语法可参考Druid官方文档[2]。

本文的目的就是对照Druid源码,解析Druid如何通过MapReduce任务完成索引计算并生成Segment文件存储。本文会聚焦于MapReduce任务的执行,略过Druid数据聚合和生成索引的逻辑。数据聚合和生成索引的逻辑相对比较复杂且独立,打算后续在另外的文章中详细描述。

二、MapReduce任务

Druid的Hadoop数据摄入任务实现在indexing-hadoop子工程中,核心代码是IndexGeneratorJob.java这个文件。接下来就深入这个文件,看看Druid如何将HDFS文件中的数据通过MapReduce任务转化为Segment存储下来。

任务构建

现在Spark、Flink框架比较流行,可能很多人已经不认识原始的MapReduce任务代码了。下面的代码就完成了构建一个MapReduce任务并提交给Hadoop集群。可以看到任务使用IndexGeneratorMapper类作为Mapper、使用IndexGeneratorPartitioner作为Partitioner、使用IndexGeneratorCombiner作为Combiner、使用IndexGeneratorReducer作为Reducer,并设置输入输出路径、任务配置参数,最后job.submit()提交任务。

这里job.getConfiguration().set("io.sort.record.percent","0.23")是配置环形缓冲区中用多大比例来保存数据索引,默认值是0.05。

代码语言:java
复制
job.getConfiguration().set("io.sort.record.percent", "0.23");

job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);
SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class);

if (config.getSchema().getTuningConfig().getUseCombiner()) {
    job.setCombinerClass(IndexGeneratorCombiner.class);
    job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
}

// setReducerClass(job); 
job.setReducerClass(IndexGeneratorReducer.class);

job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(IndexGeneratorOutputFormat.class);
FileOutputFormat.setOutputPath(job, config.makeIntermediatePath());

config.addInputPaths(job);

config.intoConfiguration(job);

JobHelper.setupClasspath(
    JobHelper.distributedClassPath(config.getWorkingPath()),
    JobHelper.distributedClassPath(config.makeIntermediatePath()),
    job
);

job.submit();

下面就具体说明Mapper、Combiner、Reducer的逻辑。

Mapper: IndexGeneratorMapper

IndexGeneratorMapper继承自HadoopDruidIndexerMapper<BytesWritable, BytesWritable>,进入HadoopDruidIndexerMapper,我们看到了熟悉的map函数。

代码语言:java
复制
protected void map(Object key, Object value, Context context) throws IOException, InterruptedException
{
    try {
      final List<InputRow> inputRows = parseInputRow(value, parser);

      for (InputRow inputRow : inputRows) {
        try {
          if (inputRow == null) {
            // Throw away null rows from the parser.
            log.debug("Throwing away row [%s]", value);
            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
            continue;
          }

          if (!Intervals.ETERNITY.contains(inputRow.getTimestamp())) {
            final String errorMsg = StringUtils.format(
                "Encountered row with timestamp that cannot be represented as a long: [%s]",
                inputRow
            );
            throw new ParseException(errorMsg);
          }

          if (!granularitySpec.bucketIntervals().isPresent()
              || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                                .isPresent()) {
            innerMap(inputRow, context);
          } else {
            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
          }
        }
        catch (ParseException pe) {
          handleParseException(pe, context);
        }
      }
    }
    catch (ParseException pe) {
      handleParseException(pe, context);
    }
    catch (RuntimeException e) {
      throw new RE(e, "Failure on row[%s]", value);
    }
}

首先parseInputRow,将文本格式输入中的每一行转换为InputRow,这里的parser实例是HadoopyStringInputRowParser,inputRows的实例是MapBasedInputRow。具体调用的ParserSpec会根据提交Json中的spec.dataSchema.parser来实例化,Druid官方文档[3]中说明的数据格式在图一中都能找到对应ParserSpec实现。

图一 Druid ParserSpecs UML
图一 Druid ParserSpecs UML

然后对每一行数据做一个过滤,过滤掉空行、没有时间戳的行以及不在任务指定时间范围内的行。这里的判断逻辑是基于提交Json中的spec.dataSchema.granularitySpec.intervals字段,若该字段不存在,则任意时间的数据都可以摄入;若指定了该字段,则需要检查当前行的时间戳是否在需要摄入的时间范围内,以决定是否丢弃该行数据。如果通过了这些条件的校验,函数最终会调用到innerMap函数,否则会对丢弃的行计数,用于日志或监控。

下面来看IndexGeneratorMapper.innerMap函数。

代码语言:java
复制
protected void innerMap(
        InputRow inputRow,
        Context context
    ) throws IOException, InterruptedException
{
    // Group by bucket, sort by timestamp
    final Optional<Bucket> bucket = getConfig().getBucket(inputRow);

    if (!bucket.isPresent()) {
        throw new ISE("WTF?! No bucket found for row: %s", inputRow);
    }

    final long truncatedTimestamp = granularitySpec.getQueryGranularity()
                                                   .bucketStart(inputRow.getTimestamp())
                                                   .getMillis();
    
    // type SegmentInputRow serves as a marker that these InputRow instances have already been combined
    // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
    // data
    InputRowSerde.SerializeResult serializeResult = inputRow instanceof SegmentInputRow ?
                                                      InputRowSerde.toBytes(
                                                          typeHelperMap,
                                                          inputRow,
                                                          aggsForSerializingSegmentInputRow
                                                      )
                                                                                          :
                                                      InputRowSerde.toBytes(
                                                          typeHelperMap,
                                                          inputRow,
                                                          aggregators
                                                      );

    final byte[] hashedDimensions = HASH_FUNCTION.hashBytes(
        HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsBytes(
            Rows.toGroupKey(
                truncatedTimestamp,
                inputRow
            )
        )
    ).asBytes();
    
    context.write(
        new SortableBytes(
            bucket.get().toGroupKey(),
            // sort rows by truncated timestamp and hashed dimensions to help reduce spilling on the reducer side
            ByteBuffer.allocate(Long.BYTES + hashedDimensions.length)
                      .putLong(truncatedTimestamp)
                      .put(hashedDimensions)
                      .array()
        ).toBytesWritable(),
        new BytesWritable(serializeResult.getSerializedRow())
    );

    ParseException pe = IncrementalIndex.getCombinedParseException(
        inputRow,
        serializeResult.getParseExceptionMessages(),
        null
    );
    if (pe != null) {
      throw pe;
    } else {
      context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_PROCESSED_COUNTER).increment(1);
    }
}

这里第一步是获取该行所在的Bucket,代码中的Bucket的概念与Segment的概念是对应的,即在运算阶段属于同一个Bucket的数据,最终会被写入同一个Segment文件中。Bucket由shardNum、truncatedTimestamp和partitionNum唯一确定(理论上通过shardNum就可以唯一确定,后面会看到shardNum、truncatedTimestamp和partitionNum之间的关系)。因为在进入该函数前已经按时间区间做了过滤,所以这里获取Bucket理论上一定会成功。然后需要对行的时间戳做截取,比如queryGranularity设置为小时,这里就会将时间戳截取至小时粒度,剩余的精度全部补0。时间戳截取的目的是为了方便rollup,即将属于同一个时间区间的数据预聚合起来,这样虽然丢失了部分原始信息,但可以很大程度减少存储的数据量,并提升查询效率。

下一步将InputRow序列化,作为Map输出的Value部分。这里判断了inputRow instanceof SegmentInputRow,即当前inputRow是原始数据还是其他Segment中已经预聚合好的数据,据此会在序列化时使用不同的aggregator。因为如果是Segment中预聚合好的数据,对应的aggregator需要做一些变化。例如指定某一指标需要Count聚合,对于原始数据就是用Count聚合就好,而对于预聚合好的Segment,就要使用LongSum聚合。

最后一步就是是输出Key-Value对,其中key是固定160字节的BytesWritable(包括一个四字节的SortableBytes长度和156字节的SortableBytes),Value就是InputRow序列化后的BytesWritable。SortableBytes的格式如下图二所示,其中groupKey标识了归属的Bucket,其作用是在Reduce阶段将属于同一个Bucket的数据放入同一次reduce函数的调用中,从而保存在同一个Segment文件里。sortKey中hashedDimensions是根据当前行截取后的时间戳以及所有维度的取值计算出的哈希值,sortKey的作用是将所有维度值相同的行排序时排在一起,可以减少Combine阶段和Reduce阶段的spill。

就这个Map输出的Key来讲,我认为还是有优化的空间:1. 图中各个字段均是定长的,最前面4个字节的groupKeySize可以省去;2. sortKey中不需要再写入truncatedTimestamp,时间戳在hashedDimensions中已有体现。

图二 Map Output SortableBytes
图二 Map Output SortableBytes

Combiner: IndexGeneratorCombiner

MapReduce中Combiner的作用是预聚合单节点Key相同的数据,减少Shuffle过程的数据传输量。由于任务设置了setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class),在Combine过程中会使用BytesWritable.Comparator作为比较运算符,对Map输出的Key-Value对进行分组预聚合。进入BytesWritable.Comparator,可以看到这个Comparator比较跳过了前4个字符,比较了Map阶段输出的SortableBytes。因此Combine阶段只有所有维度取值都相同的行才会被聚合在一起,输入到reduce函数中。

代码语言:java
复制
job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class)

public static class Comparator extends WritableComparator {
    public Comparator() { super(BytesWritable.class);}

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return compareBytes(b1, s1 + 4, l1 - 4, b2, s2 + 4, l2 - 4);
    }
}

Combine阶段预聚合逻辑如下。首先,如果只有一行数据,则直接将该行数据输出;如果有多条数据,则需要将其预聚合起来,这也是Druid rollup的核心过程,将所有维度值(包括TruncatedTimestamp)相同的数据压缩成一行,减少数据存储量。

数据的聚合过程就是创建一个index,并不断把数据行加入index中,如果index满了(index.canAppendRow()里检查了Segment行数的配置和Segment大小的配置),就flush当前index,并打开一个新的index。具体index是如何计算的本文不去细究,后续会更新另外的文章解析索引构建的方法。

代码语言:java
复制
@Override
protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, final Context context)
    throws IOException, InterruptedException
{
    Iterator<BytesWritable> iter = values.iterator();
    BytesWritable first = iter.next();

    if (iter.hasNext()) {
        LinkedHashSet<String> dimOrder = new LinkedHashSet<>();
        SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
        Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
        IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null);
        index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators));

        while (iter.hasNext()) {
            context.progress();
            InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators);

            if (!index.canAppendRow()) {
                dimOrder.addAll(index.getDimensionOrder());
                log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
                flushIndexToContextAndClose(key, index, context);
                index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities());
            }
            index.add(value);
        }

        flushIndexToContextAndClose(key, index, context);
    } else {
        context.write(key, first);
    }
}

Partitioner: IndexGeneratorPartitioner

Map或Combine阶段输出的Key-Value对会使用指定的Partitioner进行分区,之后Reducer会从每个Map或Combine的结果中读取属于自己的分区数据,完成Shuffle的过程。

代码语言:java
复制
int numReducers = Iterables.size(config.getAllBuckets().get());
if (numReducers == 0) {
    throw new RuntimeException("No buckets?? seems there is no data to index.");
}
job.setNumReduceTasks(numReducers);
//======================================================================================
@Override
public int getPartition(BytesWritable bytesWritable, Writable value, int numPartitions)
{
    final ByteBuffer bytes = ByteBuffer.wrap(bytesWritable.getBytes());
    bytes.position(4); // Skip length added by SortableBytes
    int shardNum = bytes.getInt();
    if ("local".equals(JobHelper.getJobTrackerAddress(config))) {
        return shardNum % numPartitions;
    } else {
        if (shardNum >= numPartitions) {
            throw new ISE("Not enough partitions, shard[%,d] >= numPartitions[%,d]", shardNum, numPartitions);
        }
        return shardNum;
    }
}

Reduce Task的个数取决于两部分配置。1. granularitySpec中时间段的划分:假设granularitySpec.intervals设置了一个一天的时间区间,granularitySpec.segmentGranularity.period配置了"PT1H",即以小时为粒度划分Segment,这样就会划分出24个Segment,相应的就有24个Reduce Task;2. tuningConfig.partitionsSpec中numShards配置:假设这个numShards配置的值为2,那么上面的每个时间区间又会分为2个分片,这样就有24*2=48个Reduce Task。

代码语言:json
复制
"granularitySpec": {
    "type": "uniform",
    "segmentGranularity": {
         "type": "period",
         "period": "PT1H",
         "origin": null
    },
    "rollup": true,
    "intervals": [
        "2020-04-15T16:00:00.000Z/2020-04-16T16:00:00.000Z"
    ]
}

"tuningConfig": {
    "type": "hadoop",
    "partitionsSpec": {
        "type": "hashed",
        "numShards": 2
    }
}

例如对于如上所示配置,在任务执行时会产生48个Reduce Task,其对应的partitionNum和shardNum如下图三所示。可以看到生成任务的shardNum的取值为0-47,而getPartition函数中传入的numPartitions为48(Reduce Task个数),因此可以直接用shardNum作为getPartition返回的结果。这里可以看到,shardNum实际上唯一确定了Bucket,因此相同Bucket中的数据会进入同一个Reduce Task中,最终会存储在同一个Segment中。

图三 partitionNum和shardNum示例
图三 partitionNum和shardNum示例

Reducer: IndexGeneratorReducer

Reducer最后会将属于自己的分区数据收集到一起,调用reduce函数进一步将预聚合过的数据合并为Segment文件,这里合并的逻辑其实和Combine阶段非常类似。注意SortableBytes.useSortableBytesAsMapOutputKey(job, IndexGeneratorPartitioner.class)中的job.setGroupingComparatorClass(SortableBytesGroupingComparator.class),在Reduce阶段,会根据SortableBytesGroupingComparator将数据分组,调用reduce函数,SortableBytesGroupingComparator只比较了图二中的groupKey部分,即同一个Bucket的数据会被放在一起计算。因此Reduce阶段的reduce函数对于每个Task只会执行一次,生成一个Segment(包括一个descriptor.json文件和一个index.zip文件),并写入指定的HDFS路径下。

代码语言:java
复制
public static class SortableBytesGroupingComparator extends WritableComparator
{
    protected SortableBytesGroupingComparator() { super(BytesWritable.class); }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)
    {
        int b1Length = ByteBuffer.wrap(b1, s1 + 4, l1 - 4).getInt();
        int b2Length = ByteBuffer.wrap(b2, s2 + 4, l2 - 4).getInt();

        final int retVal = compareBytes(b1, s1 + 8, b1Length, b2, s2 + 8, b2Length);
        return retVal;
    }
}

三、执行大图

图四是上述的MapReduce过程中数据转化的示例:假设我们的输入有2个HDFS文件,对应2个Map Task,时间区间只划分为两部分,对应的truncatedTimestamp分别为T1和T2,numShards为2,这样会生成4个Segment。dx和dy表示2个维度,dxi和dyi表示具体维度的不同取值,m表示一个指标,mi表示指标的具体取值,m的聚合方式用agg表示。Map阶段会将HDFS文件读取为行数据,Combine阶段会对同一个Map任务的输出将时间和维度值都相同的行预聚合好。Combine后的行数据会根据行所对应的的分区分发到响应的Reduce任务,并进一步聚合生成Segment文件。

图四 Druid Hadoop-based数据摄入MapReduce执行示例
图四 Druid Hadoop-based数据摄入MapReduce执行示例

四、总结

通过学习Druid Hadoop-based数据摄入的流程,把“古老”的MapReduce过程又学习了一遍,学习的过程中参考了[4]中的一张MapReduce工作原理图,推荐想要学习MapReduce的同学都去看下;对任务Spec中各个配置字段的含义也有了更深入的了解,这里也给出一些参数设置建议:

  1. 建议使用targetRowsPerSegment而不是numShards。在partitionsSpec中targetRowsPerSegment和numShards配置是互斥的,使用targetRowsPerSegment配置可以更合理的控制每个Segment的大小,既不会出现超大Segment,也不会出现很多小Segment,利于historical节点进行加载和缓存。
  2. 如果使用numShards,建议同时配置partitionDimensions。partitionDimensions可以配置如账号ID之类的信息,这样同一个账号的数据会保存在同一个Segment中,查询时可以减少读取的Segment数目,提升查询性能。
  3. tuningConfig.useCombiner置为true。这个值默认是false,一般来讲,对于druid数据摄入的场景,预聚合可以很大程度上减少Shuffle过程中的数据传输量,减少作业运行时间。

参考文献

[1] https://druid.apache.org/docs/0.17.1/ingestion/index.html#batch

[2] Hadoop-based ingestion,https://druid.apache.org/docs/latest/ingestion/hadoop.html

[3] Data formats,https://druid.apache.org/docs/latest/ingestion/data-formats.html

[4] mapreduce二次排序详解,https://www.lagou.com/lgeduarticle/8090.html

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Druid Hadoop-based ingestion简介
  • 二、MapReduce任务
    • 任务构建
      • Mapper: IndexGeneratorMapper
        • Combiner: IndexGeneratorCombiner
          • Partitioner: IndexGeneratorPartitioner
            • Reducer: IndexGeneratorReducer
            • 三、执行大图
            • 四、总结
              • 参考文献
              相关产品与服务
              时序数据库 CTSDB
              腾讯云时序数据库(TencentDB for CTSDB)是一种高效、安全、易用的云上时序数据存储服务。特别适用于物联网、大数据和互联网监控等拥有海量时序数据的场景。您可以根据实际业务需求快速创建CTSDB 实例,并随着业务变化实时线性扩展实例。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档