LevelDB之WriteBatch-事务分离的事务处理器

发布时间:2024年01月17日

事务分离的事务处理器

在看levelDB源码时发现一个很有意思的事情,levelDB的示例代码时不时的就会使用事务处理器WriteBatch来处理需要同时执行多个事务的情况,那么这个WriteBatch是何方神圣,为什么levelDB那么喜欢用它来处理事务相关问题,我们将代码单独抽离出来一探究竟!

WriteBatch的定义

WriteBatch的类定义如下:

class WriteBatch {
public:
    class LEVELDB_EXPORT Handler {
        public:
        virtual ~Handler() = default;

        virtual void Put(const leveldb::Slice &key, const leveldb::Slice &value) = 0;

        virtual void Delete(const leveldb::Slice &key) = 0;
    };

    WriteBatch();

    // 支持copy构造函数
    WriteBatch(const WriteBatch &) = default;
    // 支持赋值操作
    WriteBatch &operator=(const WriteBatch &) = default;

    ~WriteBatch();

    // Put 将Key-value形式的值存储在数据库中
    void Put(const leveldb::Slice &key, const leveldb::Slice &value);
    // 当某个值不想要时,按照key值将对应的键值对删除
    void Delete(const leveldb::Slice &key);
    // 将 batch中所有的缓存清理,并预留出长度和序列号字段足够的长度
    void Clear();
    // 获取batch内部指标数据长度,也就是获取rep_的大小
    size_t ApproximateSize() const;
    // 当有多个batch想合并时,可以使用append将batch添加到另外衣蛾batch后面
    void Append(const WriteBatch &source);
    // 对batch进行迭代,这样能够按照类型取出所有batch里面的值
    leveldb::Status Iterate(Handler *handler) const;

private:
    friend class WriteBatchInternal;

    std::string rep_;  // See comment in write_batch.cc for the format of rep_
};

这里我们需要注意的事Handler类型和friend class WriteBatchInternal;两种特殊用法,Handler声明在WriteBatch内,也说明Handler类只用于WriteBatch(从属关系上属于),友元类WriteBatchInternal其实是IMPL设计模式的一种变种,这里就是为了将不相关的功能性函数和实际的业务数据之间进行分离,同时有能提供兼容性,提高类的可复用性。

WriteBatchInternal的定义

WriteBatchInternal其实就是将WriteBatch中和具体业务数据不相关的功能函数进行抽离,通过这种方法能够有效的降低代码的耦合性,还能提高代码的可阅读性,其具体定义如下:

class WriteBatchInternal {
public:
    // 返回入参WriteBatch存储的事务个数
    static uint32_t Count(const WriteBatch* batch);
    // 设置当前writeBatch中包含事务的个数
    static void SetCount(WriteBatch* batch, uint32_t n);
    // 将batch开头的序列号返回
    static leveldb::SequenceNumber Sequence(const WriteBatch* batch);
    // 将一个特殊的数字,作为序列号存储在batch的开头
    static void SetSequence(WriteBatch* batch, leveldb::SequenceNumber seq);

    static leveldb::Slice Contents(const WriteBatch* batch) {
        return leveldb::Slice(batch->rep_);
    }

    static size_t ByteSize(const WriteBatch* batch) {
        return batch->rep_.size();
    }

    static void SetContents(WriteBatch* batch, const leveldb::Slice& contents);
    // 仿写, 不需要进行真正的数据存储,这里去除后面的数据表项
    static leveldb::Status InsertInfo(const WriteBatch* batch);

    static void Append(WriteBatch* dst, const WriteBatch* src);

};

WriteBatchInternal的实现

WriteBatchInternal::Count与WriteBatchInternal::SetCount

我们知道WriteBatch数据的记录完全依赖的是一个string类型的成员变量,那么因为是要记录事务,那么具体记录多少事务肯定要有具体的数量,否则记录结束之后怎么才能取出,SetCount和Count其实就是将string中的4个字节转换为uint32_t类型的数据,用于记录事务处理器中添加了多少事务操作。

