Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIFO Compaction with TTL #2480

Closed
wants to merge 11 commits into from
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
### New Features
* Measure estimated number of reads per file. The information can be accessed through DB::GetColumnFamilyMetaData or "rocksdb.sstables" DB property.
* RateLimiter support for throttling background reads, or throttling the sum of background reads and writes. This can give more predictable I/O usage when compaction reads more data than it writes, e.g., due to lots of deletions.
* [Experimental] FIFO compaction with TTL support. It can be enabled by setting CompactionOptionsFIFO.ttl > 0.

## 5.6.0 (06/06/2017)
### Public API Change
Expand Down
14 changes: 8 additions & 6 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,17 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts,
int level,
const std::string* compression_dict, const bool skip_filters) {
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict, const bool skip_filters,
const uint64_t creation_time) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
column_family_name, level),
column_family_name, level, creation_time),
column_family_id, file);
}

Expand All @@ -76,7 +76,8 @@ Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level) {
TableProperties* table_properties, int level,
const uint64_t creation_time) {
sagar0 marked this conversation as resolved.
Show resolved Hide resolved
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
Expand Down Expand Up @@ -125,7 +126,8 @@ Status BuildTable(
builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, column_family_name, file_writer.get(), compression,
compression_opts, level);
compression_opts, level, nullptr /* compression_dict */,
false /* skip_filters */, creation_time);
}

MergeHelper merge(env, internal_comparator.user_comparator(),
Expand Down
8 changes: 4 additions & 4 deletions db/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts,
int level,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict = nullptr,
const bool skip_filters = false);
const bool skip_filters = false, const uint64_t creation_time = 0);

// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
Expand All @@ -79,6 +78,7 @@ extern Status BuildTable(
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1);
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0);

} // namespace rocksdb
13 changes: 13 additions & 0 deletions db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -461,4 +461,17 @@ bool Compaction::ShouldFormSubcompactions() const {
}
}

uint64_t Compaction::MaxInputFileCreationTime() const {
uint64_t max_creation_time = 0;
for (const auto& file : inputs_[0].files) {
if (file->fd.table_reader != nullptr &&
file->fd.table_reader->GetTableProperties() != nullptr) {
uint64_t creation_time =
file->fd.table_reader->GetTableProperties()->creation_time;
max_creation_time = std::max(max_creation_time, creation_time);
}
}
return max_creation_time;
}

} // namespace rocksdb
2 changes: 2 additions & 0 deletions db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ class Compaction {

uint64_t max_compaction_bytes() const { return max_compaction_bytes_; }

uint64_t MaxInputFileCreationTime() const;

private:
// mark (or clear) all files that are being compacted
void MarkFilesBeingCompacted(bool mark_as_compacted);
Expand Down
18 changes: 14 additions & 4 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,6 @@ Status CompactionJob::FinishCompactionOutputFile(
uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
assert(output_number != 0);

TableProperties table_properties;
// Check for iterator errors
Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
Expand Down Expand Up @@ -1263,14 +1262,25 @@ Status CompactionJob::OpenCompactionOutputFile(
// data is going to be found
bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;

uint64_t output_file_creation_time =
sub_compact->compaction->MaxInputFileCreationTime();
if (output_file_creation_time == 0) {
int64_t _current_time;
auto status = db_options_.env->GetCurrentTime(&_current_time);
if (!status.ok()) {
_current_time = 0;
}
output_file_creation_time = static_cast<uint64_t>(_current_time);
sagar0 marked this conversation as resolved.
Show resolved Hide resolved
}

sub_compact->builder.reset(NewTableBuilder(
*cfd->ioptions(), cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
cfd->ioptions()->compression_opts,
sub_compact->compaction->output_level(),
&sub_compact->compression_dict,
skip_filters));
sub_compact->compaction->output_level(), &sub_compact->compression_dict,
skip_filters, output_file_creation_time));
LogFlush(db_options_.info_log);
return s;
}
Expand Down
105 changes: 96 additions & 9 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1405,17 +1405,88 @@ bool FIFOCompactionPicker::NeedsCompaction(
return vstorage->CompactionScore(kLevel0) >= 1;
}

