Skip to content

Commit

Permalink
raft: improve assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 9, 2024
1 parent 3c3ce5c commit 952d8b1
Showing 1 changed file with 63 additions and 101 deletions.
164 changes: 63 additions & 101 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{
Envelope, Index, Log, Message, ReadSequence, Request, RequestID, Response, State, Status,
};
use crate::errinput;
use crate::error::{Error, Result};

use itertools::Itertools as _;
Expand Down Expand Up @@ -51,6 +52,32 @@ pub enum Node {
Leader(RawNode<Leader>),
}

/// Helper macro which calls a closure on the inner RawNode<R>.
macro_rules! with_rawnode {
// Borrowed (using ref keyword).
(ref $node:expr, $fn:expr) => {{
fn with_rawnode<R: Role, T>(node: &RawNode<R>, f: impl Fn(&RawNode<R>) -> T) -> T {
f(node)
}
match $node {
Node::Candidate(ref n) => with_rawnode(n, $fn),
Node::Follower(ref n) => with_rawnode(n, $fn),
Node::Leader(ref n) => with_rawnode(n, $fn),
}
}};
// Owned.
($node:expr, $fn:expr) => {{
fn with_rawnode<R: Role, T>(node: RawNode<R>, f: impl FnOnce(RawNode<R>) -> T) -> T {
f(node)
}
match $node {
Node::Candidate(n) => with_rawnode(n, $fn),
Node::Follower(n) => with_rawnode(n, $fn),
Node::Leader(n) => with_rawnode(n, $fn),
}
}};
}

impl Node {
/// Creates a new Raft node, starting as a leaderless follower, or leader if
/// there are no peers.
Expand All @@ -72,39 +99,28 @@ impl Node {

/// Returns the node ID.
pub fn id(&self) -> NodeID {
match self {
Node::Candidate(n) => n.id,
Node::Follower(n) => n.id,
Node::Leader(n) => n.id,
}
with_rawnode!(ref self, |n| n.id)
}

/// Returns the node term.
pub fn term(&self) -> Term {
match self {
Node::Candidate(n) => n.term(),
Node::Follower(n) => n.term(),
Node::Leader(n) => n.term(),
}
with_rawnode!(ref self, |n| n.term())
}

/// Processes a message from a peer.
/// Processes an inbound message.
pub fn step(self, msg: Envelope) -> Result<Self> {
debug!("Stepping {:?}", msg);
match self {
Node::Candidate(n) => n.step(msg),
Node::Follower(n) => n.step(msg),
Node::Leader(n) => n.step(msg),
}
with_rawnode!(ref self, |n| {
assert_eq!(msg.to, n.id, "message to other node: {msg:?}");
assert!(n.peers.contains(&msg.from) || msg.from == n.id, "unknown sender: {msg:?}");
});

debug!("Stepping {msg:?}");
with_rawnode!(self, |n| n.step(msg))
}

/// Moves time forward by a tick.
pub fn tick(self) -> Result<Self> {
match self {
Node::Candidate(n) => n.tick(),
Node::Follower(n) => n.tick(),
Node::Leader(n) => n.tick(),
}
with_rawnode!(self, |n| n.tick())
}
}

Expand Down Expand Up @@ -203,19 +219,6 @@ impl<R: Role> RawNode<R> {
fn gen_election_timeout(&self) -> Ticks {
rand::thread_rng().gen_range(self.opts.election_timeout_range.clone())
}

/// Asserts message invariants when stepping.
fn assert_step(&self, msg: &Envelope) {
// Messages must be addressed to the local node.
assert_eq!(msg.to, self.id, "Message to other node");

// Senders must be known.
assert!(
msg.from == self.id || self.peers.contains(&msg.from),
"Unknown sender {}",
msg.from
);
}
}

/// A candidate is campaigning to become a leader.
Expand All @@ -238,45 +241,33 @@ impl Candidate {
impl Role for Candidate {}

impl RawNode<Candidate> {
/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
assert_ne!(self.term(), 0, "candidates can't have term 0");
assert!(self.role.votes.contains(&self.id), "candidate did not vote for self");
debug_assert_eq!(Some(self.id), self.log.get_term().1, "log vote does not match self");

assert!(
self.role.election_duration < self.role.election_timeout,
"Election timeout passed"
);

Ok(())
}

/// Transitions the candidate to a follower. We either lost the election and
/// follow the winner, or we discovered a new term in which case we step
/// into it as a leaderless follower.
fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);

let election_timeout = self.gen_election_timeout();
if let Some(leader) = leader {
// We lost the election, follow the winner.
assert_eq!(term, self.term(), "can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
info!("Lost election, following leader {leader} in term {term}");
Ok(self.into_role(Follower::new(Some(leader), election_timeout)))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term(), "can't be leaderless follower in current term");
info!("Discovered new term {}", term);
assert_ne!(term, self.term(), "can't become leaderless follower in current term");
info!("Discovered new term {term}");
self.log.set_term(term, None)?;
Ok(self.into_role(Follower::new(None, election_timeout)))
}
}

