Skip to content

Tuanzi-bug/TuanKV

Repository files navigation

KV存储

相关流程记录

  1. 启动bitcask引擎实例

    1. 对配置项进行校验
      1. 校验地址是否为空
      2. 数据文件阈值无效
    2. 对数据目录进行校验,不存在则创建
    3. 初始化DB结构体
      1. 对索引进行初始化
      2. 配置项添加索引类型
  2. 加载数据目录中的文件

    1. 读取目录文件
    2. 遍历文件,找到目标文件,搜索条件以.data结尾
      1. 找到文件,对文件名进行分割,获取文件ID
      2. 处理错误,并存放ID
    3. 文件ID进行排序
    4. 遍历文件ID并打开对应数据文件
      1. 最后一个ID放置于活跃文件中
      2. 其余放置于旧的文件中
  3. 数据文件中加载索引

    1. 获取文件ID
    2. 找到对应文件
    3. 循环处理文件内容,读取数据
      1. 构建内存索引
      2. 判断记录类型,保持进入索引中
      3. 更新偏移值
    4. 判断当前活跃文件,需要更新当前文件偏移值
  4. 数据删除流程

    1. 判断key的有效性
    2. 从内存索引中查找key是否存在,不存在直接返回
    3. 构造数据记录信息,写入数据文件中
    4. 从内存索引中将对应key进行删除
  5. IO管理器

    1. 在IOManager的接口中新增一个打开对应IO的方法- NewIOManager
    2. 目前先简单处理,直接调用NewFileIOManager即可,后续需要新增在进行判断
  6. 打开数据文件 OpenDataFile

    1. 根据path和Id生成一个完整的文件名称
    2. 初始化IOManager,直接返回错误
    3. 初始化DataFile结构体,并返回
  7. 读取LogRecord

    1. 设置头部size为固定值:maxLogRecordHeaderSize
    2. 封装一个读取指定字节数的函数 readNBytes
    3. 读取头部信息,拿到对应信息
    4. 写一个解码相关函数 decodeLogRecordHeader,以及一个头部信息的结构体 logRecordHeader
    5. 两个条件,说明读取到了文件末尾,返回EOF错误
    6. 取出key和Value的长度
    7. 读取实际的用户存储的Key和value数据
    8. 校验crc是否正确,定义新的方法getlogRecordCRC拿到crc的值
    9. 如果不相等需要返回错误,定义错误类型 ErrInvalCRC
  8. 写write方法需要更新字段

  9. 特殊处理

    1. 在ioManager添加一个获取文件大小方法
    2. 需要重新添加一个判断,如果读取长度大于文件长度,只需要读取到实际长度
  10. 写相关单元测试

    1. 打开文件
    2. 写文件
  11. LogRecord编解码

    1. 初始化一个header部分字节数组,创建一个header数组
    2. 从第五个字节开始存储 Type类型
    3. 从后面开始存放后面的数据 key和value的长度信息
    4. 返回一条信息实际大小
    5. 重新创建一个数组
    6. 将header部分的内容拷贝过来,将数据key和value一起拷贝过去
    7. 生成crc,除了前面4个字节的
    8. 使用小端序进行存储信息,并返回信息与长度
  12. 解码的流程

    1. 首先判断长度是否满足4字节,直接返回
    2. 构建一个logRecordHeader结构体
    3. 解码key和value的长度信息
    4. 返回结构体和长度信息
  13. getLogRecordCRC

    1. 首先进行编码header信息
    2. 对key和value数据信息进行更新写入
    3. 返回crc值
  14. 添加单元测试

    1. 编码测试
      1. 正常情况
      2. value为空情况
      3. delete删除情况下
    2. 解码测试
  15. 定义索引迭代器接口

    1. 定义btree树索引迭代器
      1. 当前下标 currIndex
      2. reverse 反向遍历标志
      3. value 存放key+位置索引信息
    2. 实现接口的每一个方法
    3. 实现创建btree索引迭代器的函数
    4. 在Index的接口添加一个返回迭代器的一个方法 Iterator
    5. 增加单元测试
  16. 对外的迭代器

    1. 设置迭代器配置项
    2. 定义默认索引迭代器
    3. 面向用户方法与内部一致
      1. 对Value方法需要返回对应的值
      2. 根据get方法进行简化,根据位置信息获取value值
    4. 需要增加判断是否满足前缀要求
    5. 获取数据库所有的key--ListKeys
    6. 索引新增一个Size方法,方便获取长度
    7. 对数据库信息执行指定操作Fold
    8. 不全数据库close与sync方法
    9. close:关闭当前活跃文件以及遍历并关闭旧的文件,该方法需要加锁
    10. sync:对当前活跃文件进行持久化
  17. 原子写数据

    1. 创建新的文件 batch
    2. 创建WriteBatch结构体,内容包括锁,配置项、暂存数据pendingWrites、db
    3. 初始化NewWriteBatch
    4. Put方法
      1. 首先判断key的有效性
      2. 加写锁
      3. 创建一个记录暂存起来
    5. Delete方法
      1. 判断key的有效性
      2. 加写锁
      3. 创建一个删除记录暂存起来
      4. 如果数据不存在,对暂存内的记录进行删除
    6. Commit方法
      1. 加锁
      2. 首先对数据进行校验,不存在就直接返回,超过配置项要求,返回对应错误ErrExceedMaxBatchNum
      3. 对全局db进行加锁,保证串行化
      4. 获取当前事务的序列号,并进行加一,通过atomic.AddUint64
      5. 对数据进行暂存,写数据到数据文件中
      6. 需要新建一个函数:原始key+序列号进行编码。前面放序列号后面放key
      7. 需要写入一条标识事务完成的数据,定义一个常量txnFinKey,加入新的类型type
      8. 根据配置进行持久化
      9. 更新对应的内存索引,需要判断当前记录是否是删除状态。正常状态就更新,如果删除状态进行删除
      10. 清空暂存数据
    7. 当db启动时候,需要对事务进行处理
      1. 解析key,拿到序列号
      2. 设置一个常量,对常用的key添加一个非事务的序列号
      3. 判断序列号是否是事务,非事务是直接提交,写入文件中,对代码进行提取成函数
      4. 是事务状态,非完成状态,暂存事务数据(定义一个单独结构体:包含记录与位置信息),完成状态,写入文件中
      5. 对事务序列号进行更新,保证获取最新的序列号
    8. 单元测试
  18. merge功能实现

    1. 判断是否存在活跃文件
    2. 加锁
    3. 添加标识,是否有merge正在进行,返回自定义错误ErrMergeIsProgress,结束后把标识设置false
    4. 对活跃文件进行处理,持久化。把当前活跃文件放置在旧的里面,重新打开新的活跃文件
    5. 取出所有需要merge的活跃文件
    6. 对merge文件进行排序
    7. 新建一个方法:获取merge目录路径
      1. 获取父级目录地址
      2. 获取文件名称
      3. 组合临时merge路径
    8. 如果存在,将之前的路径进行删除
    9. 新建一个merge path的目录
    10. 打开临时的bitcask实例
    11. 修改配置信息
    12. 遍历merge的文件,处理过程与加载引擎一致
    13. 与内存的索引信息进行比对,有效进行写入。不需要写入事务标记的信息
    14. 将索引写入hit文件
      1. 写一个新的方法:打开hit索引文件,与OpenDataFile方法进行抽离同一过程代码
      2. 新增一个写入hint文件的logRecord的方法
      3. 构造logRecord的记录
      4. 新增一个对logRecordPos编码的方法以及解码的方法
    15. 打开hit文件存储索引。
    16. 运行持久化,保证数据写入磁盘中
    17. 增加merge完成的文件,并写入相关记录标识已经完成
    18. 记录没有参与merge的文档id
  19. 在启动时候需要对merge数据进行处理

  20. 获取merge数据路径,不存在直接返回

  21. 读取目录

  22. 查看是否存在完成merge标识的文件,没有直接返回nil

  23. 新增一个获取第一个没有merge文件的id

  24. 删除旧的数据文件(小于id的文件)

  25. 将新的数据文件移动到数据目录中

  26. 加载引擎之前,加载merge的数据目录,从hint文件中加载索引

  27. 内存索引优化

    1. 添加自适应基树索引,创建结构体AdaptiveRadixTree
    2. 实现索引对应的四个方法【迭代器方法可以仿照btree树进行编写】,注意:需要加读锁与写锁
    3. 创建一个初始化基础树方法
    4. 写相关的单元测试
  28. 添加 B+树 索引【封装了bbolt库】

    1. 实现索引接口的方法【迭代器,可以使用bbotdb提供的】
    2. 完成相关单元测试
  29. 加入b+树索引后,是需要从数据文件中添加索引,在关闭时候保存当前序列号

  30. 添加一个函数,读取序列号文件

  31. 新增一个方法:读取序列号,当索引是b+树时候

  32. 新增一个字段,判断是否存在事务序列号文件,如果不存在禁用WriteBatch

  33. 新增一个标识,是否第一次初始化此数据目录

  34. 文件IO优化

    1. 文件锁:保证存储引擎实例在单进程执行
      1. 添加flock库
      2. 在打开db函数中,添加当前目录是否正在使用,进行new。
      3. 自定义文件锁名称 fileLockName
      4. 调用treLock,返回特定错误 ErrDatabaseIsUsing
      5. 将文件锁添加到db的结构体当中
      6. 在关闭数据库时候,将文件锁进行释放掉
      7. 写单元测试
    2. 对持久化策略进行改动
      1. 添加选项BytesPerSync,累计写到阈值再进行持久化
      2. 默认是0
      3. 再db结构体中添加 新的字段bytesWrite 记录写入多少字节
      4. 再appendlog进行判断,条件:是否持久化 阈值大于0 当前写入字节大于阈值
      5. 持久化后累计值清空
    3. 对启动加速
      1. 定义MMap结构体
      2. 实现iomannger对应的方法
      3. 实现新建mmap的方法
      4. 添加单元测试
      5. 初始化io函数中添加一个初始化类型,添加一个枚举io,根据类型生成不同io
      6. 对newDatafile 需要添加一个io类型,打开需要指定io类型,其他默认fileio
      7. 添加配置项 MMapAtStartup,添加一个默认值
      8. 加载过程中需要修改io类型
      9. 需要重置io类型,添加resetIoType 方法
      10. 对文件添加一个重置io类型
  35. 数据merge优化

    1. 修改索引方法的返回值,改成返回旧值
    2. 对应的索引器进行修改
    3. 修改相关的单元测试
    4. 在db结构体中添加一个字段 reclaimSize,递增旧值的size
    5. 对logRecordSize 添加一个字段 Size,同时需要修改编码解码函数
    6. 对于启动loadIndex函数,也是需要记录旧值
    7. 构造内存索引函数,进行修改
    8. 在batch里面也需要做同样的改造,对于Delete函数需要加上自己的key的size
    9. 添加单元测试
    10. 添加一个结构体Stat记录统计信息包含keyNum,DataFileNum、reclaimableSize、DiskSize
    11. 添加一个函数Stat函数返回,相关统计信息
    12. 对选项添加一个字段,DataFileMergeRatio 阈值,添加一个校验(不能大于1小于0)
    13. 对Merge函数,获取总数居量,新增一个工具方法,获取目录的大小,写相关单元测试
    14. 进行校验,总数据量是否达到阈值,如果小于返回自定义错误 ErrMergeRatioUnreached
    15. 在工具,新增一个工具函数 AvailableDiskSize 获取磁盘剩余可用空间大小,相关单元测试
    16. 在merge进行判断是否可以当前磁盘剩余空间是否可以容纳,如果不能返回自定义错误ErrNoEnoughSpaceForMerge
    17. 单元测试
  36. 数据备份

    1. 写一个拷贝数据目录工具函数
    2. 判断目标文件夹不存在就重新创建一个
    3. 循环遍历原路径,把不需要文件排除
    4. 如果是文件夹,在目标路径创建文件夹
    5. 常规文件,读取并重新写入目标路径中
    6. 在db新增一个方法
    7. 加锁,排除文件锁文件
    8. 添加单元测试
  37. HTTP接口

    1. 初始化DB实例,函数
    2. 使用Go语言自带的Http库,添加相关方法,包括:Put、Get、Delete、Stat、ListKeys
    3. 后续可以根据需求进行添加
    4. Put:校验请求方式,然后对请求主体进行json解码,遍历map将值写入数据库中
    5. Get:校验请求方式,获取请求的key,获取数据,返回json数据(需设置头 Content-Type)
    6. 其余方式可参考进行写
  38. 基准测试

  39. redis String

    1. 新建一个types文件,各个类型实现的方法,枚举类型 String Hash...
    2. 创建RedisDataStructure,以及New函数
    3. 创建Set函数,key ttl value
    4. 编码 1b 存储类型,过期时间,与value组合在一起
    5. 调用存储引擎接口进行写入
    6. Get方法实现,通过存储引擎拿到数据
    7. 对数据进行解码,首先判断类型,返回自定义错误ErrWrongTypeOperation、判断是否过期,返回nil
    8. 对于通用命令 Delete type,可以创建generic文件
    9. Delete 调用接口
    10. Type 先获取value,返回第一个字节
    11. 写单元测试
  40. Hash

    1. 创建 meta 文件,存储元数据结构体
    2. 写编码与解码的方法
    3. 编码:数据类型+过期时间+版本号+数据量,如果是list还需要添加专用字段
    4. 解码:返回数据结构
    5. 实现通用方法 findMetaData ,判断key是否存在,判断元数据是否存在,根据存在情况,返回meta数据
    6. 实现对hash的key进行编解码。
    7. 实现 HSet :先获取元数据,构造hash key, 编码,然后
    8. 单元测试
  41. Set

    1. SAdd: 查找元数据,返回元数据,构造set的key
    2. SRem
    3. SIsMember
  42. redis 协议 参考:

相关参考

其他

git commit 规范参考

commit message格式

():

image

About

用Golang实现kv存储引擎

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages