Skip to content

Commit

Permalink
raft: improve assertions
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 9, 2024
1 parent c6879c6 commit 37cee37
Showing 1 changed file with 45 additions and 91 deletions.
136 changes: 45 additions & 91 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Default for Options {
/// processing inbound messages via step() or by advancing time via tick().
/// These methods consume the current node, and return a new one with a possibly
/// different role. Outbound messages are sent via the given node_tx channel,
/// and must be delivered to the remote peers.
/// and must be delivered to remote peers or clients.
///
/// This enum is the public interface to the node, with a closed set of roles.
/// It wraps the RawNode<Role> types, which implement the actual internal
Expand All @@ -58,6 +58,20 @@ pub enum Node {
Leader(RawNode<Leader>),
}

/// Helper macro which calls a closure on the inner RawNode<R>, regardless of
/// which role the node currently has. The node expression is only borrowed
/// (it may be a `Node` or a `&Node`), and the macro evaluates to whatever the
/// closure returns.
macro_rules! with_rawnode {
    ($node:expr, $fn:expr) => {{
        // Generic helper that pins the closure's argument type to
        // &RawNode<R>, so the same closure expression can be instantiated once
        // per role in the match arms below. The closure is invoked exactly
        // once, so FnOnce is the least restrictive bound: it also admits
        // closures that move captured values out.
        fn with_rawnode<R: Role, T>(node: &RawNode<R>, f: impl FnOnce(&RawNode<R>) -> T) -> T {
            f(node)
        }
        // Match on a reference instead of using `ref` bindings. This borrows
        // the inner node both when $node is an owned Node and when it is
        // already a reference, and avoids explicit `ref` under a by-reference
        // default binding mode (which the 2024 edition rejects).
        match &$node {
            Node::Candidate(n) => with_rawnode(n, $fn),
            Node::Follower(n) => with_rawnode(n, $fn),
            Node::Leader(n) => with_rawnode(n, $fn),
        }
    }};
}

impl Node {
/// Creates a new Raft node. It starts as a leaderless follower, waiting to
/// hear from a leader or otherwise transitioning to candidate and
Expand All @@ -81,24 +95,21 @@ impl Node {

/// Returns the node ID.
pub fn id(&self) -> NodeID {
match self {
Node::Candidate(n) => n.id,
Node::Follower(n) => n.id,
Node::Leader(n) => n.id,
}
with_rawnode!(self, |n| n.id)
}

/// Returns the node term.
pub fn term(&self) -> Term {
match self {
Node::Candidate(n) => n.term(),
Node::Follower(n) => n.term(),
Node::Leader(n) => n.term(),
}
with_rawnode!(self, |n| n.term())
}

/// Processes a message from a peer.
/// Processes an inbound message.
pub fn step(self, msg: Envelope) -> Result<Self> {
with_rawnode!(self, |n| {
assert_eq!(msg.to, n.id, "message to other node: {msg:?}");
assert!(n.peers.contains(&msg.from) || msg.from == n.id, "unknown sender: {msg:?}");
});

debug!("Stepping {msg:?}");
match self {
Node::Candidate(n) => n.step(msg),
Expand Down Expand Up @@ -221,19 +232,6 @@ impl<R: Role> RawNode<R> {
/// Draws a random election timeout from the configured
/// `election_timeout_range`.
fn gen_election_timeout(&self) -> Ticks {
    let range = self.opts.election_timeout_range.clone();
    let mut rng = rand::thread_rng();
    rng.gen_range(range)
}

/// Checks the addressing invariants of a message that is about to be
/// stepped, panicking if any of them is violated.
fn assert_step(&self, msg: &Envelope) {
    // The recipient must be the local node.
    assert_eq!(msg.to, self.id, "Message to other node");

    // The sender must be either the local node itself or one of its peers.
    let known_sender = msg.from == self.id || self.peers.contains(&msg.from);
    assert!(known_sender, "Unknown sender {}", msg.from);
}
}

/// A candidate is campaigning to become a leader.
Expand All @@ -256,45 +254,33 @@ impl Candidate {
impl Role for Candidate {}

impl RawNode<Candidate> {
/// Checks the candidate's internal invariants, panicking if any of them is
/// violated. Returns Ok(()) when all checks pass.
fn assert(&mut self) -> Result<()> {
    // A candidate is always campaigning in a non-zero term.
    let term = self.term();
    assert_ne!(term, 0, "candidates can't have term 0");

    // A candidate always votes for itself, both in its in-memory vote set
    // and (checked in debug builds only) in the vote stored in the log.
    let voted_for_self = self.role.votes.contains(&self.id);
    assert!(voted_for_self, "candidate did not vote for self");
    debug_assert_eq!(Some(self.id), self.log.get_term().1, "log vote does not match self");

    // The election must not have run past its timeout.
    let role = &self.role;
    assert!(role.election_duration < role.election_timeout, "Election timeout passed");

    Ok(())
}

/// Transitions the candidate to a follower. We either lost the election and
/// follow the winner, or we discovered a new term in which case we step
/// into it as a leaderless follower.
fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);

let election_timeout = self.gen_election_timeout();
if let Some(leader) = leader {
// We lost the election, follow the winner.
assert_eq!(term, self.term(), "can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
info!("Lost election, following leader {leader} in term {term}");
Ok(self.into_role(Follower::new(Some(leader), election_timeout)))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term(), "can't be leaderless follower in current term");
info!("Discovered new term {}", term);
assert_ne!(term, self.term(), "can't become leaderless follower in current term");
info!("Discovered new term {term}");
self.log.set_term(term, None)?;
Ok(self.into_role(Follower::new(None, election_timeout)))
}
}

/// Transitions the candidate to a leader. We won the election.
fn into_leader(self) -> Result<RawNode<Leader>> {
info!("Won election for term {}, becoming leader", self.term());
let (term, vote) = self.log.get_term();
assert_ne!(term, 0, "leaders can't have term 0");
assert_eq!(vote, Some(self.id), "leader did not vote for self");

info!("Won election for term {term}, becoming leader");
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let mut node = self.into_role(Leader::new(peers, last_index));
Expand All @@ -313,9 +299,6 @@ impl RawNode<Candidate> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -369,12 +352,11 @@ impl RawNode<Candidate> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.election_duration += 1;
if self.role.election_duration >= self.role.election_timeout {
self.campaign()?;
}
assert!(self.role.election_duration < self.role.election_timeout, "past election timeout");
Ok(self.into())
}

Expand Down Expand Up @@ -431,24 +413,6 @@ impl RawNode<Follower> {
Ok(node)
}

/// Checks the follower's internal invariants, panicking if any of them is
/// violated. Returns Ok(()) when all checks pass.
fn assert(&mut self) -> Result<()> {
    match self.role.leader {
        // Following a leader: it must be a known peer other than ourselves,
        // and the term must be non-zero.
        Some(leader) => {
            assert_ne!(leader, self.id, "Can't follow self");
            assert!(self.peers.contains(&leader), "Leader not in peers");
            assert_ne!(self.term(), 0, "Followers with leaders can't have term 0");
        }
        // Leaderless: there must be no forwarded requests.
        None => {
            assert!(self.role.forwarded.is_empty(), "Leaderless follower has forwarded requests");
        }
    }

    // The time since the leader was last seen must be within the timeout.
    let role = &self.role;
    assert!(role.leader_seen < role.election_timeout, "Election timeout passed");

    // NB: The vote is deliberately not checked. It may name a node that is
    // not in peers, which can happen when removing nodes from the cluster
    // via a cold restart. It may also name ourselves, which can happen if
    // we lose an election.

    Ok(())
}

/// Transitions the follower into a candidate, by campaigning for
/// leadership in a new term.
fn into_candidate(mut self) -> Result<RawNode<Candidate>> {
Expand All @@ -458,31 +422,39 @@ impl RawNode<Follower> {
// Apply any pending log entries, so that we're caught up if we win.
self.maybe_apply()?;

// Become candidate and campaign.
let election_timeout = self.gen_election_timeout();
let mut node = self.into_role(Candidate::new(election_timeout));
node.campaign()?;

let (term, vote) = node.log.get_term();
assert!(node.role.votes.contains(&node.id), "candidate did not vote for self");
assert_ne!(term, 0, "candidate can't have term 0");
assert_eq!(vote, Some(node.id), "log vote does not match self");

Ok(node)
}

/// Transitions the follower into a follower, either a leaderless follower
/// in a new term (e.g. if someone holds a new election) or following a
/// leader in the current term once someone wins the election.
fn into_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);
assert_ne!(term, 0, "can't become follower in term 0");

// Abort any forwarded requests. These must be retried with new leader.
self.abort_forwarded()?;

if let Some(leader) = leader {
// We found a leader in the current term.
assert_eq!(self.role.leader, None, "Already have leader in term");
assert_eq!(term, self.term(), "Can't follow leader in different term");
assert!(self.peers.contains(&leader), "leader is not a peer");
assert_eq!(self.role.leader, None, "already have leader in term");
assert_eq!(term, self.term(), "can't follow leader in different term");
info!("Following leader {leader} in term {term}");
self.role = Follower::new(Some(leader), self.role.election_timeout);
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term(), "can't be leaderless follower in current term");
assert_ne!(term, self.term(), "can't become leaderless follower in current term");
info!("Discovered new term {term}");
self.log.set_term(term, None)?;
self.role = Follower::new(None, self.gen_election_timeout());
Expand All @@ -492,9 +464,6 @@ impl RawNode<Follower> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -630,8 +599,6 @@ impl RawNode<Follower> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.leader_seen += 1;
if self.role.leader_seen >= self.role.election_timeout {
return Ok(self.into_candidate()?.into());
Expand Down Expand Up @@ -780,19 +747,11 @@ impl Leader {
impl Role for Leader {}

impl RawNode<Leader> {
/// Checks the leader's internal invariants, panicking if any of them is
/// violated. Returns Ok(()) when all checks pass.
fn assert(&mut self) -> Result<()> {
    // A leader always has a non-zero term.
    let term = self.term();
    assert_ne!(term, 0, "leaders can't have term 0");

    // In debug builds, also check that the vote stored in the log is for
    // this node.
    debug_assert_eq!(Some(self.id), self.log.get_term().1, "vote does not match self");

    Ok(())
}

/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Subsequently
/// stepping the received message may discover the leader, if there is one.
fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term(), "term regression {} → {}", self.term(), term);
assert!(term > self.term(), "can only become follower in later term");
assert!(term > self.term(), "leader can only become follower in later term");

info!("Discovered new term {term}");

Expand All @@ -817,9 +776,6 @@ impl RawNode<Leader> {

/// Processes a message.
fn step(mut self, msg: Envelope) -> Result<Node> {
self.assert()?;
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term() {
debug!("Dropping message from past term ({:?})", msg);
Expand Down Expand Up @@ -980,8 +936,6 @@ impl RawNode<Leader> {

/// Processes a logical clock tick.
fn tick(mut self) -> Result<Node> {
self.assert()?;

self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= self.opts.heartbeat_interval {
self.heartbeat()?;
Expand Down

0 comments on commit 37cee37

Please sign in to comment.