Skip to content

Commit

Permalink
raft: apply entries on restart, and add restart tests
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 10, 2024
1 parent d5c368b commit 013d188
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ pub struct Log {
/// The underlying storage engine. Uses a trait object instead of generics,
/// to allow runtime selection of the engine and avoid propagating the
/// generic type parameters throughout Raft.
engine: Box<dyn storage::Engine>,
pub(super) engine: Box<dyn storage::Engine>,
/// The current term.
term: Term,
/// Our leader vote in the current term, if any.
Expand Down
141 changes: 127 additions & 14 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ impl RawNode<Follower> {
let role = Follower::new(None, 0);
let mut node = Self { id, peers, log, state, tx, opts, role };
node.role.election_timeout = node.random_election_timeout();

// Apply any pending entries following restart. Unlike the Raft log,
// state machine writes are not flushed to durable storage, so a tail
// of writes may be lost if the OS crashes or restarts.
node.maybe_apply()?;
Ok(node)
}

Expand Down Expand Up @@ -1125,7 +1130,7 @@ impl RawNode<Leader> {
#[cfg(test)]
mod tests {
use super::*;
use crate::encoding::Value as _;
use crate::encoding::{bincode, Key as _, Value as _};
use crate::raft::state::test::{self as teststate, KVCommand, KVResponse};
use crate::raft::Entry;
use crate::storage;
Expand Down Expand Up @@ -1185,6 +1190,10 @@ mod tests {

/// Test helpers for Node.
impl Node {
/// Consumes the node and returns its Raft log and state machine, so that
/// they can be reused to construct a new node (e.g. when restarting it).
fn dismantle(self) -> (Log, Box<dyn State>) {
// NOTE(review): with_rawnode! is defined outside this view; presumably it
// unwraps the role-specific RawNode and binds it as `n` -- confirm.
with_rawnode!(self, |n| (n.log, n.state))
}

/// Returns the applied index reported by the node's state machine.
fn get_applied_index(&self) -> Index {
with_rawnode!(ref self, |n| n.state.get_applied_index())
}
Expand All @@ -1201,6 +1210,14 @@ mod tests {
with_rawnode!(ref self, |n| n.log.get_term())
}

/// Returns a clone of the node's options.
fn options(&self) -> Options {
with_rawnode!(ref self, |n| n.opts.clone())
}

/// Returns a clone of the node's set of peer IDs.
fn peers(&self) -> HashSet<NodeID> {
with_rawnode!(ref self, |n| n.peers.clone())
}

/// Passes the given command directly to the state machine's read() and
/// returns its result.
fn read(&self, command: Vec<u8>) -> crate::error::Result<Vec<u8>> {
with_rawnode!(ref self, |n| n.state.read(command))
}
Expand Down Expand Up @@ -1330,6 +1347,20 @@ mod tests {
self.request(id, request, &mut output)?;
}

// restart [commit_index=INDEX] [applied_index=INDEX] [ID...]
// Restarts the given nodes (or all nodes). They retain their
// log and state, unless applied_index is given (which reverts
// the state machine to the given index, or 0 if empty).
// commit_index may be given to regress the commit index (it
// is not flushed to durable storage).
"restart" => {
let mut args = command.consume_args();
let applied_index = args.lookup_parse("applied_index")?;
let commit_index = args.lookup_parse("commit_index")?;
let ids = self.parse_ids_or_all(&args.rest())?;
self.restart(&ids, commit_index, applied_index, &mut output)?;
}

// stabilize [heartbeat=BOOL] [ID...]
// Stabilizes the given nodes by repeatedly delivering messages
// until no more messages are pending. If heartbeat is true, also
Expand Down Expand Up @@ -1398,6 +1429,38 @@ mod tests {
Self { next_request_id: 1, ..Default::default() }
}

/// Inserts a brand new node into the cluster, backed by a log on a fresh
/// storage::Memory engine and a new teststate::KV state machine.
fn add_node(
    &mut self,
    id: NodeID,
    peers: HashSet<NodeID>,
    opts: Options,
) -> Result<(), Box<dyn Error>> {
    let engine = Box::new(storage::Memory::new());
    self.add_node_with(id, peers, Log::new(engine)?, teststate::KV::new(), opts)
}

/// Builds a node on top of an existing log and state machine, then registers
/// it with the cluster together with fresh channels and bookkeeping.
///
/// The state machine is wrapped in teststate::Emit along with the sending
/// half of a new channel; the receiving half is kept in applied_rx.
fn add_node_with(
    &mut self,
    id: NodeID,
    peers: HashSet<NodeID>,
    log: Log,
    state: Box<dyn State>,
    opts: Options,
) -> Result<(), Box<dyn Error>> {
    let (node_tx, node_rx) = crossbeam::channel::unbounded();
    let (applied_tx, applied_rx) = crossbeam::channel::unbounded();

    // Construct the node before touching any cluster bookkeeping, so that a
    // construction error returns without having registered anything.
    let emitter = teststate::Emit::new(state, applied_tx);
    let node = Node::new(id, peers, log, emitter, node_tx, opts)?;

    self.nodes.insert(id, node);
    self.nodes_rx.insert(id, node_rx);
    self.applied_rx.insert(id, applied_rx);
    self.nodes_pending.insert(id, Vec::new());
    self.disconnected.insert(id, HashSet::new());
    Ok(())
}

/// Transitions nodes to candidates and campaign in a new term.
fn campaign(&mut self, ids: &[NodeID], output: &mut String) -> Result<(), Box<dyn Error>> {
let campaign = |node| match node {
Expand Down Expand Up @@ -1434,20 +1497,9 @@ mod tests {

self.ids = (1..=nodes).collect();

for id in self.ids.iter().copied() {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
let (applied_tx, applied_rx) = crossbeam::channel::unbounded();
for id in self.ids.clone() {
let peers = self.ids.iter().copied().filter(|i| i != &id).collect();
let log = Log::new(Box::new(storage::Memory::new()))?;
let state = teststate::Emit::new(teststate::KV::new(), applied_tx);
self.nodes.insert(id, Node::new(id, peers, log, state, node_tx, opts.clone())?);

while applied_rx.try_recv().is_ok() {} // drain first apply

self.nodes_rx.insert(id, node_rx);
self.nodes_pending.insert(id, Vec::new());
self.applied_rx.insert(id, applied_rx);
self.disconnected.insert(id, HashSet::new());
self.add_node(id, peers, opts.clone())?;
}

// Promote leader if requested. Suppress output.
Expand All @@ -1461,6 +1513,11 @@ mod tests {
self.stabilize(&self.ids.clone(), true, quiet)?;
}

// Drain any initial applied entries.
for applied_rx in self.applied_rx.values_mut() {
while applied_rx.try_recv().is_ok() {}
}

// Output final cluster status.
self.status(&self.ids, output)
}
Expand Down Expand Up @@ -1620,6 +1677,62 @@ mod tests {
self.transition(id, |n| n.step(msg), output)
}

/// Restarts the given nodes. If commit_index or applied_index are
/// given, the log commit index or state machine will regress.
///
/// Each node is removed from the cluster, dismantled into its log and state
/// machine, and re-added with a log rebuilt on the same storage engine. Any
/// entries applied while the new node is constructed are written to `output`
/// via a noop transition, followed by the status of all restarted nodes.
///
/// Returns an error if a node ID is unknown, if commit_index is beyond the
/// log's current commit index or has no matching log entry (other than 0),
/// or if applied_index is beyond the (possibly regressed) commit index.
/// Panics if replaying the log does not leave the state machine at exactly
/// applied_index.
fn restart(
    &mut self,
    ids: &[NodeID],
    commit_index: Option<Index>,
    applied_index: Option<Index>,
    output: &mut String,
) -> Result<(), Box<dyn Error>> {
    for id in ids.iter().copied() {
        // Build the error message lazily: the node is normally present, so
        // the string should not be allocated on every iteration.
        let node = self.nodes.remove(&id).ok_or_else(|| format!("unknown node {id}"))?;
        let peers = node.peers();
        let opts = node.options();
        let (log, mut state) = node.dismantle();
        let mut log = Log::new(log.engine)?; // reset log

        // If requested, regress the commit index.
        if let Some(commit_index) = commit_index {
            if commit_index > log.get_commit_index().0 {
                return Err(format!("commit_index={commit_index} beyond current").into());
            }
            // The commit index is stored together with its term. Index 0 has
            // no log entry, so it is paired with term 0.
            let commit_term = match log.get(commit_index)? {
                Some(e) => e.term,
                None if commit_index == 0 => 0,
                None => return Err(format!("unknown commit_index={commit_index}").into()),
            };
            // Overwrite the stored commit index directly in the engine.
            log.engine.set(
                &crate::raft::log::Key::CommitIndex.encode(),
                bincode::serialize(&(commit_index, commit_term)),
            )?;
            // Reset the log again.
            log = Log::new(log.engine)?;
        }

        // If requested, wipe the state machine and reapply up to the
        // requested applied index. This is checked against the commit index
        // after any regression above.
        if let Some(applied_index) = applied_index {
            if applied_index > log.get_commit_index().0 {
                return Err(format!("applied_index={applied_index} beyond commit").into());
            }
            state = teststate::KV::new();
            let mut scan = log.scan(..=applied_index);
            while let Some(entry) = scan.next().transpose()? {
                _ = state.apply(entry); // apply errors are returned to client
            }
            assert_eq!(state.get_applied_index(), applied_index, "wrong applied index");
        }

        // Add node, and run a noop transition to output applied entries.
        self.add_node_with(id, peers, log, state, opts)?;
        self.transition(id, Ok, output)?;
    }
    // Output restarted node status.
    self.status(ids, output)
}

/// Stabilizes the given nodes by repeatedly delivering pending messages
/// until no new messages are generated. If heartbeat is true, leaders
/// then emit a heartbeat and restabilize again, e.g. to propagate the
Expand Down
122 changes: 122 additions & 0 deletions src/raft/testscripts/node/restart
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Restarting a cluster that's fully caught up retains the existing state and
# allows trivially electing a new leader.

cluster nodes=3 leader=1
---
n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1

# Replicate a couple of writes.
(put 1 a=1)
(put 1 b=2)
(stabilize heartbeat=true)
---
ok

# Dump the current status, log, and state.
status
---
n1@1 leader last=3@1 commit=3@1 applied=3 progress={2:3→4 3:3→4}
n2@1 follower(n1) last=3@1 commit=3@1 applied=3
n3@1 follower(n1) last=3@1 commit=3@1 applied=3

log
---
n1@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n1@1 entry 1@1 None
n1@1 entry 2@1 put a=1
n1@1 entry 3@1 put b=2
n2@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n2@1 entry 1@1 None
n2@1 entry 2@1 put a=1
n2@1 entry 3@1 put b=2
n3@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n3@1 entry 1@1 None
n3@1 entry 2@1 put a=1
n3@1 entry 3@1 put b=2

state
---
n1@1 applied=3
n1@1 state a=1
n1@1 state b=2
n2@1 applied=3
n2@1 state a=1
n2@1 state b=2
n3@1 applied=3
n3@1 state a=1
n3@1 state b=2

# Restart the nodes. They retain the same status, logs, and state.
restart
---
n1@1 follower() last=3@1 commit=3@1 applied=3
n2@1 follower() last=3@1 commit=3@1 applied=3
n3@1 follower() last=3@1 commit=3@1 applied=3

log
---
n1@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n1@1 entry 1@1 None
n1@1 entry 2@1 put a=1
n1@1 entry 3@1 put b=2
n2@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n2@1 entry 1@1 None
n2@1 entry 2@1 put a=1
n2@1 entry 3@1 put b=2
n3@1 term=1 last=3@1 commit=3@1 vote=Some(1)
n3@1 entry 1@1 None
n3@1 entry 2@1 put a=1
n3@1 entry 3@1 put b=2

state
---
n1@1 applied=3
n1@1 state a=1
n1@1 state b=2
n2@1 applied=3
n2@1 state a=1
n2@1 state b=2
n3@1 applied=3
n3@1 state a=1
n3@1 state b=2

# Elect a new leader.
campaign 3
stabilize heartbeat=true
---
n3@1 follower() ⇨ n3@2 candidate
n3@2 → n1 Campaign last=3@1
n3@2 → n2 Campaign last=3@1
n1@1 follower() ⇨ n1@2 follower()
n1@2 → n3 CampaignResponse vote=true
n2@1 follower() ⇨ n2@2 follower()
n2@2 → n3 CampaignResponse vote=true
n3@2 candidate ⇨ n3@2 leader
n3@2 append 4@2 None
n3@2 → n1 Append base=3@1 [4@2]
n3@2 → n2 Append base=3@1 [4@2]
n3@2 → n1 Heartbeat last_index=4 commit_index=3 read_seq=0
n3@2 → n2 Heartbeat last_index=4 commit_index=3 read_seq=0
n1@2 follower() ⇨ n1@2 follower(n3)
n1@2 append 4@2 None
n1@2 → n3 AppendResponse match_index=4
n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0
n2@2 follower() ⇨ n2@2 follower(n3)
n2@2 append 4@2 None
n2@2 → n3 AppendResponse match_index=4
n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0
n3@2 commit 4@2
n3@2 → n1 Heartbeat last_index=4 commit_index=4 read_seq=0
n3@2 → n2 Heartbeat last_index=4 commit_index=4 read_seq=0
n1@2 commit 4@2
n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0
n2@2 commit 4@2
n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0

status
---
n1@2 follower(n3) last=4@2 commit=4@2 applied=4
n2@2 follower(n3) last=4@2 commit=4@2 applied=4
n3@2 leader last=4@2 commit=4@2 applied=4 progress={1:4→5 2:4→5}
41 changes: 41 additions & 0 deletions src/raft/testscripts/node/restart_apply
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Restarting a node and wiping its state machine will reapply the state.

cluster nodes=3 leader=1
---
n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2}
n2@1 follower(n1) last=1@1 commit=1@1 applied=1
n3@1 follower(n1) last=1@1 commit=1@1 applied=1

# Replicate a couple of writes.
(put 1 a=1)
(put 1 b=2)
(stabilize heartbeat=true)
---
ok

# Restart n3 and clear its state machine. The node will apply all pending
# entries when restarting.
restart 3 applied_index=0
---
n3@1 apply 1@1 None
n3@1 apply 2@1 put a=1
n3@1 apply 3@1 put b=2
n3@1 follower() last=3@1 commit=3@1 applied=3

state 3
---
n3@1 applied=3
n3@1 state a=1
n3@1 state b=2

# Restart n3 and lose the last write. It will also be reapplied.
restart 3 applied_index=2
---
n3@1 apply 3@1 put b=2
n3@1 follower() last=3@1 commit=3@1 applied=3

state 3
---
n3@1 applied=3
n3@1 state a=1
n3@1 state b=2

0 comments on commit 013d188

Please sign in to comment.