diff --git a/src/raft/log.rs b/src/raft/log.rs index 02262e86..480e6e80 100644 --- a/src/raft/log.rs +++ b/src/raft/log.rs @@ -88,7 +88,7 @@ pub struct Log { /// The underlying storage engine. Uses a trait object instead of generics, /// to allow runtime selection of the engine and avoid propagating the /// generic type parameters throughout Raft. - engine: Box, + pub(super) engine: Box, /// The current term. term: Term, /// Our leader vote in the current term, if any. diff --git a/src/raft/node.rs b/src/raft/node.rs index 0c41be9f..3ee13766 100644 --- a/src/raft/node.rs +++ b/src/raft/node.rs @@ -289,6 +289,11 @@ impl RawNode { let role = Follower::new(None, 0); let mut node = Self { id, peers, log, state, tx, opts, role }; node.role.election_timeout = node.random_election_timeout(); + + // Apply any pending entries following restart. Unlike the Raft log, + // state machines writes are not flushed to durable storage, so a tail + // of writes may be lost if the OS crashes or restarts. + node.maybe_apply()?; Ok(node) } @@ -1125,7 +1130,7 @@ impl RawNode { #[cfg(test)] mod tests { use super::*; - use crate::encoding::Value as _; + use crate::encoding::{bincode, Key as _, Value as _}; use crate::raft::state::test::{self as teststate, KVCommand, KVResponse}; use crate::raft::Entry; use crate::storage; @@ -1185,6 +1190,10 @@ mod tests { /// Test helpers for Node. impl Node { + fn dismantle(self) -> (Log, Box) { + with_rawnode!(self, |n| (n.log, n.state)) + } + fn get_applied_index(&self) -> Index { with_rawnode!(ref self, |n| n.state.get_applied_index()) } @@ -1201,6 +1210,14 @@ mod tests { with_rawnode!(ref self, |n| n.log.get_term()) } + fn options(&self) -> Options { + with_rawnode!(ref self, |n| n.opts.clone()) + } + + fn peers(&self) -> HashSet { + with_rawnode!(ref self, |n| n.peers.clone()) + } + fn read(&self, command: Vec) -> crate::error::Result> { with_rawnode!(ref self, |n| n.state.read(command)) } @@ -1330,6 +1347,20 @@ mod tests { self.request(id, request, &mut output)?; } + // restart [commit_index=INDEX] [applied_index=INDEX] [ID...] + // Restarts the given nodes (or all nodes). They retain their + // log and state, unless applied_index is given (which reverts + // the state machine to the given index, or 0 if empty). + // commit_index may be given to regress the commit index (it + // is not flushed to durable storage). + "restart" => { + let mut args = command.consume_args(); + let applied_index = args.lookup_parse("applied_index")?; + let commit_index = args.lookup_parse("commit_index")?; + let ids = self.parse_ids_or_all(&args.rest())?; + self.restart(&ids, commit_index, applied_index, &mut output)?; + } + // stabilize [heartbeat=BOOL] [ID...] // Stabilizes the given nodes by repeatedly delivering messages // until no more messages are pending. If heartbeat is true, also @@ -1398,6 +1429,38 @@ mod tests { Self { next_request_id: 1, ..Default::default() } } + /// Creates a new empty node and inserts it. + fn add_node( + &mut self, + id: NodeID, + peers: HashSet, + opts: Options, + ) -> Result<(), Box> { + let log = Log::new(Box::new(storage::Memory::new()))?; + let state = teststate::KV::new(); + self.add_node_with(id, peers, log, state, opts) + } + + /// Creates a new node with the given log and state and inserts it. + fn add_node_with( + &mut self, + id: NodeID, + peers: HashSet, + log: Log, + state: Box, + opts: Options, + ) -> Result<(), Box> { + let (node_tx, node_rx) = crossbeam::channel::unbounded(); + let (applied_tx, applied_rx) = crossbeam::channel::unbounded(); + let state = teststate::Emit::new(state, applied_tx); + self.nodes.insert(id, Node::new(id, peers, log, state, node_tx, opts)?); + self.nodes_rx.insert(id, node_rx); + self.nodes_pending.insert(id, Vec::new()); + self.applied_rx.insert(id, applied_rx); + self.disconnected.insert(id, HashSet::new()); + Ok(()) + } + /// Transitions nodes to candidates and campaign in a new term. fn campaign(&mut self, ids: &[NodeID], output: &mut String) -> Result<(), Box> { let campaign = |node| match node { @@ -1434,20 +1497,9 @@ mod tests { self.ids = (1..=nodes).collect(); - for id in self.ids.iter().copied() { - let (node_tx, node_rx) = crossbeam::channel::unbounded(); - let (applied_tx, applied_rx) = crossbeam::channel::unbounded(); + for id in self.ids.clone() { let peers = self.ids.iter().copied().filter(|i| i != &id).collect(); - let log = Log::new(Box::new(storage::Memory::new()))?; - let state = teststate::Emit::new(teststate::KV::new(), applied_tx); - self.nodes.insert(id, Node::new(id, peers, log, state, node_tx, opts.clone())?); - - while applied_rx.try_recv().is_ok() {} // drain first apply - - self.nodes_rx.insert(id, node_rx); - self.nodes_pending.insert(id, Vec::new()); - self.applied_rx.insert(id, applied_rx); - self.disconnected.insert(id, HashSet::new()); + self.add_node(id, peers, opts.clone())?; } // Promote leader if requested. Suppress output. @@ -1461,6 +1513,11 @@ mod tests { self.stabilize(&self.ids.clone(), true, quiet)?; } + // Drain any initial applied entries. + for applied_rx in self.applied_rx.values_mut() { + while applied_rx.try_recv().is_ok() {} + } + // Output final cluster status. self.status(&self.ids, output) } @@ -1620,6 +1677,62 @@ mod tests { self.transition(id, |n| n.step(msg), output) } + /// Restarts the given nodes. If commit_index or applied_index are + /// given, the log commit index or state machine will regress. + fn restart( + &mut self, + ids: &[NodeID], + commit_index: Option, + applied_index: Option, + output: &mut String, + ) -> Result<(), Box> { + for id in ids.iter().copied() { + let node = self.nodes.remove(&id).ok_or(format!("unknown node {id}"))?; + let peers = node.peers(); + let opts = node.options(); + let (log, mut state) = node.dismantle(); + let mut log = Log::new(log.engine)?; // reset log + + // If requested, regress the commit index. + if let Some(commit_index) = commit_index { + if commit_index > log.get_commit_index().0 { + return Err(format!("commit_index={commit_index} beyond current").into()); + } + let commit_term = match log.get(commit_index)? { + Some(e) => e.term, + None if commit_index == 0 => 0, + None => return Err(format!("unknown commit_index={commit_index}").into()), + }; + log.engine.set( + &crate::raft::log::Key::CommitIndex.encode(), + bincode::serialize(&(commit_index, commit_term)), + )?; + // Reset the log again. + log = Log::new(log.engine)?; + } + + // If requested, wipe the state machine and reapply up to the + // requested applied index. + if let Some(applied_index) = applied_index { + if applied_index > log.get_commit_index().0 { + return Err(format!("applied_index={applied_index} beyond commit").into()); + } + state = teststate::KV::new(); + let mut scan = log.scan(..=applied_index); + while let Some(entry) = scan.next().transpose()? { + _ = state.apply(entry); // apply errors are returned to client + } + assert_eq!(state.get_applied_index(), applied_index, "wrong applied index"); + } + + // Add node, and run a noop transition to output applied entries. + self.add_node_with(id, peers, log, state, opts)?; + self.transition(id, Ok, output)?; + } + // Output restarted node status. + self.status(ids, output) + } + /// Stabilizes the given nodes by repeatedly delivering pending messages /// until no new messages are generated. If heartbeat is true, leaders /// then emit a heartbeat and restabilize again, e.g. to propagate the diff --git a/src/raft/testscripts/node/restart b/src/raft/testscripts/node/restart new file mode 100644 index 00000000..4a561c75 --- /dev/null +++ b/src/raft/testscripts/node/restart @@ -0,0 +1,122 @@ +# Restarting a cluster that's fully caught up retains the existing state and +# allows trivially electing a new leader. + +cluster nodes=3 leader=1 +--- +n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2} +n2@1 follower(n1) last=1@1 commit=1@1 applied=1 +n3@1 follower(n1) last=1@1 commit=1@1 applied=1 + +# Replicate a couple of writes. +(put 1 a=1) +(put 1 b=2) +(stabilize heartbeat=true) +--- +ok + +# Dump the current status, log, and state. +status +--- +n1@1 leader last=3@1 commit=3@1 applied=3 progress={2:3→4 3:3→4} +n2@1 follower(n1) last=3@1 commit=3@1 applied=3 +n3@1 follower(n1) last=3@1 commit=3@1 applied=3 + +log +--- +n1@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n1@1 entry 1@1 None +n1@1 entry 2@1 put a=1 +n1@1 entry 3@1 put b=2 +n2@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n2@1 entry 1@1 None +n2@1 entry 2@1 put a=1 +n2@1 entry 3@1 put b=2 +n3@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n3@1 entry 1@1 None +n3@1 entry 2@1 put a=1 +n3@1 entry 3@1 put b=2 + +state +--- +n1@1 applied=3 +n1@1 state a=1 +n1@1 state b=2 +n2@1 applied=3 +n2@1 state a=1 +n2@1 state b=2 +n3@1 applied=3 +n3@1 state a=1 +n3@1 state b=2 + +# Restart the nodes. They retain the same status, logs, and state. +restart +--- +n1@1 follower() last=3@1 commit=3@1 applied=3 +n2@1 follower() last=3@1 commit=3@1 applied=3 +n3@1 follower() last=3@1 commit=3@1 applied=3 + +log +--- +n1@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n1@1 entry 1@1 None +n1@1 entry 2@1 put a=1 +n1@1 entry 3@1 put b=2 +n2@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n2@1 entry 1@1 None +n2@1 entry 2@1 put a=1 +n2@1 entry 3@1 put b=2 +n3@1 term=1 last=3@1 commit=3@1 vote=Some(1) +n3@1 entry 1@1 None +n3@1 entry 2@1 put a=1 +n3@1 entry 3@1 put b=2 + +state +--- +n1@1 applied=3 +n1@1 state a=1 +n1@1 state b=2 +n2@1 applied=3 +n2@1 state a=1 +n2@1 state b=2 +n3@1 applied=3 +n3@1 state a=1 +n3@1 state b=2 + +# Elect a new leader. +campaign 3 +stabilize heartbeat=true +--- +n3@1 follower() ⇨ n3@2 candidate +n3@2 → n1 Campaign last=3@1 +n3@2 → n2 Campaign last=3@1 +n1@1 follower() ⇨ n1@2 follower() +n1@2 → n3 CampaignResponse vote=true +n2@1 follower() ⇨ n2@2 follower() +n2@2 → n3 CampaignResponse vote=true +n3@2 candidate ⇨ n3@2 leader +n3@2 append 4@2 None +n3@2 → n1 Append base=3@1 [4@2] +n3@2 → n2 Append base=3@1 [4@2] +n3@2 → n1 Heartbeat last_index=4 commit_index=3 read_seq=0 +n3@2 → n2 Heartbeat last_index=4 commit_index=3 read_seq=0 +n1@2 follower() ⇨ n1@2 follower(n3) +n1@2 append 4@2 None +n1@2 → n3 AppendResponse match_index=4 +n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n2@2 follower() ⇨ n2@2 follower(n3) +n2@2 append 4@2 None +n2@2 → n3 AppendResponse match_index=4 +n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n3@2 commit 4@2 +n3@2 → n1 Heartbeat last_index=4 commit_index=4 read_seq=0 +n3@2 → n2 Heartbeat last_index=4 commit_index=4 read_seq=0 +n1@2 commit 4@2 +n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n2@2 commit 4@2 +n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0 + +status +--- +n1@2 follower(n3) last=4@2 commit=4@2 applied=4 +n2@2 follower(n3) last=4@2 commit=4@2 applied=4 +n3@2 leader last=4@2 commit=4@2 applied=4 progress={1:4→5 2:4→5} diff --git a/src/raft/testscripts/node/restart_apply b/src/raft/testscripts/node/restart_apply new file mode 100644 index 00000000..fb9d943c --- /dev/null +++ b/src/raft/testscripts/node/restart_apply @@ -0,0 +1,41 @@ +# Restarting a node and wiping its state machine will reapply the state. + +cluster nodes=3 leader=1 +--- +n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2} +n2@1 follower(n1) last=1@1 commit=1@1 applied=1 +n3@1 follower(n1) last=1@1 commit=1@1 applied=1 + +# Replicate a couple of writes. +(put 1 a=1) +(put 1 b=2) +(stabilize heartbeat=true) +--- +ok + +# Restart n3 and clear its state machine. The node will apply all pending +# entries when restarting. +restart 3 applied_index=0 +--- +n3@1 apply 1@1 None +n3@1 apply 2@1 put a=1 +n3@1 apply 3@1 put b=2 +n3@1 follower() last=3@1 commit=3@1 applied=3 + +state 3 +--- +n3@1 applied=3 +n3@1 state a=1 +n3@1 state b=2 + +# Restart n3 and lose the last write. It will also be reapplied. +restart 3 applied_index=2 +--- +n3@1 apply 3@1 put b=2 +n3@1 follower() last=3@1 commit=3@1 applied=3 + +state 3 +--- +n3@1 applied=3 +n3@1 state a=1 +n3@1 state b=2 diff --git a/src/raft/testscripts/node/restart_commit_recover b/src/raft/testscripts/node/restart_commit_recover new file mode 100644 index 00000000..7f7d2251 --- /dev/null +++ b/src/raft/testscripts/node/restart_commit_recover @@ -0,0 +1,69 @@ +# Restarting the cluster and wiping the commit indexes allows +# a new leader to recover the commit index. + +cluster nodes=3 leader=1 +--- +n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2} +n2@1 follower(n1) last=1@1 commit=1@1 applied=1 +n3@1 follower(n1) last=1@1 commit=1@1 applied=1 + +# Replicate a couple of writes, but don't propagate the commit index. +(put 1 a=1) +(put 1 b=2) +(stabilize) +status +--- +n1@1 leader last=3@1 commit=3@1 applied=3 progress={2:3→4 3:3→4} +n2@1 follower(n1) last=3@1 commit=1@1 applied=1 +n3@1 follower(n1) last=3@1 commit=1@1 applied=1 + +# Restart all nodes and wipe the commit index. +restart commit_index=0 +--- +n1@1 follower() last=3@1 commit=0@0 applied=3 +n2@1 follower() last=3@1 commit=0@0 applied=1 +n3@1 follower() last=3@1 commit=0@0 applied=1 + +# n3 campaigns for leadership and recovers the commit index. +campaign 3 +stabilize +--- +n3@1 follower() ⇨ n3@2 candidate +n3@2 → n1 Campaign last=3@1 +n3@2 → n2 Campaign last=3@1 +n1@1 follower() ⇨ n1@2 follower() +n1@2 → n3 CampaignResponse vote=true +n2@1 follower() ⇨ n2@2 follower() +n2@2 → n3 CampaignResponse vote=true +n3@2 candidate ⇨ n3@2 leader +n3@2 append 4@2 None +n3@2 → n1 Append base=3@1 [4@2] +n3@2 → n2 Append base=3@1 [4@2] +n3@2 → n1 Heartbeat last_index=4 commit_index=0 read_seq=0 +n3@2 → n2 Heartbeat last_index=4 commit_index=0 read_seq=0 +n1@2 follower() ⇨ n1@2 follower(n3) +n1@2 append 4@2 None +n1@2 → n3 AppendResponse match_index=4 +n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n2@2 follower() ⇨ n2@2 follower(n3) +n2@2 append 4@2 None +n2@2 → n3 AppendResponse match_index=4 +n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n3@2 commit 4@2 + +status +--- +n1@2 follower(n3) last=4@2 commit=0@0 applied=3 +n2@2 follower(n3) last=4@2 commit=0@0 applied=1 +n3@2 leader last=4@2 commit=4@2 applied=4 progress={1:4→5 2:4→5} + +# A heartbeat propagates the commit index. +heartbeat 3 +stabilize +--- +n3@2 → n1 Heartbeat last_index=4 commit_index=4 read_seq=0 +n3@2 → n2 Heartbeat last_index=4 commit_index=4 read_seq=0 +n1@2 commit 4@2 +n1@2 → n3 HeartbeatResponse match_index=4 read_seq=0 +n2@2 commit 4@2 +n2@2 → n3 HeartbeatResponse match_index=4 read_seq=0 diff --git a/src/raft/testscripts/node/restart_term_vote b/src/raft/testscripts/node/restart_term_vote new file mode 100644 index 00000000..9e95fd64 --- /dev/null +++ b/src/raft/testscripts/node/restart_term_vote @@ -0,0 +1,48 @@ +# The term/vote is retained across a restart. + +cluster nodes=3 +--- +n1@0 follower() last=0@0 commit=0@0 applied=0 +n2@0 follower() last=0@0 commit=0@0 applied=0 +n3@0 follower() last=0@0 commit=0@0 applied=0 + +# Start a new election on n1. +campaign 1 +--- +n1@0 follower() ⇨ n1@1 candidate +n1@1 → n2 Campaign last=0@0 +n1@1 → n3 Campaign last=0@0 + +# n3 votes for n1, and then restarts. +deliver 3 +--- +n3@0 follower() ⇨ n3@1 follower() +n3@1 → n1 CampaignResponse vote=true + +restart 3 +--- +n3@1 follower() last=0@0 commit=0@0 applied=0 + +# n3 still has a record of the term and vote in the log. +log 3 +--- +n3@1 term=1 last=0@0 commit=0@0 vote=Some(1) + +# n2 also campaigns. n3 does not grant its vote. +campaign 2 +--- +n2@0 follower() ⇨ n2@1 candidate +n2@1 → n1 Campaign last=0@0 +n2@1 → n3 Campaign last=0@0 + +deliver 3 +--- +n3@1 → n2 CampaignResponse vote=false + +# n1 wins leadership. +(stabilize) +status +--- +n1@1 leader last=1@1 commit=1@1 applied=1 progress={2:1→2 3:1→2} +n2@1 follower(n1) last=1@1 commit=0@0 applied=0 +n3@1 follower(n1) last=1@1 commit=0@0 applied=0