uint32_t WriteBatchInternal::Count(const WriteBatch *batch) {
    //   8-byte sequence number
    const auto *const buffer =  reinterpret_cast<const uint8_t *>(batch->rep_.data() + 8);

    // Recent clang and gcc optimize this to a single mov / ldr instruction.
    return (static_cast<uint32_t>(buffer[0])) |
            (static_cast<uint32_t>(buffer[1]) << 8) |
            (static_cast<uint32_t>(buffer[2]) << 16) |
            (static_cast<uint32_t>(buffer[3]) << 24);
}
void WriteBatchInternal::SetCount(WriteBatch *batch, uint32_t n) {
    // 前面8byte sequence number
    // 这里和上边的去比啊,这里使用[]取值是因为[]取出的非const 引用
    auto *const buffer =  reinterpret_cast<uint8_t *>(&batch->rep_[8]);
    // 将uint32_t类型数据每8个字节存储到一个chat里面
    buffer[0] = static_cast<uint8_t>(n);
    buffer[1] = static_cast<uint8_t>(n >> 8);
    buffer[2] = static_cast<uint8_t>(n >> 16);
    buffer[3] = static_cast<uint8_t>(n >> 24);

}

WriteBatchInternal::Sequence与WriteBatchInternal::SetSequence

除了事务的计数,事务处理器WriteBatch中还有一个序列号,用于表明事务处理的事务号。不过事务处理号是一个64位的数据,这里的leveldb::SequenceNumber其实就是一个64位的数据

leveldb::SequenceNumber WriteBatchInternal::Sequence(const WriteBatch *batch) {
    const auto *const buffer = reinterpret_cast<const uint8_t *>(batch->rep_.data());

    // Recent clang and gcc optimize this to a single mov / ldr instruction.
    return (static_cast<uint64_t>(buffer[0])) |
        (static_cast<uint64_t>(buffer[1]) << 8) |
        (static_cast<uint64_t>(buffer[2]) << 16) |
        (static_cast<uint64_t>(buffer[3]) << 24) |
        (static_cast<uint64_t>(buffer[4]) << 32) |
        (static_cast<uint64_t>(buffer[5]) << 40) |
        (static_cast<uint64_t>(buffer[6]) << 48) |
        (static_cast<uint64_t>(buffer[7]) << 56);
}

void WriteBatchInternal::SetSequence(WriteBatch *batch, leveldb::SequenceNumber seq) {
    auto *const buffer = reinterpret_cast<uint8_t *>(&batch->rep_[0]);

    // Recent clang and gcc optimize this to a single mov / str instruction.
    buffer[0] = static_cast<uint8_t>(seq);
    buffer[1] = static_cast<uint8_t>(seq >> 8);
    buffer[2] = static_cast<uint8_t>(seq >> 16);
    buffer[3] = static_cast<uint8_t>(seq >> 24);
    buffer[4] = static_cast<uint8_t>(seq >> 32);
    buffer[5] = static_cast<uint8_t>(seq >> 40);
    buffer[6] = static_cast<uint8_t>(seq >> 48);
    buffer[7] = static_cast<uint8_t>(seq >> 56);
}

WriteBatchInternal::SetContents与WriteBatchInternal::Contents

通过以上两个函数,我们知道在levelDB中有4字节的计数值,有8字节的序列号值,那么WriteBatch中rep_成员变量的前12字节就是预留出来供计数和序列号使用。

static leveldb::Slice Contents(const WriteBatch* batch) {
    return leveldb::Slice(batch->rep_);
}

void WriteBatchInternal::SetContents(WriteBatch *batch, const leveldb::Slice &contents) {
    assert(contents.size() >= kHeader);
    batch->rep_.assign(contents.data(), contents.size());
}

WriteBatchInternal::Append

当想把一个WriteBatch附加到另外一个之后时,需要考虑一下rep_成员变量的特殊性通过以上两个函数,我们知道在levelDB中有4字节的计数值,有8字节的序列号值,那么WriteBatch中rep_成员变量的前12字节就是预留出来供计数和序列号使用。想要把一个WriteBatch附加到另外一个后面,需要完成一下两件事情:

  1. 去除后面WriteBatch的前12头部字节
  2. 将后面WriteBatch的计数Count添加到被添加的WriteBatch上
