diff --git a/src/raft/node.rs b/src/raft/node.rs index 61c90f6cb..e2a9d9df3 100644 --- a/src/raft/node.rs +++ b/src/raft/node.rs @@ -1222,16 +1222,16 @@ fn quorum_value(mut values: Vec) -> T { #[cfg(test)] mod tests { use super::*; - use crate::encoding::{self, bincode, Value as _}; + use crate::encoding::{bincode, Value as _}; + use crate::raft::state::test::{self as teststate, KVCommand, KVResponse}; use crate::raft::{ Entry, Request, RequestID, Response, ELECTION_TIMEOUT_RANGE, HEARTBEAT_INTERVAL, MAX_APPEND_ENTRIES, }; - use crossbeam::channel::{Receiver, Sender}; + use crossbeam::channel::Receiver; use pretty_assertions::assert_eq; - use serde::{Deserialize, Serialize}; use std::borrow::Borrow; - use std::collections::{BTreeMap, HashMap, HashSet}; + use std::collections::{HashMap, HashSet}; use std::error::Error; use std::result::Result; use test_each_file::test_each_path; @@ -1270,7 +1270,7 @@ mod tests { nodes_rx: HashMap>, /// Inbound receive queues to each node, to be stepped. nodes_pending: HashMap>, - /// Applied log entries for each node, after TestState application. + /// Applied log entries for each node, after state machine application. applied_rx: HashMap>, /// Network partitions, sender → receivers. disconnected: HashMap>, @@ -1339,7 +1339,7 @@ mod tests { let id = args.next_pos().ok_or("must specify node ID")?.parse()?; let key = args.next_pos().ok_or("must specify key")?.value.clone(); args.reject_rest()?; - let request = Request::Read(TestCommand::Get { key }.encode()?); + let request = Request::Read(KVCommand::Get { key }.encode()?); self.request(id, request, &mut output)?; } @@ -1387,7 +1387,7 @@ mod tests { let kv = args.next_key().ok_or("must specify key/value pair")?.clone(); let (key, value) = (kv.key.unwrap(), kv.value); args.reject_rest()?; - let request = Request::Write(TestCommand::Put { key, value }.encode()?); + let request = Request::Write(KVCommand::Put { key, value }.encode()?); self.request(id, request, &mut output)?; } @@ -1504,7 +1504,7 @@ mod tests { let (applied_tx, applied_rx) = crossbeam::channel::unbounded(); let peers = self.ids.iter().copied().filter(|i| *i != id).collect(); let log = Log::new(crate::storage::Memory::new())?; - let state = Box::new(TestState::new(applied_tx)); + let state = teststate::Emit::new(teststate::KV::new(), applied_tx); let opts = Options { heartbeat_interval, election_timeout_range: election_timeout..election_timeout + 1, @@ -1740,7 +1740,7 @@ mod tests { let nodefmt = Self::format_node(node); output.push_str(&format!("{nodefmt} applied={applied_index}\n")); - let raw = state.read(TestCommand::Scan.encode()?)?; + let raw = state.read(KVCommand::Scan.encode()?)?; let kvs: Vec<(String, String)> = bincode::deserialize(&raw)?; for (key, value) in kvs { output.push_str(&format!("{nodefmt} state {key}={value}\n")); @@ -1962,7 +1962,7 @@ mod tests { /// Formats an entry. fn format_entry(entry: &Entry) -> String { let command = match entry.command.as_ref() { - Some(raw) => TestCommand::decode(raw).expect("invalid command").to_string(), + Some(raw) => KVCommand::decode(raw).expect("invalid command").to_string(), None => "None".to_string(), }; format!("{}@{} {command}", entry.index, entry.term) @@ -2046,7 +2046,7 @@ mod tests { /// Formats a request. fn format_request(request: &Request) -> String { match request { - Request::Read(c) | Request::Write(c) => TestCommand::decode(c).unwrap().to_string(), + Request::Read(c) | Request::Write(c) => KVCommand::decode(c).unwrap().to_string(), Request::Status => "status".to_string(), } } @@ -2055,7 +2055,7 @@ mod tests { fn format_response(response: &crate::error::Result) -> String { match response { Ok(Response::Read(r) | Response::Write(r)) => { - TestResponse::decode(r).unwrap().to_string() + KVResponse::decode(r).unwrap().to_string() } Ok(Response::Status(status)) => format!("{status:#?}"), Err(e) => format!("Error::{e:?} ({e})"), @@ -2094,105 +2094,4 @@ mod tests { } } } - - /// A test state machine which stores key/value pairs. See TestCommand. - struct TestState { - /// The current applied index. - applied_index: Index, - /// The stored data. - data: BTreeMap, - /// Sends applied entries, for output. - applied_tx: Sender, - } - - impl TestState { - fn new(applied_tx: Sender) -> Self { - Self { applied_index: 0, data: BTreeMap::new(), applied_tx } - } - } - - impl State for TestState { - fn get_applied_index(&self) -> Index { - self.applied_index - } - - fn apply(&mut self, entry: Entry) -> crate::error::Result> { - let response = entry - .command - .as_deref() - .map(TestCommand::decode) - .transpose()? - .map(|c| match c { - TestCommand::Put { key, value } => { - self.data.insert(key, value); - TestResponse::Put(entry.index) - } - TestCommand::Get { .. } => panic!("get submitted as write command"), - TestCommand::Scan => panic!("scan submitted as write command"), - }) - .map_or(Ok(Vec::new()), |r| r.encode())?; - self.applied_index = entry.index; - self.applied_tx.send(entry)?; - Ok(response) - } - - fn read(&self, command: Vec) -> crate::error::Result> { - let response = match TestCommand::decode(&command)? { - TestCommand::Get { key } => TestResponse::Get(self.data.get(&key).cloned()), - TestCommand::Scan => TestResponse::Scan(self.data.clone()), - TestCommand::Put { .. } => panic!("put submitted as read command"), - }; - response.encode() - } - } - - /// A TestState command. Each command returns a corresponding TestResponse. - #[derive(Serialize, Deserialize)] - enum TestCommand { - /// Fetches the value of the given key. - Get { key: String }, - /// Stores the given key/value pair, returning the applied index. - Put { key: String, value: String }, - /// Returns all key/value pairs. - Scan, - } - - impl encoding::Value for TestCommand {} - - impl std::fmt::Display for TestCommand { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Get { key } => write!(f, "get {key}"), - Self::Put { key, value } => write!(f, "put {key}={value}"), - Self::Scan => write!(f, "scan"), - } - } - } - - /// A TestCommand response. - #[derive(Serialize, Deserialize)] - enum TestResponse { - /// The value for the TestCommand::Get key, or None if it does not exist. - Get(Option), - /// The applied index of a TestCommand::Put command. - Put(Index), - /// The scanned key/value pairs. - Scan(BTreeMap), - } - - impl encoding::Value for TestResponse {} - - impl std::fmt::Display for TestResponse { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::Get(Some(value)) => write!(f, "{value}")?, - Self::Get(None) => write!(f, "None")?, - Self::Put(applied_index) => write!(f, "{applied_index}")?, - Self::Scan(scan) => { - write!(f, "{}", scan.iter().map(|(k, v)| format!("{k}={v}")).join(","))? - } - }; - Ok(()) - } - } } diff --git a/src/raft/state.rs b/src/raft/state.rs index 9736c88e1..b79abb973 100644 --- a/src/raft/state.rs +++ b/src/raft/state.rs @@ -11,11 +11,11 @@ use crate::error::Result; /// Otherwise, the replicas will diverge, and different replicas will produce /// different results. /// -/// Write commands ([`Request::Write`]) are replicated and applied on all -/// replicas via [`State::apply`]. The state machine must keep track of the last -/// applied index and return it via [`State::get_applied_index`]. Read commands -/// ([`Request::Read`]) are only executed on a single replica via -/// [`State::read`] and must not make any state changes. +/// Write commands (`Request::Write`) are replicated and applied on all replicas +/// via `State::apply`. The state machine must keep track of the last applied +/// index and return it via `State::get_applied_index`. Read commands +/// (`Request::Read`) are only executed on a single replica via `State::read` +/// and must not make any state changes. pub trait State: Send { /// Returns the last applied index from the state machine. /// @@ -49,3 +49,160 @@ pub trait State: Send { /// any state changes (i.e. it must not write). fn read(&self, command: Vec) -> Result>; } + +/// Test helper state machines. +#[allow(dead_code)] +#[cfg(test)] +pub mod test { + use super::*; + use crate::encoding::{self, Value as _}; + use crossbeam::channel::Sender; + use itertools::Itertools as _; + use serde::{Deserialize, Serialize}; + use std::collections::BTreeMap; + + /// Wraps a state machine and emits applied entries to the provided channel. + pub struct Emit { + inner: Box, + tx: Sender, + } + + impl Emit { + pub fn new(inner: Box, tx: Sender) -> Box { + Box::new(Self { inner, tx }) + } + } + + impl State for Emit { + fn get_applied_index(&self) -> Index { + self.inner.get_applied_index() + } + + fn apply(&mut self, entry: Entry) -> Result> { + let response = self.inner.apply(entry.clone())?; + self.tx.send(entry)?; + Ok(response) + } + + fn read(&self, command: Vec) -> Result> { + self.inner.read(command) + } + } + + /// A simple string key/value store. Takes KVCommands. + pub struct KV { + applied_index: Index, + data: BTreeMap, + } + + impl KV { + pub fn new() -> Box { + Box::new(Self { applied_index: 0, data: BTreeMap::new() }) + } + } + + impl State for KV { + fn get_applied_index(&self) -> Index { + self.applied_index + } + + fn apply(&mut self, entry: Entry) -> Result> { + let command = entry.command.as_deref().map(KVCommand::decode).transpose()?; + let response = match command { + Some(KVCommand::Put { key, value }) => { + self.data.insert(key, value); + KVResponse::Put(entry.index).encode()? + } + Some(c @ (KVCommand::Get { .. } | KVCommand::Scan)) => { + panic!("{c} submitted as write command") + } + None => Vec::new(), + }; + self.applied_index = entry.index; + Ok(response) + } + + fn read(&self, command: Vec) -> Result> { + match KVCommand::decode(&command)? { + KVCommand::Get { key } => KVResponse::Get(self.data.get(&key).cloned()).encode(), + KVCommand::Scan => KVResponse::Scan(self.data.clone()).encode(), + c @ KVCommand::Put { .. } => panic!("{c} submitted as read command"), + } + } + } + + /// A KV command. Returns the corresponding KVResponse. + #[derive(Serialize, Deserialize)] + pub enum KVCommand { + /// Fetches the value of the given key. + Get { key: String }, + /// Stores the given key/value pair, returning the applied index. + Put { key: String, value: String }, + /// Returns all key/value pairs. + Scan, + } + + impl encoding::Value for KVCommand {} + + impl std::fmt::Display for KVCommand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Get { key } => write!(f, "get {key}"), + Self::Put { key, value } => write!(f, "put {key}={value}"), + Self::Scan => write!(f, "scan"), + } + } + } + + /// A KVCommand response. + #[derive(Serialize, Deserialize)] + pub enum KVResponse { + /// Get returns the key's value, or None if it does not exist. + Get(Option), + /// Put returns the applied index of the command. + Put(Index), + /// Scan returns the key/value pairs. + Scan(BTreeMap), + } + + impl encoding::Value for KVResponse {} + + impl std::fmt::Display for KVResponse { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Get(Some(value)) => write!(f, "{value}"), + Self::Get(None) => write!(f, "None"), + Self::Put(applied_index) => write!(f, "{applied_index}"), + Self::Scan(kvs) => { + write!(f, "{}", kvs.iter().map(|(k, v)| format!("{k}={v}")).join(",")) + } + } + } + } + + /// A state machine which does nothing. All commands are ignored. + pub struct Noop { + applied_index: Index, + } + + impl Noop { + pub fn new() -> Box { + Box::new(Self { applied_index: 0 }) + } + } + + impl State for Noop { + fn get_applied_index(&self) -> Index { + self.applied_index + } + + fn apply(&mut self, entry: Entry) -> Result> { + self.applied_index = entry.index; + Ok(Vec::new()) + } + + fn read(&self, _: Vec) -> Result> { + Ok(Vec::new()) + } + } +}