Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Wal debug 6.29.tikv #356

Open
wants to merge 18 commits into
base: 6.29.tikv
Choose a base branch
from
23 changes: 20 additions & 3 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1487,13 +1487,23 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
tonyxuqqi marked this conversation as resolved.
Show resolved Hide resolved
auto& wal = *it;
assert(wal.IsSyncing());

ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Synced log %" PRIu64 " from logs_, last seq number %" PRIu64
"\n",
wal.number, wal.writer->GetLastSequence());
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
synced_wals->AddWal(
wal.number,
WalMetadata(wal.GetPreSyncSize(), wal.writer->GetLastSequence()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
auto writer = wal.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_. Last Seq number of the WAL is %" PRIu64 "\n",
wal.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
it = logs_.erase(it);
} else {
wal.FinishSync();
Expand All @@ -1507,12 +1517,19 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
log_write_mutex_.AssertHeld();
uint64_t min_wal = 0;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
if (min_wal == 0) {
min_wal = it->number;
}
wal.FinishSync();
}
log_sync_cv_.SignalAll();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"MarkLogsNotSynced from %" PRIu64 " to %" PRIu64 "\n", min_wal,
up_to);
}

SequenceNumber DBImpl::GetLatestSequenceNumber() const {
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1845,7 +1845,7 @@ class DBImpl : public DB {

IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size,
LogFileNumberSize& log_file_number_size);
LogFileNumberSize& log_file_number_size, int caller_id);

IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
Expand Down
23 changes: 22 additions & 1 deletion db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
return;
}

VersionEdit synced_wals;
if (!alive_log_files_.empty() && !logs_.empty()) {
uint64_t min_log_number = job_context->log_number;
size_t num_alive_log_files = alive_log_files_.size();
Expand All @@ -293,6 +294,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
earliest.number);
log_recycle_files_.push_back(earliest.number);
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting WAL log %" PRIu64 "\n", earliest.number);
job_context->log_delete_files.push_back(earliest.number);
}
if (job_context->size_log_to_delete == 0) {
Expand All @@ -317,7 +320,18 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// logs_ could have changed while we were waiting.
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
log.GetPreSyncSize() > 0) {
synced_wals.AddWal(
log.number,
WalMetadata(log.GetPreSyncSize(), log.writer->GetLastSequence()));
}
auto writer = log.ReleaseWriter();
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"deleting log %" PRIu64
" from logs_, last seq number of WAL %" PRIu64 "\n",
log.number, writer->GetLastSequence());
logs_to_free_.push_back(writer);
logs_.pop_front();
}
// Current log cannot be obsolete.
Expand All @@ -331,6 +345,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
logs_to_free_.clear();
log_write_mutex_.Unlock();
mutex_.Lock();
if (synced_wals.IsWalAddition()) {
ApplyWALToManifest(&synced_wals);
}
job_context->log_recycle_files.assign(log_recycle_files_.begin(),
log_recycle_files_.end());
}
Expand Down Expand Up @@ -491,6 +508,10 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// Close WALs before trying to delete them.
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Close log %" PRIu64
" from logs_, last Seq number in WAL %" PRIu64 "\n",
w->get_log_number(), w->GetLastSequence());
auto s = w->Close();
s.PermitUncheckedError();
}
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
assert(log_writer->get_log_number() == log_file_number_size.number);
impl->mutex_.AssertHeld();
s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
log_file_number_size);
log_file_number_size, 0);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
Expand Down
24 changes: 20 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1432,10 +1432,23 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size,
LogFileNumberSize& log_file_number_size) {
LogFileNumberSize& log_file_number_size,
int caller_id) {
assert(log_size != nullptr);

if (log_writer->file()->GetFileSize() == 0) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Start writing to WAL: [%" PRIu64 " ]",
log_writer->get_log_number());
}
if (log_writer->get_log_number() != logs_.back().number) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tonyxuqqi : At the beginning of the write, writer will reference the last WAL only. The race condition happens, while the write is ongoing. We should check this condition after the write.

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"Not writing to latest WAL: [%" PRIu64 ", %" PRIu64 "] CallerId: %d",
log_writer->get_log_number(), logs_.back().number, caller_id);
}
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
SequenceNumber seq = WriteBatchInternal::Sequence(&merged_batch);
log_writer->SetLastSequence(seq);
*log_size = log_entry.size();
// When two_write_queues_ WriteToWAL has to be protected from concurretn calls
// from the two queues anyway and log_write_mutex_ is already held. Otherwise
Expand Down Expand Up @@ -1488,7 +1501,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 1);
if (to_be_cached_state) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add a lot when the fsync happens in writeToWAL

