侧边栏壁纸
博主头像
落叶人生博主等级

走进秋风,寻找秋天的落叶

  • 累计撰写 130557 篇文章
  • 累计创建 28 个标签
  • 累计收到 9 条评论
标签搜索

目 录CONTENT

文章目录

LevelDB源码分析-MemTable

2023-12-14 星期四 / 0 评论 / 0 点赞 / 13 阅读 / 19357 字

MemTable(db/memtable.h db/memtable.cc db/skiplist.h)LevelDB中存储在内存中的那部分KV数据都存储在memtable中,而memtable中的数

MemTable(db/memtable.h db/memtable.cc db/skiplist.h)

LevelDB中存储在内存中的那部分KV数据都存储在memtable中,而memtable中的数据实际是用跳表来存储的。MemTable使用Arena进行内存管理,并提供了添加、查找、迭代器的接口,而实际上这些接口都是调用SkipList的添加和迭代器接口来实现的。

MemTable类

MemTable类的声明:

class MemTable{  public:    // ...    // Increase reference count.    void Ref() { ++refs_; }    // Drop reference count.  Delete if no more references exist.    void Unref()    {        --refs_;        assert(refs_ >= 0);        if (refs_ <= 0)        {            delete this;        }    }    // ...    // Return an iterator that yields the contents of the memtable.    //    // The caller must ensure that the underlying MemTable remains live    // while the returned iterator is live.  The keys returned by this    // iterator are internal keys encoded by AppendInternalKey in the    // db/format.{h,cc} module.    Iterator *NewIterator();    // Add an entry into memtable that maps key to value at the    // specified sequence number and with the specified type.    // Typically value will be empty if type==kTypeDeletion.    void Add(SequenceNumber seq, ValueType type,             const Slice &key,             const Slice &value);    // If memtable contains a value for key, store it in *value and return true.    // If memtable contains a deletion for key, store a NotFound() error    // in *status and return true.    // Else, return false.    bool Get(const LookupKey &key, std::string *value, Status *s);  private:    // ...        struct KeyComparator    {        const InternalKeyComparator comparator;        explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {}        int operator()(const char *a, const char *b) const;    };    friend class MemTableIterator;    friend class MemTableBackwardIterator;    typedef SkipList<const char *, KeyComparator> Table;    KeyComparator comparator_;    int refs_;    Arena arena_;    Table table_;    // ...};

MemTable的成员变量

table_变量即为实际存储KV数据的跳表,arena_用于内存管理,refs_用于引用计数,comparator_用于key之间的比较。

其中最简单的是ref_,调用Ref()函数则增加引用计数,调用Unref()函数减少引用次数,并且当引用次数为0时,Unref()函数会将对象自身delete。

Arena类

然后是arena_。Arena类为:

class Arena{  public:    // ...    // Return a pointer to a newly allocated memory block of "bytes" bytes.    char *Allocate(size_t bytes);    // Allocate memory with the normal alignment guarantees provided by malloc    char *AllocateAligned(size_t bytes);    // ...  private:    char *AllocateFallback(size_t bytes);    char *AllocateNewBlock(size_t block_bytes);    // Allocation state    char *alloc_ptr_;    size_t alloc_bytes_remaining_;    // Array of new[] allocated memory blocks    std::vector<char *> blocks_;    // Total memory usage of the arena.    port::AtomicPointer memory_usage_;    // ...};

alloc_ptr_指向当前可以被分配的内存起始位置(在某一个block中),alloc_bytes_remaining_为当前可以被分配的内存的大小,blocks_中包含了指向每一个block的指针,memory_usage_为使用的内存总量,并且使用了AtomicPointer,可以通过内存屏障或者c++的atomic接口实现原子操作。

Arena类提供的方法主要有两个,一个是Allocate(size_t bytes):

inline char *Arena::Allocate(size_t bytes)char *Arena::AllocateFallback(size_t bytes)char *Arena::AllocateNewBlock(size_t block_bytes){    char *result = new char[block_bytes];    blocks_.push_back(result);    memory_usage_.NoBarrier_Store(        reinterpret_cast<void *>(MemoryUsage() + block_bytes + sizeof(char *)));    return result;}

Allocate(size_t bytes)函数分配的内存不保证对齐。首先检查当前可以被分配的内存空间是否足够,如果够的话,直接将从alloc_ptr_指向的位置开始的bytes个字节分配给调用者,并移动alloc_ptr_指向新的位置,调整alloc_bytes_remaining_:

