Skip to content

Commit

Permalink
raft: clean up Log
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed May 31, 2024
1 parent 13c0510 commit 3666d61
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 141 deletions.
143 changes: 62 additions & 81 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use super::{NodeID, Term};
use crate::encoding::{self, bincode, Key as _};
use crate::encoding::{self, bincode, Key as _, Value as _};
use crate::error::Result;
use crate::storage;
use crate::{asserterr, errassert, errdata};
use crate::{asserterr, errassert};

use serde::{Deserialize, Serialize};

/// A log index.
/// A log index. Starts at 1, indicates no index if 0.
pub type Index = u64;

/// A log entry.
Expand All @@ -20,7 +20,9 @@ pub struct Entry {
pub command: Option<Vec<u8>>,
}

/// A log key, encoded using KeyCode.
impl encoding::Value for Entry {}

/// A log storage key.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Key {
/// A log entry, storing the term and command.
Expand All @@ -33,9 +35,7 @@ pub enum Key {

impl encoding::Key<'_> for Key {}

/// Log key prefixes, used for prefix scans.
///
/// TODO: consider handling this in Key somehow.
/// Log key prefixes used for prefix scans. Must match the Key structure.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
enum KeyPrefix {
Entry,
Expand All @@ -48,8 +48,8 @@ impl encoding::Key<'_> for KeyPrefix {}
/// A Raft log.
pub struct Log {
/// The underlying storage engine. Uses a trait object instead of generics,
/// to allow runtime selection of the engine (based on the program config)
/// and avoid propagating the generic type parameters throughout.
/// to allow runtime selection of the engine and avoid propagating the
/// generic type parameters throughout Raft.
engine: Box<dyn storage::Engine>,
/// The index of the last stored entry.
last_index: Index,
Expand All @@ -62,13 +62,13 @@ pub struct Log {
}

impl Log {
/// Creates a new log, using the given storage engine.
/// Initializes a log using the given storage engine.
pub fn new(mut engine: impl storage::Engine + 'static) -> Result<Self> {
let (last_index, last_term) = engine
.scan_prefix(&KeyPrefix::Entry.encode()?)
.last()
.transpose()?
.map(|(k, v)| Self::decode_entry(&k, &v))
.map(|(_, v)| Entry::decode(&v))
.transpose()?
.map(|e| (e.index, e.term))
.unwrap_or((0, 0));
Expand All @@ -80,23 +80,6 @@ impl Log {
Ok(Self { engine: Box::new(engine), last_index, last_term, commit_index, commit_term })
}

/// Decodes an entry from a log key/value pair.
fn decode_entry(key: &[u8], value: &[u8]) -> Result<Entry> {
let Key::Entry(index) = Key::decode(key)? else { return errdata!("invalid key {key:x?}") };
Self::decode_entry_value(index, value)
}

/// Decodes an entry from a value at a given index.
fn decode_entry_value(index: Index, value: &[u8]) -> Result<Entry> {
let (term, command) = bincode::deserialize(value)?;
Ok(Entry { index, term, command })
}

/// Returns log engine name and status.
pub fn status(&mut self) -> Result<storage::engine::Status> {
self.engine.status()
}

/// Returns the commit index and term.
pub fn get_commit_index(&self) -> (Index, Term) {
(self.commit_index, self.commit_term)
Expand Down Expand Up @@ -134,22 +117,24 @@ impl Log {
Ok(())
}

/// Appends a command to the log, returning its index. None implies a noop
/// command, typically after Raft leader changes. The term must be equal to
/// or greater than the previous entry.
/// Appends a command to the log and flushes it to disk, returning its
/// index. None implies a noop command, typically after Raft leader changes.
/// The term must be equal to or greater than the previous entry.
pub fn append(&mut self, term: Term, command: Option<Vec<u8>>) -> Result<Index> {
match self.get(self.last_index)? {
Some(e) if term < e.term => return errassert!("term regression {} → {term}", e.term),
None if self.last_index > 0 => return errassert!("log gap at {}", self.last_index),
None if term == 0 => return errassert!("can't append entry with term 0"),
Some(_) | None => {}
}
let index = self.last_index + 1;
self.engine.set(&Key::Entry(index).encode()?, bincode::serialize(&(term, command))?)?;
// We could omit the index in the encoded value, since it's also stored
// in the key, but we keep it simple.
let entry = Entry { index: self.last_index + 1, term, command };
self.engine.set(&Key::Entry(entry.index).encode()?, entry.encode()?)?;
self.engine.flush()?;
self.last_index = index;
self.last_term = term;
Ok(index)
self.last_index = entry.index;
self.last_term = entry.term;
Ok(entry.index)
}

/// Commits entries up to and including the given index. The index must
Expand All @@ -174,10 +159,7 @@ impl Log {

/// Fetches an entry at an index, or None if it does not exist.
pub fn get(&mut self, index: Index) -> Result<Option<Entry>> {
self.engine
.get(&Key::Entry(index).encode()?)?
.map(|v| Self::decode_entry_value(index, &v))
.transpose()
self.engine.get(&Key::Entry(index).encode()?)?.map(|v| Entry::decode(&v)).transpose()
}

/// Checks if the log contains an entry with the given index and term.
Expand All @@ -190,31 +172,27 @@ impl Log {
&mut self,
range: impl std::ops::RangeBounds<Index>,
) -> Result<impl Iterator<Item = Result<Entry>> + '_> {
use std::ops::Bound;
let from = match range.start_bound() {
std::ops::Bound::Excluded(i) => std::ops::Bound::Excluded(Key::Entry(*i).encode()?),
std::ops::Bound::Included(i) => std::ops::Bound::Included(Key::Entry(*i).encode()?),
std::ops::Bound::Unbounded => std::ops::Bound::Included(Key::Entry(0).encode()?),
Bound::Excluded(&index) => Bound::Excluded(Key::Entry(index).encode()?),
Bound::Included(&index) => Bound::Included(Key::Entry(index).encode()?),
Bound::Unbounded => Bound::Included(Key::Entry(0).encode()?),
};
let to = match range.end_bound() {
std::ops::Bound::Excluded(i) => std::ops::Bound::Excluded(Key::Entry(*i).encode()?),
std::ops::Bound::Included(i) => std::ops::Bound::Included(Key::Entry(*i).encode()?),
std::ops::Bound::Unbounded => {
std::ops::Bound::Included(Key::Entry(Index::MAX).encode()?)
}
Bound::Excluded(&index) => Bound::Excluded(Key::Entry(index).encode()?),
Bound::Included(&index) => Bound::Included(Key::Entry(index).encode()?),
Bound::Unbounded => Bound::Included(Key::Entry(Index::MAX).encode()?),
};
Ok(self
.engine
.scan_dyn((from, to))
.map(|r| r.and_then(|(k, v)| Self::decode_entry(&k, &v))))
Ok(self.engine.scan_dyn((from, to)).map(|r| r.and_then(|(_, v)| Entry::decode(&v))))
}

/// Splices a set of entries into the log. The entries must have contiguous
/// indexes and equal/increasing terms, and the first entry must be in the
/// range [1,last_index+1] with a term at or equal to the previous (base)
/// entry's term. New indexes will be appended. Overlapping indexes with the
/// same term must be equal and will be ignored. Overlapping indexes with
/// different terms will truncate the existing log at the first conflict and
/// then splice the new entries.
/// Splices a set of entries into the log and flushes it to disk. The
/// entries must have contiguous indexes and equal/increasing terms, and the
/// first entry must be in the range [1,last_index+1] with a term at or
/// equal to the previous (base) entry's term. New indexes will be appended.
/// Overlapping indexes with the same term must be equal and will be
/// ignored. Overlapping indexes with different terms will truncate the
/// existing log at the first conflict and then splice the new entries.
pub fn splice(&mut self, entries: Vec<Entry>) -> Result<Index> {
let (Some(first), Some(last)) = (entries.first(), entries.last()) else {
return Ok(self.last_index); // empty input is noop
Expand Down Expand Up @@ -267,10 +245,7 @@ impl Log {
asserterr!(first.index > self.commit_index, "spliced entries below commit index");

for entry in entries {
self.engine.set(
&Key::Entry(entry.index).encode()?,
bincode::serialize(&(&entry.term, &entry.command))?,
)?;
self.engine.set(&Key::Entry(entry.index).encode()?, entry.encode()?)?;
}
for index in last.index + 1..=self.last_index {
self.engine.delete(&Key::Entry(index).encode()?)?;
Expand All @@ -281,6 +256,11 @@ impl Log {
self.last_term = last.term;
Ok(self.last_index)
}

/// Returns log engine status.
pub fn status(&mut self) -> Result<storage::Status> {
self.engine.status()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -334,18 +314,31 @@ mod tests {
output.push_str(&format!("commit → {}\n", Self::format_entry(&entry)));
}

// dump
"dump" => {
command.consume_args().reject_rest()?;
let range = (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
let mut scan = self.log.engine.scan_dyn(range);
while let Some((key, value)) = scan.next().transpose()? {
output.push_str(&Self::format_key_value(&key, &value));
output.push('\n');
}
}

// get INDEX...
"get" => {
let mut args = command.consume_args();
let indexes: Vec<Index> =
args.rest_pos().iter().map(|a| a.parse()).collect::<Result<_, _>>()?;
args.reject_rest()?;
for index in indexes {
let result = match self.log.get(index)? {
Some(entry) => Self::format_entry(&entry),
None => "None".to_string(),
};
output.push_str(&format!("{result}\n"));
let entry = self
.log
.get(index)?
.as_ref()
.map(Self::format_entry)
.unwrap_or("None".to_string());
output.push_str(&format!("{entry}\n"));
}
}

Expand Down Expand Up @@ -374,17 +367,6 @@ mod tests {
}
}

// raw
"raw" => {
command.consume_args().reject_rest()?;
let range = (std::ops::Bound::Unbounded, std::ops::Bound::Unbounded);
let mut scan = self.log.engine.scan_dyn(range);
while let Some((key, value)) = scan.next().transpose()? {
output.push_str(&Self::format_key_value(&key, &value));
output.push('\n');
}
}

// scan [RANGE]
"scan" => {
let mut args = command.consume_args();
Expand Down Expand Up @@ -499,8 +481,7 @@ mod tests {
Operation::Flush => "flush".to_string(),
Operation::Set(k, v) => format!("set {}", Self::format_key_value(&k, &v)),
};
output.push_str(&s);
output.push('\n');
output.push_str(&format!("engine: {s}\n"));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ impl<R: Role> RawNode<R> {
{
let applied_index = state.get_applied_index();
let commit_index = log.get_commit_index().0;
assert!(commit_index >= applied_index, "Commit index below applied index");
// NB: we don't assert that commit_index >= applied_index, because the
// local commit index is not synced to durable storage -- on restart, it
// can be recovered from a quorum of logs.
if applied_index >= commit_index {
return Ok(());
}
Expand Down
24 changes: 12 additions & 12 deletions src/raft/testscripts/log/append
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ Error: assertion failed: can't append entry with term 0
# to durable storage.
append 2 foo oplog=true
---
set Entry(1) 0x000000000000000001 = 0x020103666f6f
flush
engine: set Entry(1) 0x000000000000000001 = 0x01020103666f6f
engine: flush
append → 1@2 foo

# Appending a noop entry (no command) also works.
append 2 oplog=true
---
set Entry(2) 0x000000000000000002 = 0x0200
flush
engine: set Entry(2) 0x000000000000000002 = 0x020200
engine: flush
append → 2@2 None

# Check that the last index/term is updated (commit index isn't), and that
# the engine contains the expected data, both in logical and raw form.
status
scan
raw
dump
---
last=2@2 commit=0@0
1@2 foo
2@2 None
Entry(1) 0x000000000000000001 = 0x020103666f6f
Entry(2) 0x000000000000000002 = 0x0200
Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200

# Bumping the term with a command is allowed. Skipping a term and omitting the
# command is also allowed.
Expand All @@ -49,14 +49,14 @@ Error: assertion failed: term regression 5 → 0
# Dump the final status and data.
status
scan
raw
dump
---
last=4@5 commit=0@0
1@2 foo
2@2 None
3@3 command
4@5 None
Entry(1) 0x000000000000000001 = 0x020103666f6f
Entry(2) 0x000000000000000002 = 0x0200
Entry(3) 0x000000000000000003 = 0x030107636f6d6d616e64
Entry(4) 0x000000000000000004 = 0x0500
Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200
Entry(3) 0x000000000000000003 = 0x03030107636f6d6d616e64
Entry(4) 0x000000000000000004 = 0x040500
22 changes: 11 additions & 11 deletions src/raft/testscripts/log/commit
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ Error: assertion failed: commit index 0 does not exist
commit 1@1 oplog=true
status
---
set CommitIndex 0x02 = 0x0101
engine: set CommitIndex 0x02 = 0x0101
commit → 1@1 None
last=3@2 commit=1@1

# Dump the raw engine contents too.
raw
# Dump the raw engine contents tdump
dump
---
Entry(1) 0x000000000000000001 = 0x0100
Entry(2) 0x000000000000000002 = 0x010103666f6f
Entry(3) 0x000000000000000003 = 0x020103626172
Entry(1) 0x000000000000000001 = 0x010100
Entry(2) 0x000000000000000002 = 0x02010103666f6f
Entry(3) 0x000000000000000003 = 0x03020103626172
CommitIndex 0x02 = 0x0101

# Commits are idempotent, which doesn't incur an engine set.
Expand Down Expand Up @@ -69,10 +69,10 @@ status
Error: assertion failed: commit index 4 does not exist
last=3@2 commit=3@2

# Dump the raw value.
raw
# Dump the raw values.
dump
---
Entry(1) 0x000000000000000001 = 0x0100
Entry(2) 0x000000000000000002 = 0x010103666f6f
Entry(3) 0x000000000000000003 = 0x020103626172
Entry(1) 0x000000000000000001 = 0x010100
Entry(2) 0x000000000000000002 = 0x02010103666f6f
Entry(3) 0x000000000000000003 = 0x03020103626172
CommitIndex 0x02 = 0x0302
Loading

0 comments on commit 3666d61

Please sign in to comment.