Skip to content

Commit

Permalink
Merge pull request #23 from meitu/enhancement/flushdb
Browse files Browse the repository at this point in the history
Use rocksdb deleterange API to make the flushdb command faster
  • Loading branch information
git-hulk committed Sep 11, 2019
2 parents 1879789 + 3ecd21d commit 87eb1dd
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 19 deletions.
21 changes: 17 additions & 4 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,9 @@ Status Config::parseConfigFromString(std::string input) {
return parseRocksdbOption(args[0].substr(8, args[0].size() - 8), args[1]);
} else if (size == 2 && !strncasecmp(args[0].data(), "namespace.", 10)) {
std::string ns = args[0].substr(10, args.size()-10);
if (ns.size() > INT8_MAX) {
return Status(Status::NotOK, std::string("namespace size exceed limit ")+std::to_string(INT8_MAX));
auto s = isNamespaceLegal(ns);
if (!s.IsOK()) {
return s;
}
tokens[args[1]] = ns;
} else if (size == 2 && !strcasecmp(args[0].data(), "slowlog-log-slower-than")) {
Expand Down Expand Up @@ -692,8 +693,9 @@ Status Config::AddNamespace(const std::string &ns, const std::string &token) {
if (requirepass.empty()) {
return Status(Status::NotOK, "forbid to add new namespace while the requirepass is empty");
}
if (ns.size() > 255) {
return Status(Status::NotOK, "the namespace size exceed limit " + std::to_string(INT8_MAX));
auto s = isNamespaceLegal(ns);
if (!s.IsOK()) {
return s;
}
if (tokens.find(token) != tokens.end()) {
return Status(Status::NotOK, "the token has already exists");
Expand All @@ -719,3 +721,14 @@ Status Config::DelNamespace(const std::string &ns) {
}
return Status(Status::NotOK, "the namespace was not found");
}

Status Config::isNamespaceLegal(const std::string &ns) {
if (ns.size() > UINT8_MAX) {
return Status(Status::NotOK, std::string("namespace size exceed limit ") + std::to_string(UINT8_MAX));
}
char last_char = ns.back();
if (last_char == std::numeric_limits<char>::max()) {
return Status(Status::NotOK, std::string("namespace contain ilegal letter"));
}
return Status::OK();
}
1 change: 1 addition & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,5 @@ struct Config{
Status parseRocksdbOption(const std::string &key, std::string value);
Status parseRocksdbIntOption(std::string key, std::string value);
void array2String(const std::vector<std::string> &array, const std::string &delim, std::string *output);
Status isNamespaceLegal(const std::string &ns);
};
64 changes: 49 additions & 15 deletions src/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,20 @@ rocksdb::Status Database::RandomKey(const std::string &cursor, std::string *key)
}

rocksdb::Status Database::FlushDB() {
std::string prefix;
std::string prefix, begin_key, end_key;
AppendNamespacePrefix("", &prefix);
LatestSnapShot ss(db_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
read_options.fill_cache = false;
auto iter = db_->NewIterator(read_options, metadata_cf_handle_);
for (iter->Seek(prefix);
iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
auto s = db_->Delete(rocksdb::WriteOptions(), metadata_cf_handle_, iter->key());
if (!s.ok()) {
delete iter;
return s;
}
auto s = FindKeyRangeWithPrefix(prefix, &begin_key, &end_key);
if (!s.ok()) {
return rocksdb::Status::OK();
}
s = db_->DeleteRange(rocksdb::WriteOptions(), metadata_cf_handle_, begin_key, end_key);
if (!s.ok()) {
return s;
}
s = db_->Delete(rocksdb::WriteOptions(), metadata_cf_handle_, end_key);
if (!s.ok()) {
return s;
}
delete iter;
return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -362,6 +359,43 @@ void Database::AppendNamespacePrefix(const Slice &user_key, std::string *output)
ComposeNamespaceKey(namespace_, user_key, output);
}

rocksdb::Status Database::FindKeyRangeWithPrefix(const std::string &prefix, std::string *begin, std::string *end) {
begin->clear();
end->clear();

LatestSnapShot ss(db_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
read_options.fill_cache = false;
auto iter = db_->NewIterator(read_options, metadata_cf_handle_);
iter->Seek(prefix);
if (!iter->Valid()) {
delete iter;
return rocksdb::Status::NotFound();
}
*begin = iter->key().ToString();

//compose next prefix key
std::string next_prefix = prefix;
char last_char = next_prefix.back();
last_char++;
next_prefix.pop_back();
next_prefix.push_back(last_char);

iter->SeekForPrev(next_prefix);
// reversed seek the key til with prefix or end of the iterator
while (iter->Valid() && !iter->key().starts_with(prefix)) {
iter->Prev();
}
if (!iter->Valid()) {
delete iter;
return rocksdb::Status::NotFound();
}
*end = iter->key().ToString();
delete iter;
return rocksdb::Status::OK();
}

rocksdb::Status SubKeyScanner::Scan(RedisType type,
const Slice &user_key,
const std::string &cursor,
Expand Down
1 change: 1 addition & 0 deletions src/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Database {
std::vector<std::string> *keys);
rocksdb::Status RandomKey(const std::string &cursor, std::string *key);
void AppendNamespacePrefix(const Slice &user_key, std::string *output);
rocksdb::Status FindKeyRangeWithPrefix(const std::string &prefix, std::string *begin, std::string *end);

protected:
Engine::Storage *storage_;
Expand Down

0 comments on commit 87eb1dd

Please sign in to comment.