    // The semantics of what to return are a bit messy if we allow    // 0-byte allocations, so we disallow them here (we don't need    // them for our internal use).    assert(bytes > 0);    if (bytes <= alloc_bytes_remaining_)    {        char *result = alloc_ptr_;        alloc_ptr_ += bytes;        alloc_bytes_remaining_ -= bytes;        return result;    }

如果不够,就调用AllocateFallback(size_t bytes)函数:

    return AllocateFallback(bytes);

AllocateFallback(size_t bytes)函数首先检查bytes大小是否超过1/4个block,如果超过,则直接调用AllocateNewBlock(size_t block_bytes)函数分配一个新的block给调用者,并且这个新的block中除了当前的bytes个字节,其余部分将不会被使用,而alloc_ptr_的指向却不变,也就是说当前可以被分配的内存保持不变,因为此时剩余空间至少为1/4个block,这样设计就可以减少大块的内存碎片:

    if (bytes > kBlockSize / 4)    {        // Object is more than a quarter of our block size.  Allocate it separately        // to avoid wasting too much space in leftover bytes.        char *result = AllocateNewBlock(bytes);        return result;    }

如果没有超过1/4个block,就分配一个新的block,并且将alloc_ptr_指向新的block中的剩余空间:

    // We waste the remaining space in the current block.    alloc_ptr_ = AllocateNewBlock(kBlockSize);    alloc_bytes_remaining_ = kBlockSize;    char *result = alloc_ptr_;    alloc_ptr_ += bytes;    alloc_bytes_remaining_ -= bytes;    return result;

还有一个内存分配函数是AllocateAligned(size_t bytes):

char *Arena::AllocateAligned(size_t bytes)

这个函数保证内存对齐。首先,以下代码会调整alloc_ptr_的位置使其对齐:

    const int align = (sizeof(void *) > 8) ? sizeof(void *) : 8;    assert((align & (align - 1)) == 0); // Pointer size should be a power of 2    size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align - 1);    size_t slop = (current_mod == 0 ? 0 : align - current_mod);    size_t needed = bytes + slop;    char *result;

剩下的代码和Allocate(size_t bytes)中实现一致:

    if (needed <= alloc_bytes_remaining_)    {        result = alloc_ptr_ + slop;        alloc_ptr_ += needed;        alloc_bytes_remaining_ -= needed;    }    else    {        // AllocateFallback always returned aligned memory        result = AllocateFallback(bytes);    }    assert((reinterpret_cast<uintptr_t>(result) & (align - 1)) == 0);    return result;
SkipList类

最后是最重要的跳表,跳表由SkipList类实现:

template <typename Key, class Comparator>class SkipList{  private:    struct Node;  public:    // ...    // Insert key into the list.    // REQUIRES: nothing that compares equal to key is currently in the list.    void Insert(const Key &key);    // Returns true iff an entry that compares equal to key is in the list.    bool Contains(const Key &key) const;    // Iteration over the contents of a skip list    class Iterator    {      	// ...    };  private:    // ...    // Immutable after construction    Comparator const compare_;    Arena *const arena_; // Arena used for allocations of nodes    Node *const head_;    // Modified only by Insert().  Read racily by readers, but stale    // values are ok.    port::AtomicPointer max_height_; // Height of the entire list    // ...    Node *NewNode(const Key &key, int height);    // ...    // Return true if key is greater than the data stored in "n"    bool KeyIsAfterNode(const Key &key, Node *n) const;    // Return the earliest node that comes at or after key.    // Return nullptr if there is no such node.    //    // If prev is non-null, fills prev[level] with pointer to previous    // node at "level" for every level in [0..max_height_-1].    Node *FindGreaterOrEqual(const Key &key, Node **prev) const;    // Return the latest node with a key < key.    // Return head_ if there is no such node.    Node *FindLessThan(const Key &key) const;    // Return the last node in the list.    // Return head_ if list is empty.    Node *FindLast() const;    // ...};

跳表中主要有四个成员,一个是compare_用于key的比较,一个是arena_用于内存管理,一个是head_指向跳表开头的节点,最后一个是max_height_为跳表最大的高度。

当然跳表中还需要两个类和结构,一个是Iterator类,用于跳表的遍历,另一个是Node结构,用于表示跳表中的节点。

Node结构

首先是Node结构:

template <typename Key, class Comparator>struct SkipList<Key, Comparator>::Node{    // ...    Key const key;    // Accessors/mutators for links.  Wrapped in methods so we can    // add the appropriate barriers as necessary.    Node *Next(int n)    // ...    void SetNext(int n, Node *x)    // ...    // No-barrier variants that can be safely used in a few locations.    Node *NoBarrier_Next(int n)    // ...    void NoBarrier_SetNext(int n, Node *x)    // ...  private:    // Array of length equal to the node height.  next_[0] is lowest level link.    port::AtomicPointer next_[1];};

Node中key存储了节点中实际存储的KV值,next_为一个变长数组(struct最后一个元素如果是数组,则长度可变),用于存储该节点在不同高度的链表上指向下一个节点的指针。

创建Node使用了一个很神奇的函数:

template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::NewNode(const Key &key, int height){    char *mem = arena_->AllocateAligned(        sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1));    return new (mem) Node(key);}

这个函数首先分配了一块对齐的内存,然后通过new (mem) Node(key)这个语句创建一个节点,我也不是很懂。

Iterator类

其次是iterator类:

    // Iteration over the contents of a skip list    class Iterator    {      public:        // ...         // Returns true iff the iterator is positioned at a valid node.        bool Valid() const;        // Returns the key at the current position.        // REQUIRES: Valid()        const Key &key() const;        // Advances to the next position.        // REQUIRES: Valid()        void Next();        // Advances to the previous position.        // REQUIRES: Valid()        void Prev();        // Advance to the first entry with a key >= target        void Seek(const Key &target);        // Position at the first entry in list.        // Final state of iterator is Valid() iff list is not empty.        void SeekToFirst();        // Position at the last entry in list.        // Final state of iterator is Valid() iff list is not empty.        void SeekToLast();      private:        const SkipList *list_;        Node *node_;        // ...    };

这个类主要有两个成员,一个list_指向迭代的跳表,node_指向当前迭代的节点。

对跳表的遍历操作需要获取一个Iterator类型的迭代器,然后通过调用Iterator类提供的接口函数对跳表进行遍历。而Iterator类提供的接口函数实际上通过调用SkipList类提供的私有接口函数实现。

SkipList类的私有接口函数如下:

template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindGreaterOrEqual(const Key &key, Node **prev)    const{    Node *x = head_;    int level = GetMaxHeight() - 1;    while (true)    {        Node *next = x->Next(level);        if (KeyIsAfterNode(key, next))        {            // Keep searching in this list            x = next;        }        else        {            if (prev != nullptr)                prev[level] = x;            if (level == 0)            {                return next;            }            else            {                // Switch to next list                level--;            }        }    }}template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLessThan(const Key &key) const{    Node *x = head_;    int level = GetMaxHeight() - 1;    while (true)    {        assert(x == head_ || compare_(x->key, key) < 0);        Node *next = x->Next(level);        if (next == nullptr || compare_(next->key, key) >= 0)        {            if (level == 0)            {                return x;            }            else            {                // Switch to next list                level--;            }        }        else        {            x = next;        }    }}template <typename Key, class Comparator>typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLast()    const{    Node *x = head_;    int level = GetMaxHeight() - 1;    while (true)    {        Node *next = x->Next(level);        if (next == nullptr)        {            if (level == 0)            {                return x;            }            else            {                // Switch to next list                level--;            }        }        else        {            x = next;        }    }}

这三个函数的实现比较简单,类似于链表的遍历,就不做具体分析了。

另外,跳表还有两个接口函数用于插入和检查是否包含某个key,这两个函数的实现也使用了跳表的私有接口函数:

template <typename Key, class Comparator>void SkipList<Key, Comparator>::Insert(const Key &key)template <typename Key, class Comparator>bool SkipList<Key, Comparator>::Contains(const Key &key) const

MemTable成员函数

分析完以上这些,我们开始分析MemTable提供的接口函数:

void MemTable::Add(SequenceNumber s, ValueType type,                   const Slice &key,                   const Slice &value){    // Format of an entry is concatenation of:    //  key_size     : varint32 of internal_key.size()    //  key bytes    : char[internal_key.size()]    //  value_size   : varint32 of value.size()    //  value bytes  : char[value.size()]    size_t key_size = key.size();    size_t val_size = value.size();    size_t internal_key_size = key_size + 8;    const size_t encoded_len =        VarintLength(internal_key_size) + internal_key_size +        VarintLength(val_size) + val_size;    char *buf = arena_.Allocate(encoded_len);    char *p = EncodeVarint32(buf, internal_key_size);    memcpy(p, key.data(), key_size);    p += key_size;    EncodeFixed64(p, (s << 8) | type);    p += 8;    p = EncodeVarint32(p, val_size);    memcpy(p, value.data(), val_size);    assert(p + val_size == buf + encoded_len);    table_.Insert(buf);}

Add函数将key和value组成key_size | sequencenumber | type | key | value_size | value这样的形式,然后调用SkipList类提供的Insert函数插入跳表中。

bool MemTable::Get(const LookupKey &key, std::string *value, Status *s)

Get函数先用跳表的迭代器找到包含key的节点:

    Slice memkey = key.memtable_key();    Table::Iterator iter(&table_);    iter.Seek(memkey.data());

然后将节点中的数据解码后封装为Slice存入value:

    if (iter.Valid())    {        // entry format is:        //    klength  varint32        //    userkey  char[klength]        //    tag      uint64        //    vlength  varint32        //    value    char[vlength]        // Check that it belongs to same user key.  We do not check the        // sequence number since the Seek() call above should have skipped        // all entries with overly large sequence numbers.        const char *entry = iter.key();        uint32_t key_length;        const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);        if (comparator_.comparator.user_comparator()->Compare(                Slice(key_ptr, key_length - 8),                key.user_key()) == 0)        {            // Correct user key            const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);            switch (static_cast<ValueType>(tag & 0xff))            {            case kTypeValue:            {                Slice v = GetLengthPrefixedSlice(key_ptr + key_length);                value->assign(v.data(), v.size());                return true;            }            case kTypeDeletion:                *s = Status::NotFound(Slice());                return true;            }        }    }    return false;
MemTableIterator类

MemTable还封装了一个MemTableIterator类用于MemTable的迭代:

class MemTableIterator : public Iterator{  public:    explicit MemTableIterator(MemTable::Table *table) : iter_(table) {}    virtual bool Valid() const { return iter_.Valid(); }    virtual void Seek(const Slice &k) { iter_.Seek(EncodeKey(&tmp_, k)); }    virtual void SeekToFirst() { iter_.SeekToFirst(); }    virtual void SeekToLast() { iter_.SeekToLast(); }    virtual void Next() { iter_.Next(); }    virtual void Prev() { iter_.Prev(); }    virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); }    virtual Slice value() const    {        Slice key_slice = GetLengthPrefixedSlice(iter_.key());        return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());    }    virtual Status status() const { return Status::OK(); }  private:    MemTable::Table::Iterator iter_;    std::string tmp_; // For passing to EncodeKey    // No copying allowed    MemTableIterator(const MemTableIterator &);    void operator=(const MemTableIterator &);};

遍历MemTable就需要获得一个MemTableIterator类的迭代器,然后通过调用MemTableIterator类提供的接口函数进行遍历。MemTableIterator类其实就是实现了对跳表迭代器的封装,成员为一个跳表的迭代器iter_,接口函数通过调用跳表迭代器的接口函数实现。

227 Love u

广告 广告

评论区