Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 9, 2024
1 parent b996472 commit c821dcf
Showing 1 changed file with 54 additions and 51 deletions.
105 changes: 54 additions & 51 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use super::{
Envelope, Index, Log, Message, ReadSequence, Request, RequestID, Response, State, Status,
};
use super::log::{Index, Log};
use super::message::{Envelope, Message, ReadSequence, Request, RequestID, Response, Status};
use super::state::State;
use crate::error::{Error, Result};

use crossbeam::channel::Sender;
use itertools::Itertools as _;
use log::{debug, info};
use rand::Rng as _;
use std::collections::{HashMap, HashSet, VecDeque};

/// A node ID.
/// A node ID. Unique within a cluster. Assigned manually when started.
pub type NodeID = u8;

/// A leader term.
/// A leader term number. Increases monotonically.
pub type Term = u64;

/// A logical clock interval as number of ticks.
pub type Ticks = u8;

/// Raft node options.
#[derive(Clone, Debug, PartialEq)]
pub struct Options {
/// The number of ticks between leader heartbeats.
pub heartbeat_interval: Ticks,
Expand All @@ -40,31 +42,38 @@ impl Default for Options {
/// A Raft node, with a dynamic role. The node is driven synchronously by
/// processing inbound messages via step() or by advancing time via tick().
/// These methods consume the current node, and return a new one with a possibly
/// different role. Outbound messages are sent via the given node_tx channel.
/// different role. Outbound messages are sent via the given node_tx channel,
/// and must be delivered to the remote peers.
///
/// This enum wraps the RawNode<Role> types, which implement the actual
/// node logic. It exists for ergonomic use across role transitions, i.e
/// node = node.step()?.
/// This enum is the public interface to the node, with a closed set of roles.
/// It wraps the RawNode<Role> types, which implement the actual internal
/// node logic. The enum also allows ergonomic use across role transitions
/// since it can represent all roles, e.g.: `node = node.step()?`.
pub enum Node {
/// A candidate campaigns for leadership.
Candidate(RawNode<Candidate>),
/// A follower replicates entries from a leader.
Follower(RawNode<Follower>),
/// A leader processes client requests and replicates entries to followers.
Leader(RawNode<Leader>),
}

impl Node {
/// Creates a new Raft node, starting as a leaderless follower, or leader if
/// there are no peers.
/// Creates a new Raft node. It starts as a leaderless follower, waiting to
/// hear from a leader or otherwise transitioning to candidate and
/// campaigning for leadership. In the case of a single-node cluster (no
/// peers), the node immediately transitions to leader when created.
pub fn new(
id: NodeID,
peers: HashSet<NodeID>,
log: Log,
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
node_tx: Sender<Envelope>,
opts: Options,
) -> Result<Self> {
let node = RawNode::new(id, peers, log, state, node_tx, opts)?;
if node.peers.is_empty() {
// If there are no peers, become leader immediately.
// If this is a single-node cluster, become leader immediately.
if node.cluster_size() == 1 {
return Ok(node.into_candidate()?.into_leader()?.into());
}
Ok(node.into())
Expand All @@ -90,15 +99,15 @@ impl Node {

/// Processes a message from a peer.
pub fn step(self, msg: Envelope) -> Result<Self> {
debug!("Stepping {:?}", msg);
debug!("Stepping {msg:?}");
match self {
Node::Candidate(n) => n.step(msg),
Node::Follower(n) => n.step(msg),
Node::Leader(n) => n.step(msg),
}
}

/// Moves time forward by a tick.
/// Advances time by a tick.
pub fn tick(self) -> Result<Self> {
match self {
Node::Candidate(n) => n.tick(),
Expand Down Expand Up @@ -126,20 +135,29 @@ impl From<RawNode<Leader>> for Node {
}
}

/// A Raft role: leader, follower, or candidate.
/// Marker trait for a Raft role: leader, follower, or candidate.
pub trait Role {}

/// A Raft node with the concrete role R.
/// A Raft node with role R.
///
/// This implements the typestate pattern, where individual node states (roles)
/// are encoded as RawNode<Role>. See: https://cliffle.com/blog/rust-typestate/
pub struct RawNode<R: Role = Follower> {
/// The node ID. Must be unique in this cluster.
id: NodeID,
/// The IDs of the other nodes in the cluster. Does not change while
/// running. Can change on restart, but all nodes must have the same node
/// set to avoid multiple leaders (i.e. split brain).
peers: HashSet<NodeID>,
/// The Raft log, containing client commands to be executed.
log: Log,
/// The Raft state machine, on which client commands are executed.
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
/// Channel for sending outbound messages to other nodes.
node_tx: Sender<Envelope>,
/// Node options.
opts: Options,
/// Role-specific state.
role: R,
}

Expand Down Expand Up @@ -176,7 +194,7 @@ impl<R: Role> RawNode<R> {
/// order. The slice must have the same size as the cluster.
fn quorum_value<T: Ord + Copy>(&self, mut values: Vec<T>) -> T {
assert_eq!(values.len(), self.cluster_size(), "vector size must match cluster size");
*values.select_nth_unstable_by(self.quorum_size() - 1, |a, b: &T| a.cmp(b).reverse()).1
*values.select_nth_unstable_by(self.quorum_size() - 1, |a, b| a.cmp(b).reverse()).1
}

/// Sends a message.
Expand All @@ -185,7 +203,7 @@ impl<R: Role> RawNode<R> {
}

/// Sends a message without borrowing self, to allow partial borrows.
fn send_with(tx: &crossbeam::channel::Sender<Envelope>, msg: Envelope) -> Result<()> {
fn send_with(tx: &Sender<Envelope>, msg: Envelope) -> Result<()> {
debug!("Sending {msg:?}");
Ok(tx.send(msg)?)
}
Expand Down Expand Up @@ -404,7 +422,7 @@ impl RawNode<Follower> {
peers: HashSet<NodeID>,
log: Log,
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
node_tx: Sender<Envelope>,
opts: Options,
) -> Result<Self> {
let role = Follower::new(None, 0);
Expand Down Expand Up @@ -1170,14 +1188,10 @@ mod tests {
use super::*;
use crate::encoding::{bincode, Value as _};
use crate::raft::state::test::{self as teststate, KVCommand, KVResponse};
use crate::raft::{
Entry, Request, RequestID, Response, ELECTION_TIMEOUT_RANGE, HEARTBEAT_INTERVAL,
MAX_APPEND_ENTRIES,
};
use crate::raft::Entry;
use crate::storage;
use crossbeam::channel::Receiver;
use std::borrow::Borrow;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::result::Result;
use test_case::test_case;
Expand Down Expand Up @@ -1267,25 +1281,21 @@ mod tests {
//
// Creates a new Raft cluster.
"cluster" => {
let mut opts = Options::default();
let mut args = command.consume_args();
let nodes = args.lookup_parse("nodes")?.unwrap_or(0);
let leader = args.lookup_parse("leader")?;
let heartbeat_interval =
args.lookup_parse("heartbeat_interval")?.unwrap_or(HEARTBEAT_INTERVAL);
let election_timeout = args
.lookup_parse("election_timeout")?
.unwrap_or(ELECTION_TIMEOUT_RANGE.start);
let max_append_entries =
args.lookup_parse("max_append_entries")?.unwrap_or(MAX_APPEND_ENTRIES);
if let Some(heartbeat_interval) = args.lookup_parse("heartbeat_interval")? {
opts.heartbeat_interval = heartbeat_interval;
};
if let Some(election_timeout) = args.lookup_parse("election_timeout")? {
opts.election_timeout_range = election_timeout..election_timeout + 1;
}
if let Some(max_append_entries) = args.lookup_parse("max_append_entries")? {
opts.max_append_entries = max_append_entries;
}
args.reject_rest()?;
self.cluster(
nodes,
leader,
heartbeat_interval,
election_timeout,
max_append_entries,
&mut output,
)?;
self.cluster(nodes, leader, opts, &mut output)?;
}

// deliver [from=ID] [ID...]
Expand Down Expand Up @@ -1455,9 +1465,7 @@ mod tests {
&mut self,
nodes: u8,
leader: Option<NodeID>,
heartbeat_interval: Ticks,
election_timeout: Ticks,
max_append_entries: usize,
opts: Options,
output: &mut String,
) -> Result<(), Box<dyn Error>> {
if !self.ids.is_empty() {
Expand All @@ -1475,12 +1483,7 @@ mod tests {
let peers = self.ids.iter().copied().filter(|i| *i != id).collect();
let log = Log::new(storage::Memory::new())?;
let state = teststate::Emit::new(teststate::KV::new(), applied_tx);
let opts = Options {
heartbeat_interval,
election_timeout_range: election_timeout..election_timeout + 1,
max_append_entries,
};
self.nodes.insert(id, Node::new(id, peers, log, state, node_tx, opts)?);
self.nodes.insert(id, Node::new(id, peers, log, state, node_tx, opts.clone())?);

while applied_rx.try_recv().is_ok() {} // drain first apply

Expand Down

0 comments on commit c821dcf

Please sign in to comment.