io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);

cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down Expand Up @@ -1516,7 +1529,9 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
}

for (auto& log : logs_) {
log.PrepareForSync();
io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
log.FinishSync();
if (!io_s.ok()) {
break;
}
Expand Down Expand Up @@ -1550,6 +1565,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}

return io_s;
}

Expand Down Expand Up @@ -1589,7 +1605,7 @@ IOStatus DBImpl::ConcurrentWriteToWAL(

uint64_t log_size;
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size,
log_file_number_size);
log_file_number_size, 2);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
Expand Down
1 change: 1 addition & 0 deletions db/log_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
}
last_seq_ = 0;
}

Writer::~Writer() {
Expand Down
6 changes: 6 additions & 0 deletions db/log_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "rocksdb/types.h"

namespace ROCKSDB_NAMESPACE {

Expand Down Expand Up @@ -92,11 +93,16 @@ class Writer {

bool TEST_BufferIsEmpty();

void SetLastSequence(SequenceNumber seq) { last_seq_ = seq; }

SequenceNumber GetLastSequence() const { return last_seq_; }

private:
std::unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block
uint64_t log_number_;
bool recycle_log_files_;
SequenceNumber last_seq_;

// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
Expand Down
17 changes: 15 additions & 2 deletions db/wal_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ void WalAddition::EncodeTo(std::string* dst) const {
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
}
PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kLastSyncSeq));
PutVarint64(dst, metadata_.GetLastSequence());

PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
}
Expand All @@ -44,6 +46,14 @@ Status WalAddition::DecodeFrom(Slice* src) {
metadata_.SetSyncedSizeInBytes(size);
break;
}
case WalAdditionTag::kLastSyncSeq: {
uint64_t lsn = 0;
if (!GetVarint64(src, &lsn)) {
return Status::Corruption(class_name, "Error decoding WAL file size");
}
metadata_.SetLastSequence(lsn);
break;
}
// TODO: process future tags such as checksum.
case WalAdditionTag::kTerminate:
return Status::OK();
Expand All @@ -58,13 +68,15 @@ Status WalAddition::DecodeFrom(Slice* src) {

JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
<< wal.GetMetadata().GetSyncedSizeInBytes();
<< wal.GetMetadata().GetSyncedSizeInBytes() << "LastSeqNumber"
<< wal.GetMetadata().GetLastSequence();
return jw;
}

std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
os << "log_number: " << wal.GetLogNumber()
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
<< " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes()
<< " last_seq_number: " << wal.GetMetadata().GetLastSequence();
return os;
}

Expand Down Expand Up @@ -139,6 +151,7 @@ Status WalSet::AddWal(const WalAddition& wal) {

// Update synced size for the given WAL.
it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
it->second.SetLastSequence(wal.GetMetadata().GetLastSequence());
return Status::OK();
}

Expand Down
14 changes: 12 additions & 2 deletions db/wal_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,21 @@ class WalMetadata {
public:
WalMetadata() = default;

explicit WalMetadata(uint64_t synced_size_bytes)
: synced_size_bytes_(synced_size_bytes) {}
explicit WalMetadata(uint64_t synced_size_bytes,
uint64_t last_sequence_number = 0)
: synced_size_bytes_(synced_size_bytes),
last_sequence_number_(last_sequence_number) {}

bool HasSyncedSize() const { return synced_size_bytes_ != kUnknownWalSize; }

void SetSyncedSizeInBytes(uint64_t bytes) { synced_size_bytes_ = bytes; }

uint64_t GetSyncedSizeInBytes() const { return synced_size_bytes_; }

uint64_t GetLastSequence() const { return last_sequence_number_; }

void SetLastSequence(uint64_t lsn) { last_sequence_number_ = lsn; }

private:
friend bool operator==(const WalMetadata& lhs, const WalMetadata& rhs);
friend bool operator!=(const WalMetadata& lhs, const WalMetadata& rhs);
Expand All @@ -50,6 +56,8 @@ class WalMetadata {

// Size of the most recently synced WAL in bytes.
uint64_t synced_size_bytes_ = kUnknownWalSize;

uint64_t last_sequence_number_ = 0;
};

inline bool operator==(const WalMetadata& lhs, const WalMetadata& rhs) {
Expand All @@ -66,6 +74,8 @@ enum class WalAdditionTag : uint32_t {
kTerminate = 1,
// Synced Size in bytes.
kSyncedSize = 2,

kLastSyncSeq = 3,
// Add tags in the future, such as checksum?
};

Expand Down
Loading