Skip to content

Commit

Permalink
Merge branch 'optimize-storage' into 'master'
Browse files Browse the repository at this point in the history
add memory engine

See merge request !10
  • Loading branch information
absolute8511 committed Sep 3, 2020
2 parents 9b6a1f4 + 823f279 commit a95f763
Show file tree
Hide file tree
Showing 51 changed files with 4,416 additions and 172 deletions.
906 changes: 906 additions & 0 deletions engine/btree.go

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions engine/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func rangeLimitIterator(i Iterator, r *Range, l *Limit, reverse bool) *RangeLimi
} else {
it.Iterator.Seek(r.Min)
if r.Type&common.RangeLOpen > 0 {
if it.Iterator.Valid() && bytes.Equal(it.Iterator.RefKey(), r.Min) {
if it.Iterator.Valid() && bytes.Compare(it.Iterator.RefKey(), r.Min) <= 0 {
it.Iterator.Next()
}
}
Expand All @@ -209,25 +209,26 @@ func rangeLimitIterator(i Iterator, r *Range, l *Limit, reverse bool) *RangeLimi
} else {
it.Iterator.SeekForPrev(r.Max)
if !it.Iterator.Valid() {
it.Iterator.SeekToLast()
it.Iterator.SeekToFirst()
if it.Iterator.Valid() && bytes.Compare(it.Iterator.RefKey(), r.Max) == 1 {
dbLog.Infof("iterator seek to last key %v should not great than seek to max %v", it.Iterator.RefKey(), r.Max)
}
}
if r.Type&common.RangeROpen > 0 {
if it.Iterator.Valid() && bytes.Equal(it.Iterator.RefKey(), r.Max) {
if it.Iterator.Valid() && bytes.Compare(it.Iterator.RefKey(), r.Max) >= 0 {
it.Iterator.Prev()
}
}
}
}
for i := 0; i < l.Offset; i++ {
if it.Iterator.Valid() {
if !it.reverse {
it.Iterator.Next()
} else {
it.Iterator.Prev()
}
if !it.Valid() {
break
}
if !it.reverse {
it.Iterator.Next()
} else {
it.Iterator.Prev()
}
}
return it
Expand Down
13 changes: 13 additions & 0 deletions engine/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@ import (
"github.com/youzan/ZanRedisDB/common"
)

var (
errDBEngClosed = errors.New("db engine is closed")
errIntNumber = errors.New("invalid integer")
)

type RefSlice interface {
// ref data
Data() []byte
Free()
// copied data if need
Bytes() []byte
}

const (
Expand Down Expand Up @@ -184,6 +192,7 @@ type KVEngine interface {
Exist(key []byte) (bool, error)
ExistNoLock(key []byte) (bool, error)
GetRef(key []byte) (RefSlice, error)
GetRefNoLock(key []byte) (RefSlice, error)
GetValueWithOp(key []byte, op func([]byte) error) error
GetValueWithOpNoLock(key []byte, op func([]byte) error) error
DeleteFilesInRange(rg CRange)
Expand All @@ -197,6 +206,8 @@ func NewKVEng(cfg *RockEngConfig) (KVEngine, error) {
return NewRockEng(cfg)
} else if cfg.EngineType == "pebble" {
return NewPebbleEng(cfg)
} else if cfg.EngineType == "mem" {
return NewMemEng(cfg)
}
return nil, errors.New("unknown engine type for: " + cfg.EngineType)
}
Expand All @@ -206,6 +217,8 @@ func NewSharedEngConfig(cfg RockOptions) (SharedRockConfig, error) {
return newSharedRockConfig(cfg), nil
} else if cfg.EngineType == "pebble" {
return newSharedPebblekConfig(cfg), nil
} else if cfg.EngineType == "mem" {
return newSharedMemConfig(cfg), nil
}
return nil, errors.New("unknown engine type for: " + cfg.EngineType)
}
152 changes: 152 additions & 0 deletions engine/kv_skiplist.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#include "kv_skiplist.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char* copyString(const char* str, size_t sz) {
char* result = (char*)(malloc(sizeof(char) * sz));
memcpy(result, str, sizeof(char) * sz);
return result;
}

void kv_skiplist_remove_node(skiplist_raw* l, kv_node* entry) {
// Detach `entry` from skiplist.
skiplist_erase_node(l, &entry->snode);
// Release `entry`, to free its memory.
skiplist_release_node(&entry->snode);
skiplist_wait_for_free(&entry->snode);
// Free `entry` after it becomes safe.
kv_skiplist_node_free(entry);
}

skiplist_raw* kv_skiplist_create() {
skiplist_raw *slist;
slist = (skiplist_raw*)malloc(sizeof(skiplist_raw));
skiplist_init(slist, kv_cmp);
return slist;
}

void kv_skiplist_destroy(skiplist_raw* slist) {
// Iterate and free all nodes.
skiplist_node* cursor = skiplist_begin(slist);
while (cursor) {
kv_node* entry = _get_entry(cursor, kv_node, snode);
// Get next `cursor`.
cursor = skiplist_next(slist, cursor);
kv_skiplist_remove_node(slist, entry);
}
skiplist_free(slist);
}

kv_node* kv_skiplist_node_create(const char* key, size_t ksz, const char* value, size_t vsz) {
kv_node *n;
n = (kv_node*)malloc(sizeof(kv_node));
skiplist_init_node(&n->snode);
pthread_mutex_init(&n->dlock, NULL);
n->key = copyString(key, ksz);
n->key_sz = ksz;
n->value = copyString(value, vsz);
n->value_sz = vsz;
return n;
}

void kv_skiplist_node_free(kv_node *n) {
skiplist_free_node(&n->snode);
if (n->key) {
free(n->key);
}
if (n->value) {
free(n->value);
}
pthread_mutex_destroy(&n->dlock);
free(n);
}

int kv_skiplist_insert(skiplist_raw* l, const char* key, size_t ksz, const char* value, size_t vsz) {
kv_node* n = kv_skiplist_node_create(key, ksz, value, vsz);
return skiplist_insert_nodup(l, &n->snode);
}

int kv_skiplist_update(skiplist_raw* l, const char* key, size_t ksz, const char* value, size_t vsz) {
kv_node* n = kv_skiplist_node_create(key, ksz, value, vsz);
int ret = skiplist_insert_nodup(l, &n->snode);
if (ret == 0) {
return 0;
}
kv_skiplist_node_free(n);

kv_node q;
q.key = copyString(key, ksz);
q.key_sz = ksz;
// the key already exist, we remove it and reinsert
skiplist_node* cur = skiplist_find(l, &q.snode);
if (!cur) {
return -1;
}
kv_node* found = _get_entry(cur, kv_node, snode);
pthread_mutex_lock(&found->dlock);
found->value = copyString(value, vsz);
found->value_sz = vsz;
skiplist_release_node(&found->snode);
pthread_mutex_unlock(&found->dlock);
return 0;
}

char* kv_skiplist_get(skiplist_raw* l, const char* key, size_t ksz, size_t* vsz) {
kv_node n;
n.key = copyString(key, ksz);
n.key_sz = ksz;
skiplist_node* cur = skiplist_find(l, &n.snode);
if (!cur) {
return NULL;
}
char* v = kv_skiplist_get_node_value(cur, vsz);
skiplist_release_node(cur);
return v;
}

char* kv_skiplist_get_node_value(skiplist_node* n, size_t* sz) {
kv_node* found = _get_entry(n, kv_node, snode);
pthread_mutex_lock(&found->dlock);
*sz = found->value_sz;
char* v = copyString(found->value, found->value_sz);
pthread_mutex_unlock(&found->dlock);
return v;
}

char* kv_skiplist_get_node_key(skiplist_node* n, size_t* sz) {
kv_node* found = _get_entry(n, kv_node, snode);
pthread_mutex_lock(&found->dlock);
*sz = found->key_sz;
char* v = copyString(found->key, found->key_sz);
pthread_mutex_unlock(&found->dlock);
return v;
}

int kv_skiplist_del(skiplist_raw* l, const char* key, size_t ksz) {
kv_node n;
n.key = copyString(key, ksz);
n.key_sz = ksz;
skiplist_node* cur = skiplist_find(l, &n.snode);
if (!cur) {
return 0;
}
kv_node* found = _get_entry(cur, kv_node, snode);
kv_skiplist_remove_node(l, found);
return 1;
}

skiplist_node* kv_skiplist_find_ge(skiplist_raw* l, const char* key, size_t ksz) {
kv_node n;
n.key = copyString(key, ksz);
n.key_sz = ksz;
return skiplist_find_greater_or_equal(l, &n.snode);
}

skiplist_node* kv_skiplist_find_le(skiplist_raw* l, const char* key, size_t ksz) {
kv_node n;
n.key = copyString(key, ksz);
n.key_sz = ksz;
return skiplist_find_smaller_or_equal(l, &n.snode);
}
63 changes: 63 additions & 0 deletions engine/kv_skiplist.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef _KV_SKIPLIST_H
#define _KV_SKIPLIST_H (1)

#include "skiplist.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __cplusplus
extern "C" {
#endif

// Define a node that contains key and value pair.
typedef struct {
// Metadata for skiplist node.
skiplist_node snode;
// My data here: {int, int} pair.
pthread_mutex_t dlock;
char* key;
size_t key_sz;
char* value;
size_t value_sz;
} kv_node;

static int kv_cmp(skiplist_node* a, skiplist_node* b, void* aux) {
kv_node *aa, *bb;
aa = _get_entry(a, kv_node, snode);
bb = _get_entry(b, kv_node, snode);
size_t sz;
sz = aa->key_sz;
if (bb->key_sz < sz) {
sz = bb->key_sz;
}
int ret = memcmp(aa->key, bb->key, sz);
if (ret != 0 || aa->key_sz == bb->key_sz) {
return ret;
}
return aa->key_sz < bb->key_sz?-1:1;
}

skiplist_raw* kv_skiplist_create();
void kv_skiplist_destroy(skiplist_raw* slist);
kv_node* kv_skiplist_node_create(const char* key, size_t ksz, const char* value, size_t vsz);
void kv_skiplist_node_free(kv_node *n);

int kv_skiplist_insert(skiplist_raw* l, const char* key, size_t ksz, const char* value, size_t vsz);
int kv_skiplist_update(skiplist_raw* l, const char* key, size_t ksz, const char* value, size_t vsz);

char* kv_skiplist_get(skiplist_raw* l, const char* key, size_t ksz, size_t* vsz);
int kv_skiplist_del(skiplist_raw* l, const char* key, size_t ksz);
skiplist_node* kv_skiplist_find_ge(skiplist_raw* l, const char* key, size_t ksz);
skiplist_node* kv_skiplist_find_le(skiplist_raw* l, const char* key, size_t ksz);

char* kv_skiplist_get_node_key(skiplist_node* n, size_t* sz);
char* kv_skiplist_get_node_value(skiplist_node* n, size_t* sz);

#ifdef __cplusplus
}
#endif

#endif
Loading

0 comments on commit a95f763

Please sign in to comment.