Druid
Druid是一个分布式支持实时分析的数据存储系统,为分析而生,在处理数据的规模和数据处理实时性方面比传统OLAP系统有显著的性能改进。与阿里的druid无关。
Druid的三个设计原则
- 快速查询:数据预聚合+内存化+索引
仅存储经过预聚合的数据,如1分钟,1小时等,极大的提高了性能;使用Bitmap和各种压缩技术,并维护一些倒排索引,可以提高内存使用效率和AND,OR操作。
- 水平扩展:分布式数据+并行化查询
一般按照时间范围把聚合数据进行分区处理,对于高维度数据还支持对Segment( < 2000万行)进行分区;历史Segment数据可以存储在本地磁盘,HDFS或云服务中;如果节点故障可借助ZK重新构造数据;Druid内置了容易并行化的集合操作,在直方图方面和去重查询方面采用近似算法保证性能,如HyperLoglog,DataSketches等
- 实时分析:不可变的过去,仅追加的未来
提供基于时间维度的数据存储服务,且每行数据一旦进入系统就不能改变;历史数据以Segment数据文件方式组织,需要查询时再装载到内存
技术特点
- 数据吞吐量大
- 支持流式数据摄入和实时
- 查询灵活且快
- 社区支持力度大
数据格式
- 数据源(类似数据库中表的概念,存放一类数据)
- 时间列:每个数据源都需要有的事件时间,是预聚合的主要依据
- 维度列:用于标识事件和属性,用于聚合
- 指标列:用于聚合计算的列,通常是关键量化指标
- 数据摄入
- 数据查询
- 原生Json查询,Http接口
- 类SQL查询,支持大部分SQL语法(本书出版时还未支持)
数据分析软件分类
- 商业软件
- HP Vertica
- Oracle Exadata
- Teradata
- 时序数据库
- 开源分布式计算平台
- 开源分析数据库
- Pinot
- Kylin
- Google Dremel
- Apache Drill
- Elasticsearch(ES)
- SQL on Hadoop/Spark
- 数据分析云服务
Druid架构
概览
- Druid自身包含的节点
- 实时节点:摄入实时数据,生成Segment数据文件
- 历史节点:加载生成好的数据文件,供查询
- 查询节点:对外提供查询服务,并支持同时查询实时和历史节点,并合并结果
- 协调节点:负责历史节点的数据负载均衡,并管理数据生命周期
- Druid依赖的外部组件
- 元数据库:存储元数据信息,如Segment的相关信息。一般是Mysql
- 分布式协调服务:提供分布式一致性的组件,一般是Zookeeper
- 数据文件存储库:提供数据文件的存储功能,一般是本地磁盘或HDFS等
架构设计思想
索引
提高数据库查找速度的关键之一是减少磁盘的访问次数,并采用树形结构做索引
- 二叉查找树和平衡二叉树
二叉查找树在极端非平衡情况下查询效率会退化到O(N),因此尝试采用平衡二叉树;但是平衡二叉树的树高为:
- 树高越高,查询次数越多越慢。同时,每次访问磁盘会读取多个扇区的数据,远大于单个树节点的值,造成浪费
- B+树
传统关系型数据库的常用结构。
- 每个树节点只放键值,不放数值,叶子节点存放数值,使得树高度较低
- 叶子节点按值大小顺序排序,带指向相邻节点的指针,方便区间数据查询
- 从叶子节点开始更新,以较小的代价实现自平衡
- 缺点是随着数据插入,叶子节点会分裂,导致连续数据被存放在不同的物理磁盘块上,导致较大的IO开销
- 日志结构合并树(LSM)
日志结构的所有方式的将磁盘看做一个大的日志,每次都将新数据和索引结构添加到最末端;LSM通过将数据文件预排序解决了日志结构随机读性能差的问题。
- 使用两颗树来存储数据,其中一部分数据结构存在内存负责插入更新和读请求,并在内存中进行排序;另一部分写在磁盘,负责读操作,有序且不能更改
- 使用日志文件做数据恢复保障,所有操作记录先写Log,再写memtable,最后冲写到sstable
- 定期合并小sstable以减少sstable数量,对每个sstable使用布隆过滤器,以加速数据存在与否的判定
Druid对命令查询职责分离模式(CQRS)的借鉴,优势如下:
- 类LSM-tree使得数据高速写入,并提供快速实时查询
- 不提供已有数据更改,虽然降低数据完整性保障,但是减少了工作量,提高性能
- 对CQRS模式的借鉴使得组件职责分明,易于优化
数据结构
- DataSource(类似于表)
- 时间列:表明每行数据的时间,默认使用UTC并精确到毫秒
- 维度列:来自于OLAP概念,标识类别信息
- 指标列:用于聚合和计算的列,通常是一些数字
支持对任意指标列进行聚合(Roll Up)操作,如同维度列聚合或指定时间粒度的聚合。在存储时就对数据进行聚合是Druid的特点,可以节省存储空间,提高查询效率
- Segment结构
- 通过对segmentGranularity的设置,将不同时间范围的数据存储在不同Segment数据块中;查询数据仅需访问对应时间段内的数据块,效率极大提高。
- 提供面对列的数据压缩存储,并使用Bitmap等技术对访问进行优化
实时节点
实时节点主要负责实时数据摄入,生成Segment数据文件;
Segment文件的制造和传播
- 实时节点通过Firehose来消费实时数据
- 实时节点通过Plumber来生成数据文件,并将多个数据块合并成一个大的Segment
Segment文件的传播过程见上篇
高可用和可扩展性
可以使用一组节点组成Group共同消费一个Topic,使得每个分区不会被多余一个实时节点消费。当节点会主动将Offset提交到ZK,这样能实现节点失败重分配,同时保证了实时扩展性。
为了避免挂掉的节点已消费但未上传的数据丢失,可以采用以下方法
– 使得挂掉的节点恢复,重启时节点会加载所有尚未上传的Segment文件,保证数据完整
– 使用Tranquility和索引服务对Topic进行精确消费和备份。
历史节点
- 启动时,先检查本地已有的Segment文件,并从DeepStorage中下载属于自己但不在本地的Segment数据文件
- 查询时,现将Segment文件加载到内存再提供查询
- 历史节点的查询速度与内存空间大小和负责的Segment数据文件大小之比成正比
层的分组功能
- 数据温度用来描述数据被访问的频繁程度
- 热数据:经常被访问,数据量不大,要求最高响应
- 温数据:不常被访问,数据量中等,要求尽可能快
- 冷数据:偶尔被访问,数据量大,不要求响应速度
- Druid提出层(Tier)的概念,将历史节点根据性能容量分为不同的层,并且可让不同性质的DataSource使用不同的层来存储Segment
高可用和扩展
- 新的历史节点添加后会通过ZK被协调节点发现,协调节点会自动分配Segment给他
- 历史节点被移除后同样被协调节点发现,并将原本分配给这个节点的Segment分配给其他可用节点
查询节点
一般情况下,Druid集群对外提供服务的只有查询节点,查询节点会将实时节点和历史节点查询到的数据合并后返回客户端
缓存
Druid支持使用Cache机制来提高查询效率;查询时首先访问Cache,不命中时才会去访问数据
- 外部Cache,如Memcached
- 内部Cache,查询节点或历史节点的内存
高可用
可以使用如Nginx来完成对多个查询节点的负载均衡,以实现高可用
协调节点
协调节点负责历史节点的数据负载均衡和通过规则管理数据生命周期
数据负载均衡
对于历史节点来说,协调节点类似于他们的Master,协调节点会给历史节点分配数据,来达到数据负载均衡。当协调节点挂掉时,历史节点可以提供查询服务,但是不能接收新的Segment
管理生命周期
协调节点会根据DataSource配置的规则对于每个Segment文件逐条检查,当符合规则时就立即命令历史节点执行这个命令(加载或丢弃)
高可用性
默认情况下,从历史节点挂掉到协调节点重新分配这个节点上的Segment文件到其他历史节点的这段时间内,挂掉节点上的数据是不可访问的;但是可以通过增加副本的方式在多个历史节点上存储同一份数据来保障高可用
索引服务
索引服务也可以产生Segment文件,支持pull,push模式方式,可通过API编程的方式来灵活定义任务配置,并完成跟Segment相关的所有操作
主从架构
索引服务包含统治节点为主节点,中间管理者节点为从节点
统治节点
负责对外接收任务请求,对内将任务分解并下发到从节点上;统治节点提供RESTful的访问方法,可以通过HTTP请求提交任务或查看任务状态。统治节点有以下两种运行模式
- 本地模式:统治节点不止负责集群任务协调分配,也能启动一些苦工(peon)来完成具体工作
- 远程模式:统治节点和中间管理者运行在不同节点上,此时统治节点仅完成集群任务协调分配。
中间管理者&苦工
中间管理者就是索引服务的工作节点,负责接收统治节点分配的任务,并启动相关苦工(独立的JVM)来完成任务
数据摄入
方式
- 流式数据:指不断产生数据的数据源,如消息队列,日志等;Druid提供了Push和Pull两种方式
- Pull方式需要启动一个实时节点,通过不同的Firehose摄入
- Push方式需要启动索引服务,提供一个Http接口来接受数据推送
- 静态数据:指已经产生完全,不会产生新数据的源,如离线数据;也可通过上述两种方式来摄取
流式数据摄取
Pull
- 定义配置文件,包含三部分
- dataSchema 包括数据源的描述,数据类型,列,指标列等等;参考文档
- ioConfig 指定了具体的数据源,如Kafka Topic,Server等配置
- tuningConfig 优化参数
Push
- 启动索引任务,需要向统治节点发送一份Ingestion Spec
- 通过push-event接口发送数据
静态数据摄取
- 索引方式:向统治节点提交索引任务
- 以Hadoop方式摄取:向统治节点Post一个请求,启动Hadoop Index Job,Druid会提交一个MR任务到Hadoop,适合离线数据生成历史分片
流式与批量数据摄取的结合
Lambda架构
满足一个稳定的大规模数据处理系统所需的容错性,低延迟,可扩展性;
– 任何数据可定义为 query = func(all data)
– 人为容错性:数据是易丢失的
– 数据不可变性:数据是只读的,不再变化
– 重新计算:基于上面两个原则,运行函数重新计算结果是可能的
该架构具有如下特点:
– 所有新数据分别分发到批处理层和实时处理层
– 批处理层有两个功能,管理主要数据(只能增加,不能更新)和为下一步计算批处理视图做预计算
– 服务层计算出批处理视图中的数据做索引,以提供低延时,即席查询
– 实时处理层仅处理实时数据,并为服务层提供查询服务
– 任何查询可通过实时层和批处理层的查询结果合并得到
解决时间窗口问题
Druid中,超过时间窗口的数据会被丢弃,为了解决这个问题,参考Lambda架构,实现方式如下:
1. 源数据都进入Kafka
2. 数据通过实时节点或索引服务进入Druid
3. 同时数据通过Flume备份到Hadoop
4. 定时或DQC发现数据丢失时,通过Druid Hadoop Index Job 重新摄入数据
其他
Druid数据以时间分片,当短时间内涌入大量数据时会造成Segment文件过大,从而影响查询;Druid通过数据分片和复制使得数据分布到更多节点以提高效率
数据分片
- 实时节点数据分片(可以通过tuningConfig中的shardSpec指定分片方式)
- 要求查询时所有分片必须存在
- 要求指定分片总数
- 添加新的实时节点时,不用更改原实时节点的配置
- 查询时,即使分片缺失,所有分片都会被查询
- Linear分片:
- Numbered分片
- DruidIndexJob分片(只能设置一种)
- targetPartitionSize 通过设置分片大小计算分片个数
- numShards 直接设置分片个数
- HadoopIndex Job 分片(通过partitionSpec设置)
- 哈希分片:基于维度值的哈希值分区(更快,分布更均匀)
- 范围分区:基于纬度值的取值范围分区
数据复制
- DeepStorage:系统一般自带副本能力,保证数据不丢失
- Druid内部数据复制:通过设置Segment副本来保证
通过Tranquility操作索引服务
pass
高基数维度优化
- Cardinality aggregator(SQL中Count(distinct x)的默认方法)
- 基于HyperLoglog算法
- 只在查询时优化,不减少存储容量
- 效率比存储时预聚合的 HyperUnique aggregator低
- HyperUnique aggregator
Kafka索引服务
设计背景
- 保证数据摄入的Exactly Once语义
- 不受windowPeriod的约束,可以摄入任意时间戳的数据,而不仅仅是当前的数据
- 操作易用性,自适应性强,可以根据Kafka分区增加或减少任务的数量
windowPeriod的设定会导致超出时间窗口延迟的数据被丢弃,而过长的时间窗口会影响索引服务的任务完成退出和查询性能;影响数据不重复摄入的主要是Kafka的Offset管理。在最初的KafakDireChief采用高层的消费者,这会自动完成类似Broker的Leader选择,Offset维护,管理分区和消费者之间的均衡和重平衡等功能,同一个Group中的消息只会被一个消费者消费一次。
- 同一个Group的消息只能被消费一次,导致很难实现多副本来保证高可用和查询一致性
- 高等级消费者采用ZK存储Offset,导致内存增量持久化和Offset提交不在同一事物中。会存在持久化成功但是没提交的情况下节点失败会导致这条消息被重复消费。
实现
** 采用了Supervisor(监督者)的方式运行在Overlord上**
- KafkaSupervisor:负责索引任务创建和管理整个生命周期;监管索引任务状态来协调移交,管理失败,保障可扩展性等
- KafkaPartitions来记录Topic和分区->Offset的映射关系
- KafkaIndexTask从 KafkaIOConfig->startPartition的Offset开始读取,直到endPartition结束,发布移交Segment。执行过程中,startPartition->Offset不会改变,KafkaSupervisor通过修改endPartition来控制任务结束
- 运行中的任务分读取和发布状态;任务会保持读取状态,直到达到taskDuartion后进入发布状态。接下来保持发布状态直到生成Segment并推送到DeepStorage,并且等待历史节点加载
- TaskGroup是KafkaSupervisor管理Kafka分区,Offset的数据结构
- Appenderator:索引数据,类似LSM-Tree的架构
- FiniteAppendderatorDriver驱动Appenderator完成有限流式数据的索引,在索引完成后执行移交操作
- SegmentAllocator 根据给定的时间戳,分配一个Segment
- 检查任务是否达到任务的持续时间(taskDuration,默认一小时)。达到则发送信号提示停止读取数据,进入发布阶段
终止taskGroup的流程
优势
- 去掉时间窗口,读取数据后根据时间戳使用SegmentAllocator分配到合适的Segment(缺点是这样会产生碎片化的Segment)
- Segment的发布和Offset的提交在同一事务中处理,都在发布截断完成,可以解决重复摄入的问题