
ByteHouse是火山引擎上的一款云原生数据仓库,为用户带来极速分析体验,能够支撑实时数据分析和海量离线数据分析;便捷的弹性扩缩容能力,极致的分析性能和丰富的企业级特性,助力客户数字化转型。
本文将从需求动机、技术实现及实际应用等角度,介绍基于不同架构的ByteHouse实时导入技术演进。

ByteHouse实时导入技术的演进动机,起初于字节跳动内部业务的需求。
在字节内部,ByteHouse主要还是以Kafka为实时导入的主要数据源(本文都以 Kafka 导入为例展开描述,下文不再赘述)。对于大部分内部用户而言,其数据体量偏大;所以用户更看重数据导入的性能、服务的稳定性以及导入能力的可扩展性。而对于数据延时性,大多数用户只要是秒级可见就能满足其需求。基于这样的场景,ByteHouse进行了定制性的优化。

ByteHouse首先沿用了Clickhouse社区的分布式架构,但分布式架构有一些天然性架构层面的缺陷,这些痛点主要表现在三个方面:
这些是分布式架构天然的痛点,但是由于其天然的并发特性,以及本地磁盘数据读写的极致性能优化,可以说有利有弊。
基于分布式架构的实时导入核心设计其实就是两级并发:
一个CH集群通常有多个Shard,每个Shard都会并发做消费导入,这就是第一级Shard间的多进程并发;
每个Shard内部还可以使用多个线程并发消费,从而达到很高的性能吞吐。
就单个线程来说,基本消费模式是攒批写入——消费一定的数据量,或者一定时间之后,再一次性写入。攒批写入可以更好地实现性能优化,查询性能提升,并降低后台Merge线程的压力。
上述社区的设计与实现,还是无法满足用户的一些高级需求:
为了解决上述需求,ByteHouse团队基于分布式架构自研了一种消费引擎——HaKafka。
HaKafka继承了社区原有Kafka表引擎的消费优点,再重点做了高可用的Ha优化。就分布式架构来谈,其实每个Shard内可能都会有多个副本,在每个副本上都可以做 HaKafka表的创建。但是ByteHouse只会通过ZK选一个Leader,让Leader来真正地执行消费流程,其他节点位于Stand by状态。当Leader节点不可用了,ZK可以在秒级将Leader切到Stand by节点继续消费,从而实现一种高可用。
HaKafka的消费模式从High Level调整到了Low Level模式。Low Level模式可以保证Topic Partition有序和均匀地分配到集群内各个shard;与此同时,Shard内部可以再一次用多线程,让每个线程来消费不同Partition。从而完全继承了社区Kafka表引擎两级并发的优点。
在Low-Level消费模式下,上游用户只要在写入Topic的时候,保证没有数据倾斜,那么通过HaKafka导入到 Clickhouse里的数据肯定也是均匀分布在各个shard的。
同时,对于有特殊数据分布需求——将相同Key的数据写到相同Shard——的高级用户,只要在上游保证相同Key的数据写入相同Partition,那么导入ByteHouse也就能完全满足用户需求,很好地支持唯一键等场景。

基于上图可见,假设有一个双副本的Shard,每个副本都会有一张相同的HaKafka表处于Ready的状态。但是只有通过ZK选主成功的 leader 节点上,HaKafka才会执行对应的消费流程。当这个leader节点宕机以后, 副本Replica 2会自动再被选为一个新的Leader,继续消费,从而保证高可用。

在节点故障场景下,一般需要执行替换节点流程。对于分布式节点替换有一个很繁重的操作——拷贝数据。
如果是一个多副本的集群,一个副本故障,另一个副本是完好的。我们很自然希望在节点替换阶段,Kafka消费放在完好的副本Replica 2上,因为其上旧数据是完备的。这样 Replica 2 就始终是一个完备的数据集,可以正常对外提供服务。这一点HaKafka是可以保证的。HaKafka 选主的时候,如果确定有某一个节点在替换节点流程当中,会避免将其选为Leader。

HaKafka还做到了Memory Table的优化。
考虑这样一个场景:业务有一个大宽表,可能有上百列的字段 或者上千的Map-Key。由于ClickHouse每一个列都会对应落盘为一个具体的文件,列越多,每次导入写的文件也就越多。那么,相同消费时间内,就会频繁地写很多的碎文件,对于机器的IO是很沉重的负担,同时给MERGE带来很大压力;严重时甚至导致集群不可用。为了解决这种场景,我们设计了Memory Table实现导入性能优化。
Memory Table的做法就是每一次导入数据不直接刷盘,而是存在内存中;当数据达到一定量以后,再集中刷盘,减少 IO 操作。Memory Table可以提供对外查询服务的,查询会路由到消费节点所在的副本去读 memory table 里边的数据,这样保证了不影响数据导入的延时性。从内部使用经验来看,Memory Table不仅很好地解决了部分大宽表业务导入需求,而且导入性能最高可以提升3倍左右。
鉴于上文描述的分布式架构的天然缺陷,ByteHouse团队一直致力于对架构进行升级。我们选择了业务主流的云原生架构,新的架构在2021年初开始服务字节内部业务,并于2023年初进行了代码开源(ByConity)。

