回想一下LSM数据写入的流程:
// 写数据对外接口
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
WriteBatch封装了一个批量修改数据的原子化操作;
其主要完成数据的串行化拼接,拼接后的格式如下:
count | record | record... (kTypeValue+Key+Value;kKeyDeletion+Key)
// 把字符串编码,拼接保存到rep_变量中
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
由于WAL日志文件和MemTable内存结构是全局共享资源,在多线程同时写入数据时,需加互斥锁来保证操作的隔离性。考虑到写WAL涉及磁盘的写入操作,耗时较久,会影响数据写入的并发性能。
leveldb针对此问题,做了一个批量写入的优化:
把数据的写入操作拆分成两个阶段,来缩短锁等待的时间; 在准备阶段,写入时获取到锁后,把更改的数据加入到待写入的队列中;再检查自己是不是排在待写入队列的头部,如不是,则释放锁,进入等待中; 如检查到自己出于带写入队列的头部,则再次获取锁,并尽可能多的从待写入队列上读取数据,写入到WAL日志文件中。
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
// 如数据未完成写入,且当前writer不在待写入队列的头部,则等待
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// 如数据写入完成,则退出(其他线程已帮忙完成数据写入)
if (w.done) {
return w.status;
}
// 数据限流机制
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
// 尽可能多的从待写入队列中取出数据,拼接成写的WriteBatch结构
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
{
// 释放锁,让其他线程可以读数据,或者把数据放入待写入队列中
mutex_.Unlock();
// 批量写入数据
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
}
// 把数据写入到MemTable中
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
// 更新全局的sequence
versions_->SetLastSequence(last_sequence);
}
// 从写入队列中剔除此次已完成的写入,更新其为写入完成状态,并唤醒其等待的线程
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// 如队列中还有数据,则唤醒其他线程继续写入数据
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。