Compaction* FIFOCompactionPicker::PickCompaction(
namespace {
uint64_t GetTotalFilesSize(
const std::vector<FileMetaData*>& files) {
uint64_t total_size = 0;
for (const auto& f : files) {
total_size += f->fd.file_size;
}
return total_size;
}
} // anonymous namespace

Compaction* FIFOCompactionPicker::PickTTLCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
assert(vstorage->num_levels() == 1);
assert(ioptions_.compaction_options_fifo.ttl > 0);

const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
uint64_t total_size = 0;
for (const auto& file : level_files) {
total_size += file->fd.file_size;
uint64_t total_size = GetTotalFilesSize(level_files);

int64_t _current_time;
auto status = ioptions_.env->GetCurrentTime(&_current_time);
if (!status.ok()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: Couldn't get current time: %s. "
"Not doing compactions based on TTL. ",
cf_name.c_str(), status.ToString().c_str());
return nullptr;
}
const uint64_t current_time = static_cast<uint64_t>(_current_time);

std::vector<CompactionInputFiles> inputs;
inputs.emplace_back();
inputs[0].level = 0;

for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
if (f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time == 0 ||
creation_time >=
(current_time - ioptions_.compaction_options_fifo.ttl)) {
break;
}
total_size -= f->compensated_file_size;
inputs[0].files.push_back(f);
}
}

// Return a nullptr and proceed to size-based FIFO compaction if:
// 1. there are no files older than ttl OR
// 2. there are a few files older than ttl, but deleting them will not bring
// the total size to be less than max_table_files_size threshold.
if (inputs[0].files.empty() ||
total_size > ioptions_.compaction_options_fifo.max_table_files_size) {
return nullptr;
}

for (const auto& f : inputs[0].files) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with creation time %" PRIu64 " for deletion",
cf_name.c_str(), f->fd.GetNumber(),
f->fd.table_reader->GetTableProperties()->creation_time);
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}

Compaction* FIFOCompactionPicker::PickSizeCompaction(
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
const int kLevel0 = 0;
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
uint64_t total_size = GetTotalFilesSize(level_files);

if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
level_files.size() == 0) {
// total size not exceeded
Expand All @@ -1435,7 +1506,6 @@ Compaction* FIFOCompactionPicker::PickCompaction(
/* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
RegisterCompaction(c);
return c;
}
}
Expand All @@ -1460,24 +1530,41 @@ Compaction* FIFOCompactionPicker::PickCompaction(
std::vector<CompactionInputFiles> inputs;
inputs.emplace_back();
inputs[0].level = 0;
// delete old files (FIFO)

for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
total_size -= f->compensated_file_size;
inputs[0].files.push_back(f);
char tmp_fsize[16];
AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
break;
}
}

Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, {}, /* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}

Compaction* FIFOCompactionPicker::PickCompaction(
sagar0 marked this conversation as resolved.
Show resolved Hide resolved
const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
assert(vstorage->num_levels() == 1);

Compaction* c = nullptr;
if (ioptions_.compaction_options_fifo.ttl > 0) {
c = PickTTLCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
}
if (c == nullptr) {
c = PickSizeCompaction(cf_name, mutable_cf_options, vstorage, log_buffer);
}
RegisterCompaction(c);
return c;
}
Expand Down
11 changes: 11 additions & 0 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ class FIFOCompactionPicker : public CompactionPicker {

virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;

private:
Compaction* PickTTLCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer);

Compaction* PickSizeCompaction(const std::string& cf_name,
const MutableCFOptions& mutable_cf_options,
VersionStorageInfo* version,
LogBuffer* log_buffer);
};

class NullCompactionPicker : public CompactionPicker {
Expand Down
25 changes: 24 additions & 1 deletion db/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "db/builder.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
#include "table/block_based_table_factory.h"
#include "util/rate_limiter.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
Expand Down Expand Up @@ -164,6 +165,19 @@ static Status ValidateOptions(
"universal and level compaction styles. ");
}
}
if (cfd.options.compaction_options_fifo.ttl > 0) {
if (db_options.max_open_files != -1) {
return Status::NotSupported(
"FIFO Compaction with TTL is only supported when files are always "
"kept open (set max_open_files = -1). ");
}
if (cfd.options.table_factory->Name() !=
BlockBasedTableFactory().Name()) {
return Status::NotSupported(
"FIFO Compaction with TTL is only supported in "
"Block-Based Table format. ");
}
}
}

if (db_options.db_paths.size() > 4) {
Expand Down Expand Up @@ -832,6 +846,14 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
*cfd->GetLatestMutableCFOptions();
bool paranoid_file_checks =
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

int64_t _current_time;
s = env_->GetCurrentTime(&_current_time);
if (!s.ok()) {
_current_time = 0;
}
const uint64_t current_time = static_cast<uint64_t>(_current_time);

{
mutex_.Unlock();

Expand All @@ -851,7 +873,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
&event_logger_, job_id);
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
Expand Down
Loading