MemTable(db/memtable.h db/memtable.cc db/skiplist.h)LevelDB中存储在内存中的那部分KV数据都存储在memtable中,而memtable中的数
MemTable(db/memtable.h db/memtable.cc db/skiplist.h)
LevelDB中存储在内存中的那部分KV数据都存储在memtable中,而memtable中的数据实际是用跳表来存储的。MemTable使用Arena进行内存管理,并提供了添加、查找、迭代器的接口,而实际上这些接口都是调用SkipList的添加和迭代器接口来实现的。
MemTable类
MemTable类的声明:
class MemTable{ public: // ... // Increase reference count. void Ref() { ++refs_; } // Drop reference count. Delete if no more references exist. void Unref() { --refs_; assert(refs_ >= 0); if (refs_ <= 0) { delete this; } } // ... // Return an iterator that yields the contents of the memtable. // // The caller must ensure that the underlying MemTable remains live // while the returned iterator is live. The keys returned by this // iterator are internal keys encoded by AppendInternalKey in the // db/format.{h,cc} module. Iterator *NewIterator(); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. void Add(SequenceNumber seq, ValueType type, const Slice &key, const Slice &value); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. // Else, return false. bool Get(const LookupKey &key, std::string *value, Status *s); private: // ... struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {} int operator()(const char *a, const char *b) const; }; friend class MemTableIterator; friend class MemTableBackwardIterator; typedef SkipList<const char *, KeyComparator> Table; KeyComparator comparator_; int refs_; Arena arena_; Table table_; // ...};
MemTable的成员变量
table_变量即为实际存储KV数据的跳表,arena_用于内存管理,refs_用于引用计数,comparator_用于key之间的比较。
其中最简单的是ref_,调用Ref()函数则增加引用计数,调用Unref()函数减少引用次数,并且当引用次数为0时,Unref()函数会将对象自身delete。
Arena类
然后是arena_。Arena类为:
class Arena{ public: // ... // Return a pointer to a newly allocated memory block of "bytes" bytes. char *Allocate(size_t bytes); // Allocate memory with the normal alignment guarantees provided by malloc char *AllocateAligned(size_t bytes); // ... private: char *AllocateFallback(size_t bytes); char *AllocateNewBlock(size_t block_bytes); // Allocation state char *alloc_ptr_; size_t alloc_bytes_remaining_; // Array of new[] allocated memory blocks std::vector<char *> blocks_; // Total memory usage of the arena. port::AtomicPointer memory_usage_; // ...};
alloc_ptr_指向当前可以被分配的内存起始位置(在某一个block中),alloc_bytes_remaining_为当前可以被分配的内存的大小,blocks_中包含了指向每一个block的指针,memory_usage_为使用的内存总量,并且使用了AtomicPointer,可以通过内存屏障或者c++的atomic接口实现原子操作。
Arena类提供的方法主要有两个,一个是Allocate(size_t bytes):
inline char *Arena::Allocate(size_t bytes)char *Arena::AllocateFallback(size_t bytes)char *Arena::AllocateNewBlock(size_t block_bytes){ char *result = new char[block_bytes]; blocks_.push_back(result); memory_usage_.NoBarrier_Store( reinterpret_cast<void *>(MemoryUsage() + block_bytes + sizeof(char *))); return result;}
Allocate(size_t bytes)函数分配的内存不保证对齐。首先检查当前可以被分配的内存空间是否足够,如果够的话,直接将从alloc_ptr_指向的位置开始的bytes个字节分配给调用者,并移动alloc_ptr_指向新的位置,调整alloc_bytes_remaining_:
// The semantics of what to return are a bit messy if we allow // 0-byte allocations, so we disallow them here (we don't need // them for our internal use). assert(bytes > 0); if (bytes <= alloc_bytes_remaining_) { char *result = alloc_ptr_; alloc_ptr_ += bytes; alloc_bytes_remaining_ -= bytes; return result; }
如果不够,就调用AllocateFallback(size_t bytes)函数:
return AllocateFallback(bytes);
AllocateFallback(size_t bytes)函数首先检查bytes大小是否超过1/4个block,如果超过,则直接调用AllocateNewBlock(size_t block_bytes)函数分配一个新的block给调用者,并且这个新的block中除了当前的bytes个字节,其余部分将不会被使用,而alloc_ptr_的指向却不变,也就是说当前可以被分配的内存保持不变,因为此时剩余空间至少为1/4个block,这样设计就可以减少大块的内存碎片:
if (bytes > kBlockSize / 4) { // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. char *result = AllocateNewBlock(bytes); return result; }
如果没有超过1/4个block,就分配一个新的block,并且将alloc_ptr_指向新的block中的剩余空间:
// We waste the remaining space in the current block. alloc_ptr_ = AllocateNewBlock(kBlockSize); alloc_bytes_remaining_ = kBlockSize; char *result = alloc_ptr_; alloc_ptr_ += bytes; alloc_bytes_remaining_ -= bytes; return result;
还有一个内存分配函数是AllocateAligned(size_t bytes):
char *Arena::AllocateAligned(size_t bytes)
这个函数保证内存对齐。首先,以下代码会调整alloc_ptr_的位置使其对齐:
const int align = (sizeof(void *) > 8) ? sizeof(void *) : 8; assert((align & (align - 1)) == 0); // Pointer size should be a power of 2 size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align - 1); size_t slop = (current_mod == 0 ? 0 : align - current_mod); size_t needed = bytes + slop; char *result;
剩下的代码和Allocate(size_t bytes)中实现一致:
if (needed <= alloc_bytes_remaining_) { result = alloc_ptr_ + slop; alloc_ptr_ += needed; alloc_bytes_remaining_ -= needed; } else { // AllocateFallback always returned aligned memory result = AllocateFallback(bytes); } assert((reinterpret_cast<uintptr_t>(result) & (align - 1)) == 0); return result;
SkipList类
最后是最重要的跳表,跳表由SkipList类实现:
template <typename Key, class Comparator>class SkipList{ private: struct Node; public: // ... // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. void Insert(const Key &key); // Returns true iff an entry that compares equal to key is in the list. bool Contains(const Key &key) const; // Iteration over the contents of a skip list class Iterator { // ... }; private: // ... // Immutable after construction Comparator const compare_; Arena *const arena_; // Arena used for allocations of nodes Node *const head_; // Modified only by Insert(). Read racily by readers, but stale // values are ok. port::AtomicPointer max_height_; // Height of the entire list // ... Node *NewNode(const Key &key, int height); // ... // Return true if key is greater than the data stored in "n" bool KeyIsAfterNode(const Key &key, Node *n) const; // Return the earliest node that comes at or after key. // Return nullptr if there is no such node. // // If prev is non-null, fills prev[level] with pointer to previous // node at "level" for every level in [0..max_height_-1]. Node *FindGreaterOrEqual(const Key &key, Node **prev) const; // Return the latest node with a key < key. // Return head_ if there is no such node. Node *FindLessThan(const Key &key) const; // Return the last node in the list. // Return head_ if list is empty. Node *FindLast() const; // ...};
跳表中主要有四个成员,一个是compare_用于key的比较,一个是arena_用于内存管理,一个是head_指向跳表开头的节点,最后一个是max_height_为跳表最大的高度。
当然跳表中还需要两个类和结构,一个是Iterator类,用于跳表的遍历,另一个是Node结构,用于表示跳表中的节点。
Node结构
首先是Node结构:
template <typename Key, class Comparator>struct SkipList<Key, Comparator>::Node{ // ... Key const key; // Accessors/mutators for links. Wrapped in methods so we can // add the appropriate barriers as necessary. Node *Next(int n) // ... void SetNext(int n, Node *x) // ... // No-barrier variants that can be safely used in a few locations. Node *NoBarrier_Next(int n) // ... void NoBarrier_SetNext(int n, Node *x) // ... private: // Array of length equal to the node height. next_[0] is lowest level link. port::AtomicPointer next_[1];};
Node中key存储了节点中实际存储的KV值,next_为一个变长数组(struct最后一个元素如果是数组,则长度可变),用于存储该节点在不同高度的链表上指向下一个节点的指针。
创建Node使用了一个很神奇的函数:
template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::NewNode(const Key &key, int height){ char *mem = arena_->AllocateAligned( sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); return new (mem) Node(key);}
这个函数首先分配了一块对齐的内存,然后通过new (mem) Node(key)
这个语句创建一个节点,我也不是很懂。
Iterator类
其次是iterator类:
// Iteration over the contents of a skip list class Iterator { public: // ... // Returns true iff the iterator is positioned at a valid node. bool Valid() const; // Returns the key at the current position. // REQUIRES: Valid() const Key &key() const; // Advances to the next position. // REQUIRES: Valid() void Next(); // Advances to the previous position. // REQUIRES: Valid() void Prev(); // Advance to the first entry with a key >= target void Seek(const Key &target); // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst(); // Position at the last entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToLast(); private: const SkipList *list_; Node *node_; // ... };
这个类主要有两个成员,一个list_指向迭代的跳表,node_指向当前迭代的节点。
对跳表的遍历操作需要获取一个Iterator类型的迭代器,然后通过调用Iterator类提供的接口函数对跳表进行遍历。而Iterator类提供的接口函数实际上通过调用SkipList类提供的私有接口函数实现。
SkipList类的私有接口函数如下:
template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindGreaterOrEqual(const Key &key, Node **prev) const{ Node *x = head_; int level = GetMaxHeight() - 1; while (true) { Node *next = x->Next(level); if (KeyIsAfterNode(key, next)) { // Keep searching in this list x = next; } else { if (prev != nullptr) prev[level] = x; if (level == 0) { return next; } else { // Switch to next list level--; } } }}template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLessThan(const Key &key) const{ Node *x = head_; int level = GetMaxHeight() - 1; while (true) { assert(x == head_ || compare_(x->key, key) < 0); Node *next = x->Next(level); if (next == nullptr || compare_(next->key, key) >= 0) { if (level == 0) { return x; } else { // Switch to next list level--; } } else { x = next; } }}template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLast() const{ Node *x = head_; int level = GetMaxHeight() - 1; while (true) { Node *next = x->Next(level); if (next == nullptr) { if (level == 0) { return x; } else { // Switch to next list level--; } } else { x = next; } }}
这三个函数的实现比较简单,类似于链表的遍历,就不做具体分析了。
另外,跳表还有两个接口函数用于插入和检查是否包含某个key,这两个函数的实现也使用了跳表的私有接口函数:
template <typename Key, class Comparator>void SkipList<Key, Comparator>::Insert(const Key &key)template <typename Key, class Comparator>bool SkipList<Key, Comparator>::Contains(const Key &key) const
MemTable成员函数
分析完以上这些,我们开始分析MemTable提供的接口函数:
void MemTable::Add(SequenceNumber s, ValueType type, const Slice &key, const Slice &value){ // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char *buf = arena_.Allocate(encoded_len); char *p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf);}
Add函数将key和value组成key_size | sequencenumber | type | key | value_size | value这样的形式,然后调用SkipList类提供的Insert函数插入跳表中。
bool MemTable::Get(const LookupKey &key, std::string *value, Status *s)
Get函数先用跳表的迭代器找到包含key的节点:
Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data());
然后将节点中的数据解码后封装为Slice存入value:
if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char *entry = iter.key(); uint32_t key_length; const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; } case kTypeDeletion: *s = Status::NotFound(Slice()); return true; } } } return false;
MemTableIterator类
MemTable还封装了一个MemTableIterator类用于MemTable的迭代:
class MemTableIterator : public Iterator{ public: explicit MemTableIterator(MemTable::Table *table) : iter_(table) {} virtual bool Valid() const { return iter_.Valid(); } virtual void Seek(const Slice &k) { iter_.Seek(EncodeKey(&tmp_, k)); } virtual void SeekToFirst() { iter_.SeekToFirst(); } virtual void SeekToLast() { iter_.SeekToLast(); } virtual void Next() { iter_.Next(); } virtual void Prev() { iter_.Prev(); } virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } virtual Slice value() const { Slice key_slice = GetLengthPrefixedSlice(iter_.key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: MemTable::Table::Iterator iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed MemTableIterator(const MemTableIterator &); void operator=(const MemTableIterator &);};
遍历MemTable就需要获得一个MemTableIterator类的迭代器,然后通过调用MemTableIterator类提供的接口函数进行遍历。MemTableIterator类其实就是实现了对跳表迭代器的封装,成员为一个跳表的迭代器iter_,接口函数通过调用跳表迭代器的接口函数实现。
227 Love u