Skip to content

Commit

Permalink
raft: use current term for Log.append() and as splice() bound
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent cd45474 commit a5fdfc6
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 72 deletions.
41 changes: 19 additions & 22 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ impl encoding::Key<'_> for KeyPrefix {}
///
/// * Entry indexes are contiguous starting at 1 (no index gaps).
/// * Entry terms never decrease from the previous entry.
/// * Entry terms are at or below the current term.
/// * Appended entries are durable (flushed to disk).
/// * Appended entries use the current term.
/// * Committed entries are never changed or removed (no log truncation).
/// * Committed entries will eventually be replicated to all nodes.
/// * Entries with the same index/term contain the same command.
Expand Down Expand Up @@ -153,8 +155,9 @@ impl Log {
(self.term, self.vote)
}

/// Stores the most recent term and cast vote (if any). Enforces that the
/// term does not regress, and that we only vote for one node in a term.
/// Stores the current term and cast vote (if any). Enforces that the term
/// does not regress, and that we only vote for one node in a term. append()
/// will use this term, and splice() can't write entries beyond it.
pub fn set_term(&mut self, term: Term, vote: Option<NodeID>) -> Result<()> {
assert!(term > 0, "can't set term 0");
assert!(term >= self.term, "term regression {} → {}", self.term, term);
Expand All @@ -169,20 +172,14 @@ impl Log {
Ok(())
}

/// Appends a command to the log and flushes it to disk, returning its
/// index. None implies a noop command, typically after Raft leader changes.
/// The term must be equal to or greater than the previous entry.
/// TODO: use self.term for the term.
pub fn append(&mut self, term: Term, command: Option<Vec<u8>>) -> Result<Index> {
match self.get(self.last_index)? {
Some(e) if term < e.term => panic!("term regression {} → {term}", e.term),
None if self.last_index > 0 => panic!("log gap at {}", self.last_index),
None if term == 0 => panic!("can't append entry with term 0"),
Some(_) | None => {}
}
/// Appends a command to the log at the current term, and flushes it to
/// disk, returning its index. None implies a noop command, typically after
/// Raft leader changes.
pub fn append(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
assert!(self.term > 0, "can't append entry in term 0");
// We could omit the index in the encoded value, since it's also stored
// in the key, but we keep it simple.
let entry = Entry { index: self.last_index + 1, term, command };
let entry = Entry { index: self.last_index + 1, term: self.term, command };
self.engine.set(&Key::Entry(entry.index).encode()?, entry.encode()?)?;
self.engine.flush()?;
self.last_index = entry.index;
Expand Down Expand Up @@ -249,11 +246,11 @@ impl Log {
/// Splices a set of entries into the log and flushes it to disk. The
/// entries must have contiguous indexes and equal/increasing terms, and the
/// first entry must be in the range [1,last_index+1] with a term at or
/// equal to the previous (base) entry's term. New indexes will be appended.
/// Overlapping indexes with the same term must be equal and will be
/// ignored. Overlapping indexes with different terms will truncate the
/// existing log at the first conflict and then splice the new entries.
/// TODO: check against self.term.
/// above the previous (base) entry's term and at or below the current term.
/// New indexes will be appended. Overlapping indexes with the same term
/// must be equal and will be ignored. Overlapping indexes with different
/// terms will truncate the existing log at the first conflict and then
/// splice the new entries.
pub fn splice(&mut self, entries: Vec<Entry>) -> Result<Index> {
let (Some(first), Some(last)) = (entries.first(), entries.last()) else {
return Ok(self.last_index); // empty input is noop
Expand All @@ -272,6 +269,7 @@ impl Log {

// Check that the entries connect to the existing log (if any), and that the
// term doesn't regress.
assert!(last.term <= self.term, "splice term {} beyond current {}", last.term, self.term);
match self.get(first.index - 1)? {
Some(base) if first.term < base.term => {
panic!("splice term regression {} → {}", base.term, first.term)
Expand Down Expand Up @@ -349,14 +347,13 @@ mod tests {
fn run(&mut self, command: &goldenscript::Command) -> Result<String, Box<dyn Error>> {
let mut output = String::new();
match command.name.as_str() {
// append TERM [COMMAND] [oplog=BOOL]
// append [COMMAND] [oplog=BOOL]
"append" => {
let mut args = command.consume_args();
let term = args.next_pos().ok_or("term not given")?.parse()?;
let command = args.next_pos().map(|a| a.value.as_bytes().to_vec());
let oplog = args.lookup_parse("oplog")?.unwrap_or(false);
args.reject_rest()?;
let index = self.log.append(term, command)?;
let index = self.log.append(command)?;
let entry = self.log.get(index)?.expect("entry not found");
self.maybe_oplog(oplog, &mut output);
output.push_str(&format!("append → {}\n", Self::format_entry(&entry)));
Expand Down
2 changes: 1 addition & 1 deletion src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ impl RawNode<Leader> {
/// replicating it to peers. If successful, it will eventually be committed
/// and applied to the state machine.
fn propose(&mut self, command: Option<Vec<u8>>) -> Result<Index> {
let index = self.log.append(self.term, command)?;
let index = self.log.append(command)?;
for peer in self.peers.iter().copied().sorted() {
// Eagerly send the entry to the peer, but only if it is in steady
// state where we've already sent the previous entries. Otherwise,
Expand Down
38 changes: 17 additions & 21 deletions src/raft/testscripts/log/append
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
# Appending an entry with term 0 fails, even on an empty log.
!append 0 foo
# Appending an entry with term 0 fails.
!append foo
---
Panic: can't append entry with term 0
Panic: can't append entry in term 0

# Appending to an empty log works. The term doesn't have to be 1, and doesn't
# have to match get_term(). The entry is written to the engine and flushed
# to durable storage.
append 2 foo oplog=true
# Appending to an empty log works. The term doesn't have to be 1. The entry is
# written to the engine and flushed to durable storage.
set_term 2
append foo oplog=true
---
engine: set Entry(1) 0x000000000000000001 = 0x01020103666f6f
engine: flush
append → 1@2 foo

# Appending a noop entry (no command) also works.
append 2 oplog=true
append oplog=true
---
engine: set Entry(2) 0x000000000000000002 = 0x020200
engine: flush
Expand All @@ -25,33 +25,28 @@ status
scan
dump
---
term=0 last=2@2 commit=0@0 vote=None
term=2 last=2@2 commit=0@0 vote=None
1@2 foo
2@2 None
Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200
TermVote 0x01 = 0x0200

# Bumping the term with a command is allowed. Skipping a term and omitting the
# command is also allowed.
append 3 command
append 5
# Skipping a term then appending is allowed.
set_term 3
append command
set_term 5
append
---
append → 3@3 command
append → 4@5 None

# A term regression fails, as does a 0 term.
!append 4 old
!append 0
---
Panic: term regression 5 → 4
Panic: term regression 5 → 0

# Dump the final status and data.
status
scan
dump
---
term=0 last=4@5 commit=0@0 vote=None
term=5 last=4@5 commit=0@0 vote=None
1@2 foo
2@2 None
3@3 command
Expand All @@ -60,3 +55,4 @@ Entry(1) 0x000000000000000001 = 0x01020103666f6f
Entry(2) 0x000000000000000002 = 0x020200
Entry(3) 0x000000000000000003 = 0x03030107636f6d6d616e64
Entry(4) 0x000000000000000004 = 0x040500
TermVote 0x01 = 0x0500
13 changes: 8 additions & 5 deletions src/raft/testscripts/log/commit
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Panic: commit index 1 does not exist

# Add some entries.
set_term 2
splice 1@1= 2@1=foo 3@2=bar
---
splice → 3@2 bar
Expand All @@ -22,48 +23,50 @@ status
---
engine: set CommitIndex 0x02 = 0x0101
commit → 1@1 None
term=0 last=3@2 commit=1@1 vote=None
term=2 last=3@2 commit=1@1 vote=None

# Dump the raw engine contents tdump
dump
---
Entry(1) 0x000000000000000001 = 0x010100
Entry(2) 0x000000000000000002 = 0x02010103666f6f
Entry(3) 0x000000000000000003 = 0x03020103626172
TermVote 0x01 = 0x0200
CommitIndex 0x02 = 0x0101

# Commits are idempotent, which doesn't incur an engine set.
commit 1 oplog=true
status
---
commit → 1@1 None
term=0 last=3@2 commit=1@1 vote=None
term=2 last=3@2 commit=1@1 vote=None

# Commits can skip an entry.
commit 3
status
---
commit → 3@2 bar
term=0 last=3@2 commit=3@2 vote=None
term=2 last=3@2 commit=3@2 vote=None

# Commit regressions error.
!commit 2
status
---
Panic: commit index regression 3 → 2
term=0 last=3@2 commit=3@2 vote=None
term=2 last=3@2 commit=3@2 vote=None

# Committing non-existant indexes error.
!commit 4
status
---
Panic: commit index 4 does not exist
term=0 last=3@2 commit=3@2 vote=None
term=2 last=3@2 commit=3@2 vote=None

# Dump the raw values.
dump
---
Entry(1) 0x000000000000000001 = 0x010100
Entry(2) 0x000000000000000002 = 0x02010103666f6f
Entry(3) 0x000000000000000003 = 0x03020103626172
TermVote 0x01 = 0x0200
CommitIndex 0x02 = 0x0302
8 changes: 5 additions & 3 deletions src/raft/testscripts/log/get
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ get 1
None

# Append a few entries.
append 1
append 1 foo
append 2 bar
set_term 1
append
append foo
set_term 2
append bar
---
append → 1@1 None
append → 2@1 foo
Expand Down
8 changes: 5 additions & 3 deletions src/raft/testscripts/log/has
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ has 1@1
false

# Append a few entries.
append 1
append 1 foo
append 2 bar
set_term 1
append
append foo
set_term 2
append bar
---
append → 1@1 None
append → 2@1 foo
Expand Down
11 changes: 6 additions & 5 deletions src/raft/testscripts/log/init
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Tests that the log correctly initializes cached state when opened.

set_term 1 7
set_term 1
---
ok

append 1 foo
append 2 bar
append foo
set_term 2 7
append bar
commit 1
---
append → 1@1 foo
Expand All @@ -14,15 +15,15 @@ commit → 1@1 foo

status
---
term=1 last=2@2 commit=1@1 vote=7
term=2 last=2@2 commit=1@1 vote=7

reload
---
ok

status
---
term=1 last=2@2 commit=1@1 vote=7
term=2 last=2@2 commit=1@1 vote=7

scan
---
Expand Down
8 changes: 5 additions & 3 deletions src/raft/testscripts/log/scan
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ scan 3..7
<empty>

# Append a few entries.
append 1
append 1 foo
append 2 bar
set_term 1
append
append foo
set_term 2
append bar
---
append → 1@1 None
append → 2@1 foo
Expand Down

0 comments on commit a5fdfc6

Please sign in to comment.