Skip to content

Commit

Permalink
Add dedicated flusher for syncOnly (#170)
Browse files Browse the repository at this point in the history
* Add dedicated flusher for syncOnly

* Resolve comments

* Make it extendable

* Fix typo

* Remove unnecessary comment

* Fix include order

---------

Co-authored-by: Zexi Liu <[email protected]>
  • Loading branch information
ZexiLiu and Zexi Liu authored Aug 7, 2024
1 parent 7648451 commit fb7904f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 3 deletions.
10 changes: 8 additions & 2 deletions include/libjungle/db_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ class GlobalConfig {
GlobalConfig()
: globalLogPath("./")
, numFlusherThreads(1)
, numDedicatedFlusherForAsyncReqs(0)
, flusherSleepDuration_ms(500)
, flusherMinRecordsToTrigger(65536)
, flusherMinLogFilesToTrigger(16)
Expand All @@ -592,8 +593,7 @@ class GlobalConfig {
, fdbCacheSize(0)
, numTableWriters(8)
, memTableFlushBufferSize(32768)
, shutdownLogger(true)
{}
, shutdownLogger(true) {}

/**
* Path where Jungle's global log will be located.
Expand All @@ -605,6 +605,12 @@ class GlobalConfig {
*/
size_t numFlusherThreads;

/**
* Create dedicated flushers for async request. Only effective when it is
* not 0 and `numFlusherThreads` > `numDedicatedFlusherForAsyncReqs`.
*/
size_t numDedicatedFlusherForAsyncReqs;

/**
* Fluhser thread sleep time in ms.
*/
Expand Down
10 changes: 10 additions & 0 deletions src/db_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ limitations under the License.
#include "internal_helper.h"
#include "log_reclaimer.h"

#include "libjungle/db_config.h"

#include <set>

#include _MACRO_TO_STR(LOGGER_H)
Expand Down Expand Up @@ -86,9 +88,17 @@ void DBMgr::initInternal(const GlobalConfig& config) {

printGlobalConfig();

bool dedicated_async_flusher =
config.numDedicatedFlusherForAsyncReqs
&& config.numFlusherThreads > config.numDedicatedFlusherForAsyncReqs;
for (size_t ii=0; ii<config.numFlusherThreads; ++ii) {
std::string t_name = "flusher_" + std::to_string(ii);
Flusher* flusher = new Flusher(t_name, config);
// If dedicated flusher is enabled, only the first
// `numDedicatedFlusherForAsyncReqs` flushers will handle async reqs.
if (dedicated_async_flusher && ii >= config.numDedicatedFlusherForAsyncReqs) {
flusher->handleAsyncReqs = false;
}
wMgr->addWorker(flusher);
flusher->run();
}
Expand Down
7 changes: 6 additions & 1 deletion src/flusher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ Flusher::Flusher(const std::string& _w_name,
{
workerName = _w_name;
gConfig = _config;
handleAsyncReqs = true;
FlusherOptions options;
options.sleepDuration_ms = gConfig.flusherSleepDuration_ms;
options.worker = this;
Expand All @@ -105,7 +106,11 @@ void Flusher::work(WorkerOptions* opt_base) {

DB* target_db = nullptr;

FlusherQueueElem* elem = dbm->flusherQueue()->pop();
FlusherQueueElem* elem = nullptr;
if (handleAsyncReqs) {
elem = dbm->flusherQueue()->pop();
}

if (elem) {
// User assigned work check if it is already closed.
std::lock_guard<std::mutex> l(dbm->dbMapLock);
Expand Down
2 changes: 2 additions & 0 deletions src/flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ limitations under the License.

#pragma once

#include "internal_helper.h"
#include "worker_mgr.h"

#include <libjungle/jungle.h>
Expand Down Expand Up @@ -82,6 +83,7 @@ class Flusher : public WorkerBase {

GlobalConfig gConfig;
size_t lastCheckedFileIndex;
bool handleAsyncReqs;
};


Expand Down

0 comments on commit fb7904f

Please sign in to comment.