void WriteBatchInternal::Append(WriteBatch *dst, const WriteBatch *src) {
    SetCount(dst, Count(dst) + Count(src));
    assert(src->rep_.size() >= kHeader);
    // 两个Writebatch合并时,需要剔除多于的头部信息, 4字节长度 + 8字节序列号
    dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}

WriteBatchInternal::InsertInfo

在这里插入图片描述

桥接模式是一种结构型设计模式, 可将一个大类或一系列紧密相关的类拆分为抽象和实现两个独立的层次结构, 从而能在开发时分别使用。

InsertInfo接口就是桥接模式的集大成者,也可以说是桥接模式的经典使用方式。

定义如下

leveldb::Status WriteBatchInternal::InsertInfo(const WriteBatch *batch, void * lpFraud)

我们知道WriteBatch::Iterate只接受Handler类型的入参,想要处理lpFraud类型的数据,需要一层转换。这个时候FraudIterate就派上用场了,FraudIterate是对Handler的实现。

// 用于实现金蝉脱壳
class FraudIterate : public WriteBatch::Handler {
    public:
    explicit FraudIterate(void * lpFraud) : m_lpFraud(lpFraud){

    }
    ~FraudIterate() override = default;

    void Put(const leveldb::Slice &key, const leveldb::Slice &value) override {
        std::cout << "put : " << key.ToString() << " : " << value.ToString() << std::endl;
        // 这里实际上调用 lpFraud指向的对象来处理Put进来的数据
    }

    void Delete(const leveldb::Slice &key) override {
        std::cout << "Delete : " << key.ToString() << std::endl;
        // 这里实际上调用lpFraud指向的对象来处理 要删除的数据
    }

    private:
    void *m_lpFraud{nullptr};
};

通过FraudIterate的封装我们就能实现Iterate接口操作FraudIterate对象,FraudIterate对象操作lpFraud,从而达到Iterate接口能够操作lpFraud的目的,这也是桥接模式使用的经典场景。

具体实现如下:

leveldb::Status WriteBatchInternal::InsertInfo(const WriteBatch *batch, void * lpFraud) {
    // FraudIterate 相当于一个壳,如果类比设计模式,这里用的就 桥接模式(Bridge Pattern)
    FraudIterate inserter(lpFraud);
    return batch->Iterate(&inserter);
}

WriteBatch的实现

在每次构造WriteBatch时,都需要预先调用一下Clear接口,这一点非常重要,在WriteBatch中rep_ 并不是作为普通的字符串使用的,里面可能存储了 ’\0’ 等值,因此每次在构造对象时,需要将rep_ 先清空,然后在预留足够的头部长度用来指定rep_ 序列号和存储数据的个数。

WriteBatch::WriteBatch() {
    Clear();
}
void WriteBatch::Clear() {
    // 清空对象
    rep_.clear();
    // 预先设置为指定的头长度
    rep_.resize(kHeader);
}

WriteBatch::Put

WriteBatch::Put接口主要是想WriteBatch对象中添加key-value键值对。

要想设置键值对,首先要设置计数值,每次Put都会增加一次计数值,这样才能保证计数值和实际存储的键值对一直是相等的关系

WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1)

然后要说明本次存储的是什么类型的值,这里是put的值,因此设置类型为leveldb::kTypeValue

rep_.push_back(static_cast<char>(leveldb::kTypeValue))

接下来就是将key和value值设置到rep_ 成员变量里面,这里借助的函数是:

void PutLengthPrefixedSlice(std::string *dst, const leveldb::Slice &value)

这个函数看是无关紧要,其实是很值得学习的地方,如何使用给一个string对象存储序列化的数据的秘密就在这个函数的里面。

void WriteBatch::Put(const leveldb::Slice &key, const leveldb::Slice &value) {
    // 每次put, batch的计数加1
    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    // kTypeValue 是put Deletion 是删除
    rep_.push_back(static_cast<char>(leveldb::kTypeValue));
    // 找到所有
    WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, key);
    WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, value);
}