云原生架构本身有着很天然的自动容错能力以及轻量级的扩缩容能力。同时,因为它的数据是云存储的,既实现了存储计算分离,数据的安全性和稳定性也得到了提高。当然,云原生架构也不是没有缺点,将原来的本地读写改为远端读写,必然会带来一定的读写性能损耗。但是,以一定的性能损耗来换取架构的合理性,降低运维成本,其实是利大于弊的。

上图是ByteHouse云原生架构的架构图,本文针对实时导入这块介绍几个重要的相关组件。
首先,总架构分为三层,第一层是Cloud Service,主要包含Server和Catlog两个组件。这一层是服务入口,用户的所有请求包括查询导入都从Server进入。Server只对请求做预处理,不具体执行;在Catlog查询元信息后,把预处理的请求和元信息下发到Virtual Warehouse执行。
Virtual Warehouse是执行层。不同的业务,可以有独立的Virtual Warehouse,从而做到资源隔离。现在Virtual Warehouse主要分为两类,一类是Default,一类是Write,Default主要做查询,Write做导入,实现读写分离。
最底层是VFS(数据存储),支持HDFS、S3、aws等云存储组件。
在云原生架构下,Server端不做具体的导入执行,只做任务管理。因此在Server端,每个消费表会有一个Manager,用来管理所有的消费执行任务,并将其调度到 Virtual Warehouse 上执行。
因为继承了 HaKafka 的Low Level消费模式,Manager会根据配置的消费任务数量,将Topic Partition均匀分配给各个任务;消费任务的数量是可配置的,上限是Topic Partition数目。

基于上图,大家可以看到左边是 Manager ,从 catalog 拿到对应的Offset,然后根据指定的消费任务数目,来分配对应的消费Partition、并调度到Virtual Warehouse 的不同节点来执行。

因为云原生新架构下是有事务Transaction保证的,所有操作都希望在一个事务内完成,也更加的合理化。
依托云原生新架构下的Transaction实现,每个消费任务的消费流程主要包括以下步骤:
从上述消费流程里可以看到,云原生新架构下的消费,容错保证主要是基于Manager和Task的双向心跳以及快速失败策略:
关于消费能力的话,上文提到它是一个可扩展性的,消费任务数量可以由用户来配置,最高可以达到Topic的Partition数目。如果Virtual Warehouse中节点负载高的话,也可以很轻量地扩节点。
当然,Manager调度任务实现了基本的负载均衡保证——用Resource Manager来做任务的管理和调度。
最后,云原生新架构下的消费语义也有一个增强——从分布书架构的At-Least-Once升级到Exactly—Once。
因为分布式架构是没有事务的,只能做到一个At-Least-Once,就是任何情况下,保证不丢数据,但是一些极端情况可能会有重复消费发生。到了云原生架构,得益于Transaction的实现,每一次消费都可以通过事务让Part和Offset 实现原子性提交,从而达到Exactly—Once的语义增强。

对应HaKafka 的memory table,云原生架构同样实现了导入内存缓存Memory Buffer。
与Memory Table不同的是,Memory Buffer不再绑定到Kafka的消费任务上,而是实现为存储表的一层缓存。这样Memory Buffer就更具有通用性,不仅是 Kafka导入可以使用,像Flink 小批量导入的时候也可以使用。
同时,我们引入了一个新的组件 WAL 。数据导入的时候先写 WAL,只要写成功了,就可以认为数据导入成功了——当服务启动后,可以先从WAL恢复未刷盘的数据;之后再写Memory buffer,写成功数据就可见了——因为Memory Buffer是可以由用户来查询的。Memory Buffer的数据同样定期刷盘,刷盘后即可从WAL中清除。
最后简单介绍实时导入在字节内部的使用现状,以及下一代实时导入技术的可能优化方向。
ByteHouse 的实时导入技术是以Kafka为主,每天的数据吞吐是在 PB 级,导入的单个线程或者说单个消费者吞吐的经验值在10-20MiB/s。(这里之所以强调是经验值,因为这个值不是一个固定值,也不是一个峰值;消费吞吐很大程度上取决于用户表的复杂程度,随着表列数增加,导入性能可能会显著降低,无法使用一个准确的计算公式。因此,这里的经验值更多的是字节内部大部分表的导入性能经验值。)
除了Kafka,字节内部其实还支持一些其他数据源的实时导入,包括 RocketMQ、Pulsar、MySQL(MaterializedMySQL)、 Flink 直写等。
关于下一代实时导入技术的简单思考:
ByteHouse 已经在火山引擎上全面对外服务,并且提供各种版本以满足不同类型用户的需求。