in LevelDB 分布式数据库 ~ read.

LevelDB写入

LevelDB写入

从写操作开始

首先来看 GetPut 两者中的写方法:

Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
  WriteBatch batch;
  batch.Put(key, value);
  return Write(opt, &batch);
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
    ...
}

DB::Put 方法将传入的参数封装成了一个 WritaBatch,然后仍然会执行 DBImpl::Write 方法向数据库中写入数据;写入方法 DBImpl::Write 其实是一个是非常复杂的过程,包含了很多对上下文状态的判断,我们先来看一个写操作的整体逻辑:

LevelDB-Put

从总体上看,LevelDB 在对数据库执行写操作时,会有三个步骤:

  1. 调用 MakeRoomForWrite 方法为即将进行的写入提供足够的空间;
    • 在这个过程中,由于 memtable 中空间的不足可能会冻结当前的 memtable,发生 Minor Compaction 并创建一个新的 MemTable 对象;
    • 在某些条件满足时,也可能发生 Major Compaction,对数据库中的 SSTable 进行压缩;
  2. 通过 AddRecord 方法向日志中追加一条写操作的记录;
  3. 再向日志成功写入记录后,我们使用 InsertInto 直接插入 memtable 中,完成整个写操作的流程;

在这里,我们并不会提供 LevelDB 对于 Put 方法实现的全部代码,只会展示一份精简后的代码,帮助我们大致了解一下整个写操作的流程:

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;

  MakeRoomForWrite(my_batch == NULL);

  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  WriteBatch* updates = BuildBatchGroup(&last_writer);
  WriteBatchInternal::SetSequence(updates, last_sequence + 1);
  last_sequence += WriteBatchInternal::Count(updates);

  log_->AddRecord(WriteBatchInternal::Contents(updates));
  WriteBatchInternal::InsertInto(updates, mem_);

  versions_->SetLastSequence(last_sequence);
  return Status::OK();
}

C++

不可变的 memtable

在写操作的实现代码 DBImpl::Put 中,写操作的准备过程 MakeRoomForWrite 是我们需要注意的一个方法:

Status DBImpl::MakeRoomForWrite(bool force) {
  uint64_t new_log_number = versions_->NewFileNumber();
  WritableFile* lfile = NULL;
  env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);

  delete log_;
  delete logfile_;
  logfile_ = lfile;
  logfile_number_ = new_log_number;
  log_ = new log::Writer(lfile);
  imm_ = mem_;
  has_imm_.Release_Store(imm_);
  mem_ = new MemTable(internal_comparator_);
  mem_->Ref();
  MaybeScheduleCompaction();
  return Status::OK();
}

C++

当 LevelDB 中的 memtable 已经被数据填满导致内存已经快不够用的时候,我们会开始对 memtable 中的数据进行冻结并创建一个新的 MemTable 对象。

Immutable-MemTable

你可以看到,与 Bigtable 中论文不同的是,LevelDB 中引入了一个不可变的 memtable 结构 imm,它的结构与 memtable 完全相同,只是其中的所有数据都是不可变的。

LevelDB-Serving

在切换到新的 memtable 之后,还可能会执行 MaybeScheduleCompaction 来触发一次 Minor Compaction 将 imm 中数据固化成数据库中的 SSTable;imm 的引入能够解决由于 memtable 中数据过大导致压缩时不可写入数据的问题。

引入 imm 后,如果 memtable 中的数据过多,我们可以直接将 memtable 指针赋值给 imm,然后创建一个新的 MemTable 实例,这样就可以继续接受外界的写操作,不再需要等待 Minor Compaction 的结束了。

日志记录的格式

作为一个持久存储的 KV 数据库,LevelDB 一定要有日志模块以支持错误发生时恢复数据,我们想要深入了解 LevelDB 的实现,那么日志的格式是一定绕不开的问题;这里并不打算展示用于追加日志的方法 AddRecord 的实现,因为方法中只是实现了对表头和字符串的拼接。

日志在 LevelDB 是以块的形式存储的,每一个块的长度都是 32KB,固定的块长度也就决定了日志可能存放在块中的任意位置,LevelDB 中通过引入一位 RecordType 来表示当前记录在块中的位置:

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,
  kFullType = 1,
  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};

C++

日志记录的类型存储在该条记录的头部,其中还存储了 4 字节日志的 CRC 校验、记录的长度等信息:

LevelDB-log-format-and-recordtype

上图中一共包含 4 个块,其中存储着 6 条日志记录,我们可以通过 RecordType 对每一条日志记录或者日志记录的一部分进行标记,并在日志需要使用时通过该信息重新构造出这条日志记录。

virtual Status Sync() {
  Status s = SyncDirIfManifest();
  if (fflush_unlocked(file_) != 0 ||
      fdatasync(fileno(file_)) != 0) {
    s = Status::IOError(filename_, strerror(errno));
  }
  return s;
}

C++

因为向日志中写新记录都是顺序写的,所以它写入的速度非常快,当在内存中写入完成时,也会直接将缓冲区的这部分的内容 fflush 到磁盘上,实现对记录的持久化,用于之后的错误恢复等操作。

记录的插入

当一条数据的记录写入日志时,这条记录仍然无法被查询,只有当该数据写入 memtable 后才可以被查询,而这也是这一节将要介绍的内容,无论是数据的插入还是数据的删除都会向 memtable 中添加一条记录。

LevelDB-Memtable-Key-Value-Format

添加和删除的记录的区别就是它们使用了不用的 ValueType 标记,插入的数据会将其设置为 kTypeValue,删除的操作会标记为 kTypeDeletion;但是它们实际上都向 memtable 中插入了一条数据。

virtual void Put(const Slice& key, const Slice& value) {
  mem_->Add(sequence_, kTypeValue, key, value);
  sequence_++;
}
virtual void Delete(const Slice& key) {
  mem_->Add(sequence_, kTypeDeletion, key, Slice());
  sequence_++;
}

C++

我们可以看到它们都调用了 memtable 的 Add 方法,向其内部的数据结构 skiplist 以上图展示的格式插入数据,这条数据中既包含了该记录的键值、序列号以及这条记录的种类,这些字段会在拼接后存入 skiplist;既然我们并没有在 memtable 中对数据进行删除,那么我们是如何保证每次取到的数据都是最新的呢?首先,在 skiplist 中,我们使用了自己定义的一个 comparator

int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
  int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
  if (r == 0) {
    const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
    const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
    if (anum > bnum) {
      r = -1;
    } else if (anum < bnum) {
      r = +1;
    }
  }
  return r;
}

C++

比较的两个 key 中的数据可能包含的内容都不完全相同,有的会包含键值、序列号等全部信息,但是例如从 Get 方法调用过来的 key 中可能就只包含键的长度、键值和序列号了,但是这并不影响这里对数据的提取,因为我们只从每个 key 的头部提取信息,所以无论是完整的 key/value 还是单独的 key,我们都不会取到 key 之外的任何数据。

该方法分别从两个不同的 key 中取出键和序列号,然后对它们进行比较;比较的过程就是使用 InternalKeyComparator 比较器,它通过 user_keysequence_number 进行排序,其中 user_key 按照递增的顺序排序、sequence_number 按照递减的顺序排序,因为随着数据的插入序列号是不断递增的,所以我们可以保证先取到的都是最新的数据或者删除信息。

LevelDB-MemTable-SkipList

在序列号的帮助下,我们并不需要对历史数据进行删除,同时也能加快写操作的速度,提升 LevelDB 的写性能。