Skip to content

Commit

Permalink
raft: use log to track follower votes
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent a5fdfc6 commit 72b2d9e
Showing 1 changed file with 13 additions and 19 deletions.
32 changes: 13 additions & 19 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,15 @@ impl RawNode<Candidate> {
// We lost the election, follow the winner.
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
let vote = Some(self.id); // by definition
Ok(self.into_role(Follower::new(Some(leader), vote, election_timeout)))
Ok(self.into_role(Follower::new(Some(leader), election_timeout)))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
Ok(self.into_role(Follower::new(None, None, election_timeout)))
Ok(self.into_role(Follower::new(None, election_timeout)))
}
}

Expand Down Expand Up @@ -416,17 +415,15 @@ pub struct Follower {
leader_seen: Ticks,
/// The leader_seen timeout before triggering an election.
election_timeout: Ticks,
/// The node we voted for in the current term, if any.
vote: Option<NodeID>,
// Local client requests that have been forwarded to the leader. These are
// aborted on leader/term changes.
forwarded: HashSet<RequestID>,
}

impl Follower {
/// Creates a new follower role.
fn new(leader: Option<NodeID>, vote: Option<NodeID>, election_timeout: Ticks) -> Self {
Self { leader, vote, leader_seen: 0, election_timeout, forwarded: HashSet::new() }
fn new(leader: Option<NodeID>, election_timeout: Ticks) -> Self {
Self { leader, leader_seen: 0, election_timeout, forwarded: HashSet::new() }
}
}

Expand All @@ -442,8 +439,8 @@ impl RawNode<Follower> {
node_tx: crossbeam::channel::Sender<Envelope>,
opts: Options,
) -> Result<Self> {
let (term, vote) = log.get_term_vote();
let role = Follower::new(None, vote, 0);
let term = log.get_term();
let role = Follower::new(None, 0);
let mut node = Self { id, peers, term, log, state, node_tx, opts, role };
node.role.election_timeout = node.gen_election_timeout();
Ok(node)
Expand All @@ -460,14 +457,12 @@ impl RawNode<Follower> {
} else {
assert!(self.role.forwarded.is_empty(), "Leaderless follower has forwarded requests");
}
assert!(self.role.leader_seen < self.role.election_timeout, "Election timeout passed");

// NB: We allow vote not in peers, since this can happen when removing
// nodes from the cluster via a cold restart. We also allow vote for
// self, which can happen if we lose an election.

debug_assert_eq!(self.role.vote, self.log.get_term_vote().1, "Vote does not match log");
assert!(self.role.leader_seen < self.role.election_timeout, "Election timeout passed");

Ok(())
}

Expand All @@ -486,7 +481,7 @@ impl RawNode<Follower> {
Ok(node)
}

/// Transitions the candidate into a follower, either a leaderless follower
/// Transitions the follower into a follower, either a leaderless follower
/// in a new term (e.g. if someone holds a new election) or following a
/// leader in the current term once someone wins the election.
fn into_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
Expand All @@ -500,15 +495,15 @@ impl RawNode<Follower> {
assert_eq!(self.role.leader, None, "Already have leader in term");
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Following leader {} in term {}", leader, term);
self.role = Follower::new(Some(leader), self.role.vote, self.role.election_timeout);
self.role = Follower::new(Some(leader), self.role.election_timeout);
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
self.role = Follower::new(None, None, self.gen_election_timeout());
self.role = Follower::new(None, self.gen_election_timeout());
}
Ok(self)
}
Expand Down Expand Up @@ -593,7 +588,7 @@ impl RawNode<Follower> {
// A candidate in this term is requesting our vote.
Message::Campaign { last_index, last_term } => {
// Don't vote if we already voted for someone else in this term.
if let Some(vote) = self.role.vote {
if let (_, Some(vote)) = self.log.get_term_vote() {
if msg.from != vote {
self.send(msg.from, Message::CampaignResponse { vote: false })?;
return Ok(self.into());
Expand All @@ -609,9 +604,8 @@ impl RawNode<Follower> {

// Grant the vote.
info!("Voting for {} in term {} election", msg.from, self.term);
self.send(msg.from, Message::CampaignResponse { vote: true })?;
self.log.set_term(self.term, Some(msg.from))?;
self.role.vote = Some(msg.from);
self.send(msg.from, Message::CampaignResponse { vote: true })?;
}

// We may receive a vote after we lost an election and followed a
Expand Down Expand Up @@ -827,7 +821,7 @@ impl RawNode<Leader> {
self.term = term;
self.log.set_term(term, None)?;
let election_timeout = self.gen_election_timeout();
Ok(self.into_role(Follower::new(None, None, election_timeout)))
Ok(self.into_role(Follower::new(None, election_timeout)))
}

/// Processes a message.
Expand Down

0 comments on commit 72b2d9e

Please sign in to comment.