PutLengthPrefixedSlice的具体实现如下:

void PutLengthPrefixedSlice(std::string *dst, const leveldb::Slice &value) {
    // 1. 存储数据长度
    PutVarint32(dst, value.size());
    // 2. 存储具体的value值
    dst->append(value.data(), value.size());
}

void PutVarint32(std::string *dst, uint32_t v) {
    char buf[5];
    char *ptr = EncodeVarint32(buf, v);
    dst->append(buf, ptr - buf);
}
// 最高位为1一直到非1停止就是长度的编码
// 能这样编码是因为Key值都是字符串,能保证不会大于128
// 这里将低位放到前面是有特殊用以的,当长度大于128时,我们只需要按照最高位是否是128进行查找
// 当找到最高位为非128时,说明找到了所有的长度字段,这时包含最后一个非128开头的字符在内所有的字符就是这次的长度字段
// 这种歌编码方式用在网络发送,数据序列化中非常有用
// 可以说是序列化数组中同时存储长度和其他字符的完美解决方案
char *EncodeVarint32(char *dst, uint32_t v) {
    // Operate on characters as unsigneds
    uint8_t *ptr = reinterpret_cast<uint8_t *>(dst);
    static const int B = 128;
    if (v < (1 << 7)) {
        *(ptr++) = v;
    } else if (v < (1 << 14)) {
        *(ptr++) = v | B;
        *(ptr++) = v >> 7;
    } else if (v < (1 << 21)) {
        *(ptr++) = v | B;
        *(ptr++) = (v >> 7) | B;
        *(ptr++) = v >> 14;
    } else if (v < (1 << 28)) {
        *(ptr++) = v | B;
        *(ptr++) = (v >> 7) | B;
        *(ptr++) = (v >> 14) | B;
        *(ptr++) = v >> 21;
    } else {
        *(ptr++) = v | B;
        *(ptr++) = (v >> 7) | B;
        *(ptr++) = (v >> 14) | B;
        *(ptr++) = (v >> 21) | B;
        *(ptr++) = v >> 28;
    }
    return reinterpret_cast<char *>(ptr);
}

PutLengthPrefixedSlice又将步骤分为了两步,首先存储了数据的长度,再存储了具体的value值,存储value值这个比较常见,这里不进行过多说明,重点在这个存储数据长度。

  • 存储进去了如何就能保证取出来的时候刚好是自己想要的数据长度?
  • 你咋能知道用几个字节来存储长度?
  • 取出来记录长度的字节之后怎样才能恢复出来原来存储的长度值?

而这些问题的解决方案都在PutVarint32函数里面,当然你可以偷懒,直接无论长度为多少,我都直接用四个字节来表示,但是存储的Key值普遍是小于128的,这样当大量短的key值占用4个字节的长度存储,浪费的内存就非常可观了,因此这里非常有必要使用变长字段的编码方式来存储key值的长度,PutVarint32则完美的解决了这个问题。

PutVarint32的变长编码方式可以用途表示如下:
在这里插入图片描述
每次取出一个8bit一个字节的数据进行编码,但是最高位用来表示该位是否为长度位置,那么一个uint32_t最高需要 4 ? 8 = 32 4*8=32 4?8=32 位来表示长度,用一个最高位表示是否被占用之后就只剩下七位来表示长度了,那么为了能最多表示uint32_t最大值,就需要大于32bit来编码长度,所以这里最多可以用五字节来编码长度,那么可用bit为 5 ? 7 = 35 5*7 = 35 5?7=35 远大于uint32_t需要的32bits。

这里其实就是做了一个将8bit类型存储的数据,转换为按照7bit类型存储的数据。编码好之后,如果我们需要解码,就只需找最高位为1的字节找到完之后再夺取一个最高位为0的字节组成长度编码,然后反向的解码长度即可。

WriteBatch::Delete

Delete执行的是Put的反向操作,但是相对Put而言Delete不需要提供对应的Value值,只需要提供Key值即可;

