Skip to content

Commit

Permalink
raft: add test helper state machines
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent 16dcc86 commit bbe8a69
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 118 deletions.
125 changes: 12 additions & 113 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1222,16 +1222,16 @@ fn quorum_value<T: Ord + Copy>(mut values: Vec<T>) -> T {
#[cfg(test)]
mod tests {
use super::*;
use crate::encoding::{self, bincode, Value as _};
use crate::encoding::{bincode, Value as _};
use crate::raft::state::test::{self as teststate, KVCommand, KVResponse};
use crate::raft::{
Entry, Request, RequestID, Response, ELECTION_TIMEOUT_RANGE, HEARTBEAT_INTERVAL,
MAX_APPEND_ENTRIES,
};
use crossbeam::channel::{Receiver, Sender};
use crossbeam::channel::Receiver;
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::result::Result;
use test_each_file::test_each_path;
Expand Down Expand Up @@ -1270,7 +1270,7 @@ mod tests {
nodes_rx: HashMap<NodeID, Receiver<Envelope>>,
/// Inbound receive queues to each node, to be stepped.
nodes_pending: HashMap<NodeID, Vec<Envelope>>,
/// Applied log entries for each node, after TestState application.
/// Applied log entries for each node, after state machine application.
applied_rx: HashMap<NodeID, Receiver<Entry>>,
/// Network partitions, sender → receivers.
disconnected: HashMap<NodeID, HashSet<NodeID>>,
Expand Down Expand Up @@ -1339,7 +1339,7 @@ mod tests {
let id = args.next_pos().ok_or("must specify node ID")?.parse()?;
let key = args.next_pos().ok_or("must specify key")?.value.clone();
args.reject_rest()?;
let request = Request::Read(TestCommand::Get { key }.encode()?);
let request = Request::Read(KVCommand::Get { key }.encode()?);
self.request(id, request, &mut output)?;
}

Expand Down Expand Up @@ -1387,7 +1387,7 @@ mod tests {
let kv = args.next_key().ok_or("must specify key/value pair")?.clone();
let (key, value) = (kv.key.unwrap(), kv.value);
args.reject_rest()?;
let request = Request::Write(TestCommand::Put { key, value }.encode()?);
let request = Request::Write(KVCommand::Put { key, value }.encode()?);
self.request(id, request, &mut output)?;
}

Expand Down Expand Up @@ -1504,7 +1504,7 @@ mod tests {
let (applied_tx, applied_rx) = crossbeam::channel::unbounded();
let peers = self.ids.iter().copied().filter(|i| *i != id).collect();
let log = Log::new(crate::storage::Memory::new())?;
let state = Box::new(TestState::new(applied_tx));
let state = teststate::Emit::new(teststate::KV::new(), applied_tx);
let opts = Options {
heartbeat_interval,
election_timeout_range: election_timeout..election_timeout + 1,
Expand Down Expand Up @@ -1740,7 +1740,7 @@ mod tests {
let nodefmt = Self::format_node(node);
output.push_str(&format!("{nodefmt} applied={applied_index}\n"));

let raw = state.read(TestCommand::Scan.encode()?)?;
let raw = state.read(KVCommand::Scan.encode()?)?;
let kvs: Vec<(String, String)> = bincode::deserialize(&raw)?;
for (key, value) in kvs {
output.push_str(&format!("{nodefmt} state {key}={value}\n"));
Expand Down Expand Up @@ -1962,7 +1962,7 @@ mod tests {
/// Formats an entry.
fn format_entry(entry: &Entry) -> String {
let command = match entry.command.as_ref() {
Some(raw) => TestCommand::decode(raw).expect("invalid command").to_string(),
Some(raw) => KVCommand::decode(raw).expect("invalid command").to_string(),
None => "None".to_string(),
};
format!("{}@{} {command}", entry.index, entry.term)
Expand Down Expand Up @@ -2046,7 +2046,7 @@ mod tests {
/// Formats a request.
fn format_request(request: &Request) -> String {
match request {
Request::Read(c) | Request::Write(c) => TestCommand::decode(c).unwrap().to_string(),
Request::Read(c) | Request::Write(c) => KVCommand::decode(c).unwrap().to_string(),
Request::Status => "status".to_string(),
}
}
Expand All @@ -2055,7 +2055,7 @@ mod tests {
fn format_response(response: &crate::error::Result<Response>) -> String {
match response {
Ok(Response::Read(r) | Response::Write(r)) => {
TestResponse::decode(r).unwrap().to_string()
KVResponse::decode(r).unwrap().to_string()
}
Ok(Response::Status(status)) => format!("{status:#?}"),
Err(e) => format!("Error::{e:?} ({e})"),
Expand Down Expand Up @@ -2094,105 +2094,4 @@ mod tests {
}
}
}

/// A test state machine which stores key/value pairs. See TestCommand.
struct TestState {
/// The current applied index.
applied_index: Index,
/// The stored data.
data: BTreeMap<String, String>,
/// Sends applied entries, for output.
applied_tx: Sender<Entry>,
}

impl TestState {
fn new(applied_tx: Sender<Entry>) -> Self {
Self { applied_index: 0, data: BTreeMap::new(), applied_tx }
}
}

impl State for TestState {
fn get_applied_index(&self) -> Index {
self.applied_index
}

fn apply(&mut self, entry: Entry) -> crate::error::Result<Vec<u8>> {
let response = entry
.command
.as_deref()
.map(TestCommand::decode)
.transpose()?
.map(|c| match c {
TestCommand::Put { key, value } => {
self.data.insert(key, value);
TestResponse::Put(entry.index)
}
TestCommand::Get { .. } => panic!("get submitted as write command"),
TestCommand::Scan => panic!("scan submitted as write command"),
})
.map_or(Ok(Vec::new()), |r| r.encode())?;
self.applied_index = entry.index;
self.applied_tx.send(entry)?;
Ok(response)
}

fn read(&self, command: Vec<u8>) -> crate::error::Result<Vec<u8>> {
let response = match TestCommand::decode(&command)? {
TestCommand::Get { key } => TestResponse::Get(self.data.get(&key).cloned()),
TestCommand::Scan => TestResponse::Scan(self.data.clone()),
TestCommand::Put { .. } => panic!("put submitted as read command"),
};
response.encode()
}
}

/// A TestState command. Each command returns a corresponding TestResponse.
#[derive(Serialize, Deserialize)]
enum TestCommand {
/// Fetches the value of the given key.
Get { key: String },
/// Stores the given key/value pair, returning the applied index.
Put { key: String, value: String },
/// Returns all key/value pairs.
Scan,
}

impl encoding::Value for TestCommand {}

impl std::fmt::Display for TestCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Get { key } => write!(f, "get {key}"),
Self::Put { key, value } => write!(f, "put {key}={value}"),
Self::Scan => write!(f, "scan"),
}
}
}

/// A TestCommand response.
#[derive(Serialize, Deserialize)]
enum TestResponse {
/// The value for the TestCommand::Get key, or None if it does not exist.
Get(Option<String>),
/// The applied index of a TestCommand::Put command.
Put(Index),
/// The scanned key/value pairs.
Scan(BTreeMap<String, String>),
}

impl encoding::Value for TestResponse {}

impl std::fmt::Display for TestResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Get(Some(value)) => write!(f, "{value}")?,
Self::Get(None) => write!(f, "None")?,
Self::Put(applied_index) => write!(f, "{applied_index}")?,
Self::Scan(scan) => {
write!(f, "{}", scan.iter().map(|(k, v)| format!("{k}={v}")).join(","))?
}
};
Ok(())
}
}
}
167 changes: 162 additions & 5 deletions src/raft/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use crate::error::Result;
/// Otherwise, the replicas will diverge, and different replicas will produce
/// different results.
///
/// Write commands ([`Request::Write`]) are replicated and applied on all
/// replicas via [`State::apply`]. The state machine must keep track of the last
/// applied index and return it via [`State::get_applied_index`]. Read commands
/// ([`Request::Read`]) are only executed on a single replica via
/// [`State::read`] and must not make any state changes.
/// Write commands (`Request::Write`) are replicated and applied on all replicas
/// via `State::apply`. The state machine must keep track of the last applied
/// index and return it via `State::get_applied_index`. Read commands
/// (`Request::Read`) are only executed on a single replica via `State::read`
/// and must not make any state changes.
pub trait State: Send {
/// Returns the last applied index from the state machine.
///
Expand Down Expand Up @@ -49,3 +49,160 @@ pub trait State: Send {
/// any state changes (i.e. it must not write).
fn read(&self, command: Vec<u8>) -> Result<Vec<u8>>;
}

/// Test helper state machines.
#[allow(dead_code)]
#[cfg(test)]
pub mod test {
use super::*;
use crate::encoding::{self, Value as _};
use crossbeam::channel::Sender;
use itertools::Itertools as _;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// Wraps a state machine and emits applied entries to the provided channel.
pub struct Emit {
inner: Box<dyn State>,
tx: Sender<Entry>,
}

impl Emit {
pub fn new(inner: Box<dyn State>, tx: Sender<Entry>) -> Box<Self> {
Box::new(Self { inner, tx })
}
}

impl State for Emit {
fn get_applied_index(&self) -> Index {
self.inner.get_applied_index()
}

fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
let response = self.inner.apply(entry.clone())?;
self.tx.send(entry)?;
Ok(response)
}

fn read(&self, command: Vec<u8>) -> Result<Vec<u8>> {
self.inner.read(command)
}
}

/// A simple string key/value store. Takes KVCommands.
pub struct KV {
applied_index: Index,
data: BTreeMap<String, String>,
}

impl KV {
pub fn new() -> Box<Self> {
Box::new(Self { applied_index: 0, data: BTreeMap::new() })
}
}

impl State for KV {
fn get_applied_index(&self) -> Index {
self.applied_index
}

fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
let command = entry.command.as_deref().map(KVCommand::decode).transpose()?;
let response = match command {
Some(KVCommand::Put { key, value }) => {
self.data.insert(key, value);
KVResponse::Put(entry.index).encode()?
}
Some(c @ (KVCommand::Get { .. } | KVCommand::Scan)) => {
panic!("{c} submitted as write command")
}
None => Vec::new(),
};
self.applied_index = entry.index;
Ok(response)
}

fn read(&self, command: Vec<u8>) -> Result<Vec<u8>> {
match KVCommand::decode(&command)? {
KVCommand::Get { key } => KVResponse::Get(self.data.get(&key).cloned()).encode(),
KVCommand::Scan => KVResponse::Scan(self.data.clone()).encode(),
c @ KVCommand::Put { .. } => panic!("{c} submitted as read command"),
}
}
}

/// A KV command. Returns the corresponding KVResponse.
#[derive(Serialize, Deserialize)]
pub enum KVCommand {
/// Fetches the value of the given key.
Get { key: String },
/// Stores the given key/value pair, returning the applied index.
Put { key: String, value: String },
/// Returns all key/value pairs.
Scan,
}

impl encoding::Value for KVCommand {}

impl std::fmt::Display for KVCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Get { key } => write!(f, "get {key}"),
Self::Put { key, value } => write!(f, "put {key}={value}"),
Self::Scan => write!(f, "scan"),
}
}
}

/// A KVCommand response.
#[derive(Serialize, Deserialize)]
pub enum KVResponse {
/// Get returns the key's value, or None if it does not exist.
Get(Option<String>),
/// Put returns the applied index of the command.
Put(Index),
/// Scan returns the key/value pairs.
Scan(BTreeMap<String, String>),
}

impl encoding::Value for KVResponse {}

impl std::fmt::Display for KVResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Get(Some(value)) => write!(f, "{value}"),
Self::Get(None) => write!(f, "None"),
Self::Put(applied_index) => write!(f, "{applied_index}"),
Self::Scan(kvs) => {
write!(f, "{}", kvs.iter().map(|(k, v)| format!("{k}={v}")).join(","))
}
}
}
}

/// A state machine which does nothing. All commands are ignored.
pub struct Noop {
applied_index: Index,
}

impl Noop {
pub fn new() -> Box<Self> {
Box::new(Self { applied_index: 0 })
}
}

impl State for Noop {
fn get_applied_index(&self) -> Index {
self.applied_index
}

fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
self.applied_index = entry.index;
Ok(Vec::new())
}

fn read(&self, _: Vec<u8>) -> Result<Vec<u8>> {
Ok(Vec::new())
}
}
}

0 comments on commit bbe8a69

Please sign in to comment.