消息存储原理与 ID 规则

最近更新时间:2024-10-14 10:09:21

我的收藏

消息 ID 生成规则

在 Pulsar 中,每条消息都有自己的 ID(即 MessageID),MessageID 由四部分组成:ledgerId:entryID:partition-index:batch-index。其中:
partition-index:指分区的编号,在非分区 topic 的时候为 -1。
batch-index:在非批量消息的时候为 -1。
消息 ID 的生成规则由 Pulsar 的消息存储机制决定,Pulsar 中消息存储原理图如下:



如上图所示,在 Pulsar中,一个 Topic 的每一个分区会对应一系列的 ledger,其中只有一个 ledger 处于 open 状态即可写状态,而每个 ledger 只会存储与之对应的分区下的消息。
Pulsar 在存储消息时,会先找到当前分区使用的 ledger ,然后生成当前消息对应的 entry ID,entry ID 在同一个 ledger 内是递增的。每个 ledger 存在的时长或保存的 entry 个数超过阈值后会进行切换,新的消息会存储到同一个 partition 中的下一个 ledger 中。
批量生产消息情况下,一个 entry 中可能包含多条消息。
非批量生产的情况下,一个 entry 中包含一条消息(producer 端可以配置这个参数,默认是批量的)。
Ledger 只是一个逻辑概念,是数据的一种逻辑组装维度,并没有对应的实体。而 bookie 只会按照 entry 维度进行写入、查找、获取。

分片机制详解:Ledger 和 Entry

Pulsar 中的消息数据以 ledger 的形式存储在 BookKeeper 集群的 bookie 存储节点上。Ledger 是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个 bookie 的写入。Ledger 的条目会被复制到多个 bookie 中,同时会写入相关的数据来保证数据的一致性。
BookKeeper 需要保存的数据包括:
Journals
journals 文件里存储了 BookKeeper 的事务日志,在任何针对 ledger 的更新发生前,都会先将这个更新的描述信息持久化到这个 journal 文件中。
BookKeeper 提供有单独的 sync 线程根据当前 journal 文件的大小来作 journal 文件的 rolling。
EntryLogFile
存储真正数据的文件,来自不同 ledger 的 entry 数据先缓存在内存buffer中,然后批量flush到EntryLogFile中。
默认情况下,所有ledger的数据都是聚合然后顺序写入到同一个EntryLog文件中,避免磁盘随机写。
Index 文件
所有 Ledger 的 entry 数据都写入相同的 EntryLog 文件中,为了加速数据读取,会作 ledgerId + entryId 到文件 offset 的映射,这个映射会缓存在内存中,称为 IndexCache。
IndexCache 容量达到上限时,会被 sync 线程 flush 到磁盘中。
三类数据文件的读写交互如下图:


Entry 数据写入
1. 数据首先会同时写入 Journal(写入 Journal 的数据会实时落到磁盘)和 Memtable(读写缓存)。
2. 写入 Memtable 之后,对写入请求进行响应。
3. Memtable 写满之后,会 flush 到 Entry Logger 和 Index cache,Entry Logger 中保存数据,Index cache 中保存数据的索引信息,
4. 后台线程将 Entry Logger 和 Index cache 数据落到磁盘。
Entry 数据读取
Tailing read 请求:直接从 Memtable 中读取 Entry。
Catch-up read(滞后消费)请求:先读取 Index信息,然后索引从 Entry Logger 文件读取 Entry。
数据一致性保证:LastLogMark
写入的 EntryLog 和 Index 都是先缓存在内存中,再根据一定的条件周期性的 flush 到磁盘,这就造成了从内存到持久化到磁盘的时间间隔,如果在这间隔内 BookKeeper 进程崩溃,在重启后,我们需要根据 journal 文件内容来恢复,这个 LastLogMark 就记录了从 journal 中什么位置开始恢复。
它其实是存在内存中,当 IndexCache 被 flush 到磁盘后其值会被更新,LastLogMark 也会周期性持久化到磁盘文件,供 Bookkeeper 进程启动时读取来从 journal 中恢复。
LastLogMark 一旦被持久化到磁盘,即意味着在其之前的 Index 和 EntryLog 都已经被持久化到了磁盘,那么 journal 在这 LastLogMark 之前的数据都可以被清除了。