void WriteBatch::Delete(const leveldb::Slice &key) {
    WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
    rep_.push_back(static_cast<char>(leveldb::kTypeDeletion));
    WRITE_BATCH_DEMO::PutLengthPrefixedSlice(&rep_, key);
}

Delete和Put接口操作一样,只是这里在存储类型时将类型存替换为leveldb::kTypeDeletion删除类型即可。

WriteBatch::Iterate

除了Put和Delete接口唯一需要进行详细说明的就是迭代器接口了。

ApproximateSize接口用来获取rep_ 存储字节个数,就是对string对象size函数的封装。

size_t WriteBatch::ApproximateSize() const {
    return rep_.size();
}

Append用来将入参的WriteBatch对象追加到当前的对象中,内部是对WriteBatchInternal::Append的封装,具体可以参见WriteBatchInternal::Append的实现

void WriteBatch::Append(const WriteBatch &source) {
    WriteBatchInternal::Append(this, &source);
}

WriteBatch::Iterate接口则是对rep_ 成员变量的逆向操作,将rep_ 中存储的一系列Put和Delete操作进行逆向解码,接下来我们结合代码看下具体有哪些步骤

// 通过,Handler将具体业务和实现分离开来,这样WriteBatch就不需要关心具体业务的东西
// 只需要处理序列化的Put和Get即可,具体的功能交给Handler实现
// 说明:这里只是一个伪事务处理器。如果是一个真正的事务处理器,当执行过程中出现问题,事务处理能将执行的过程逆行执行一遍,将数据库恢复成原来的状态
leveldb::Status WriteBatch::Iterate(Handler *handler) const {
    // 1. 将rep_ 转化为 Slice切片以备后面使用
    leveldb::Slice input(rep_);

    if (input.size() < kHeader) {
        return leveldb::Status::Corruption("malformed WriteBatch (too small)");
    }

    // 2. 剔除序列化相关的无用数据
    input.remove_prefix(kHeader);
    leveldb::Slice key, value;
    int32_t found = 0;

    // 3. 循环处理所有rep_中的事务(key value值)
    while (!input.empty()) {
        // 执行次数增加
        found ++;
        // 取出tag,用于判断接下来需要处理数据的类型
        char tag = input[0];
        input.remove_prefix(1);

        switch (tag) {
            case leveldb::kTypeValue:
                if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &key) &&
                    WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &value)) {
                    handler->Put(key, value);
                } else {
                    return leveldb::Status::Corruption("Bad WriteBatch Put operation");
                }
                break;
            case leveldb::kTypeDeletion:
                if (WRITE_BATCH_DEMO::GetLengthPrefixedSlice(&input, &key)) {
                    handler->Delete(key);
                } else {
                    return leveldb::Status::Corruption("bad WriteBatch Delete operation");
                }
                break;
            default:
                return leveldb::Status::Corruption("unknown WriteBatch tag");

        }
    }

    if (found != WriteBatchInternal::Count(this)) {
        return leveldb::Status::Corruption("WriteBatch has wrong count");
    } else {
        return leveldb::Status::OK();
    }
}

这个函数的主体流程就是对rep_ 的反向操作,其中需要进行关注的就是 GetLengthPrefixedSlice接口的实现,该接口就是将rep_ 从Slice切片转化为具体的key-value值的关键。

bool GetLengthPrefixedSlice(leveldb::Slice *input, leveldb::Slice *result) {
    uint32_t len;
    // 1. 先获取出来数据长度
    if (WRITE_BATCH_DEMO::GetVarint32(input, &len) && input->size() >= len) {
        // 2. 根据长度字段从input中取出指定长度,构造出一个新的slice然后赋值给result指向的对象
        *result = leveldb::Slice(input->data(), len);
        // 3. input中剔除对应长度的数据
        input->remove_prefix(len);
        return true;
    } else {
        return false;
    }
}

GetLengthPrefixedSlice函数的关键是能够获取出来value值长度的GetVarint32函数