/// Transitions the candidate to a leader. We won the election.
fn into_leader(self) -> Result<RawNode<Leader>> {
info!("Won election for term {}, becoming leader", self.term());
let (term, vote) = self.log.get_term();
assert_ne!(term, 0, "leaders can't have term 0");
assert_eq!(vote, Some(self.id), "leader did not vote for self");

info!("Won election for term {term}, becoming leader");
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let mut node = self.into_role(Leader::new(peers, last_index));
Expand All @@ -295,9 +286,6 @@ impl RawNode<Candidate> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -351,12 +339,11 @@ impl RawNode<Candidate> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.election_duration += 1;
if self.role.election_duration >= self.role.election_timeout {
self.campaign()?;
}
assert!(self.role.election_duration < self.role.election_timeout, "past election timeout");
Ok(self.into())
}

Expand Down Expand Up @@ -407,30 +394,15 @@ impl RawNode<Follower> {
node_tx: crossbeam::channel::Sender<Envelope>,
opts: Options,
) -> Result<Self> {
if peers.contains(&id) {
return errinput!("node ID {id} can't be in peers");
}
let role = Follower::new(None, 0);
let mut node = Self { id, peers, log, state, node_tx, opts, role };
node.role.election_timeout = node.gen_election_timeout();
Ok(node)
}

/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
if let Some(leader) = self.role.leader {
assert_ne!(leader, self.id, "Can't follow self");
assert!(self.peers.contains(&leader), "Leader not in peers");
assert_ne!(self.term(), 0, "Followers with leaders can't have term 0");
} else {
assert!(self.role.forwarded.is_empty(), "Leaderless follower has forwarded requests");
}
assert!(self.role.leader_seen < self.role.election_timeout, "Election timeout passed");

// NB: We allow vote not in peers, since this can happen when removing
// nodes from the cluster via a cold restart. We also allow vote for
// self, which can happen if we lose an election.

Ok(())
}

/// Transitions the follower into a candidate, by campaigning for
/// leadership in a new term.
fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
Expand All @@ -440,31 +412,39 @@ impl RawNode<Follower> {
// Apply any pending log entries, so that we're caught up if we win.
self.maybe_apply()?;

// Become candidate and campaign.
let election_timeout = self.gen_election_timeout();
let mut node = self.into_role(Candidate::new(election_timeout));
node.campaign()?;

let (term, vote) = node.log.get_term();
assert!(node.role.votes.contains(&node.id), "candidate did not vote for self");
assert_ne!(term, 0, "candidate can't have term 0");
assert_eq!(vote, Some(node.id), "log vote does not match self");

Ok(node)
}

/// Transitions the follower into a follower, either a leaderless follower
/// in a new term (e.g. if someone holds a new election) or following a
/// leader in the current term once someone wins the election.
fn into_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);
assert_ne!(term, 0, "can't become follower in term 0");

// Abort any forwarded requests. These must be retried with new leader.
self.abort_forwarded()?;

if let Some(leader) = leader {
// We found a leader in the current term.
assert_eq!(self.role.leader, None, "Already have leader in term");
assert_eq!(term, self.term(), "Can't follow leader in different term");
assert!(self.peers.contains(&leader), "leader is not a peer");
assert_eq!(self.role.leader, None, "already have leader in term");
assert_eq!(term, self.term(), "can't follow leader in different term");
info!("Following leader {leader} in term {term}");
self.role = Follower::new(Some(leader), self.role.election_timeout);
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term(), "can't be leaderless follower in current term");
assert_ne!(term, self.term(), "can't become leaderless follower in current term");
info!("Discovered new term {term}");
self.log.set_term(term, None)?;
self.role = Follower::new(None, self.gen_election_timeout());
Expand All @@ -474,9 +454,6 @@ impl RawNode<Follower> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -612,8 +589,6 @@ impl RawNode<Follower> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.leader_seen += 1;
if self.role.leader_seen >= self.role.election_timeout {
return Ok(self.into_candidate()?.into());
Expand Down Expand Up @@ -762,19 +737,11 @@ impl Leader {
impl Role for Leader {}

impl RawNode<Leader> {
/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
assert_ne!(self.term(), 0, "leaders can't have term 0");
debug_assert_eq!(Some(self.id), self.log.get_term().1, "vote does not match self");
Ok(())
}

/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Subsequently
/// stepping the received message may discover the leader, if there is one.
fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);
assert!(term > self.term(), "can only become follower in later term");
assert!(term > self.term(), "leader can only become follower in later term");

info!("Discovered new term {term}");

Expand All @@ -799,9 +766,6 @@ impl RawNode<Leader> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -962,8 +926,6 @@ impl RawNode<Leader> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= self.opts.heartbeat_interval {
self.heartbeat()?;
Expand Down

0 comments on commit 952d8b1

Please sign in to comment.