inline const char *GetVarint32Ptr(const char *p, const char *limit,
                                  uint32_t *value) {
    // 保证指针合法
    if (p < limit) {
        uint32_t result = *(reinterpret_cast<const uint8_t *>(p));
        // 处理比较特殊的情况,也就是首个字符最高位为0,这种也最为常见,所以单独拿出来处理
        // 能有效提升处理速度
        // 当首个字符最高位为0,说明长度是小于2^7的值,这个时候长度就是首字节的值,数据开始的地方就是首个字节之后的地方
        if ((result & 128) == 0) {
            *value = result;
            return p + 1;
        }
    }
    return GetVarint32PtrFallback(p, limit, value);
}
const char *GetVarint32PtrFallback(const char *p, const char *limit,
                                       uint32_t *value) {
    // 定义一个uint32_t的值用来接收长度数据
    uint32_t result = 0;
    for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
        // 先取出一个字节
        uint32_t byte = *(reinterpret_cast<const uint8_t *>(p));
        p++;
        // 如果最高位为 1 说明后面还有长度字节
        if (byte & 128) {
            // More bytes are present
            // 取出7位用来表示长度,并将其填充到result对应的位上
            result |= ((byte & 127) << shift);
        } else {
            // 能走到这里说明该字节的最高位为0,说明长度编码字段到这里已经结束,只需要将该字节的7位取走填充到result对应的位置上即可
            result |= (byte << shift);
            *value = result;
            return reinterpret_cast<const char *>(p);
        }
    }
    return nullptr;
}

测试

最后我们写一个测试案例来测试以上上述功能的实现是否正常

#include <iostream>

#include "write_batch_demo.h"

using namespace WRITE_BATCH_DEMO;

class HandlerDemo : public WRITE_BATCH_DEMO::WriteBatch::Handler {
public:
    ~HandlerDemo() override = default;

    void Put(const leveldb::Slice &key, const leveldb::Slice &value) override {
        std::cout << "put : " << key.ToString() << " : " << value.ToString() << std::endl;
    }

    void Delete(const leveldb::Slice &key) override {
        std::cout << "Delete : " << key.ToString() << std::endl;

    }
};

int main(int argc, char **argv) {

    WRITE_BATCH_DEMO::WriteBatch batch;
    WRITE_BATCH_DEMO::WriteBatch batch1;
    HandlerDemo handler;

    //
    auto key = leveldb::Slice("xiaoming");
    auto value = leveldb::Slice("21");
    batch.Put(key, value);
    batch.Put(leveldb::Slice("xiaohong"), leveldb::Slice("21"));
    batch.Put(leveldb::Slice("wanger"), leveldb::Slice("22"));
    batch.Put(leveldb::Slice("daxiong"), leveldb::Slice("24"));

    batch1.Put(leveldb::Slice("Batch1ForTest"), leveldb::Slice("18"));

    batch.Delete("xiaoming");

    std::cout << batch.ApproximateSize() << std::endl;
    batch.Append(batch1);
    std::cout << batch.ApproximateSize() << std::endl;

    std::cout << batch.Iterate(&handler).ToString() << std::endl;

    return 0;
}

输出结果如下:

71
89
put : xiaoming : 21
put : xiaohong : 21
put : wanger : 22
put : daxiong : 24
Delete : xiaoming
put : Batch1ForTest : 18
OK

Process finished with exit code 0

完美通过验证。

用途

通过对WriteBatch的学习,我们可以得到如下有用的知识点:

  • 对string进行编码,用一个string存储各种序列化的数据,如本文中的键值对
  • 对string类型的数据进行编码,这样向网络数据tcp流等都可以使用该方法进行编码,这样一个迭代器接口加上桥接模式,就能使自己的网络通信模型变得更加稳健。
  • 事务处理原理,如何将一些列操作打包成一个整体做成一个事务来处理

源码仓库: https://github.com/zzu-andrew/levelDB/tree/main/demo/batch_test

https://github.com/zzu-andrew/levelDB/tree/main/demo/batch_test

文章来源:https://blog.csdn.net/andrewgithub/article/details/135652213
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。