Skip to content

Commit

Permalink
raft: use log to store current term
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent 72b2d9e commit 1a6bb76
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 57 deletions.
1 change: 1 addition & 0 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl Log {
}

/// Returns the current term (0 if none).
/// TODO: consider adding a RawNode.term() shorthand instead.
pub fn get_term(&self) -> Term {
self.term
}
Expand Down
95 changes: 38 additions & 57 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ impl Node {
/// Returns the node term.
pub fn term(&self) -> Term {
match self {
Node::Candidate(n) => n.term,
Node::Follower(n) => n.term,
Node::Leader(n) => n.term,
Node::Candidate(n) => n.log.get_term(),
Node::Follower(n) => n.log.get_term(),
Node::Leader(n) => n.log.get_term(),
}
}

Expand Down Expand Up @@ -136,7 +136,6 @@ pub trait Role {}
pub struct RawNode<R: Role = Follower> {
id: NodeID,
peers: HashSet<NodeID>,
term: Term,
log: Log,
state: Box<dyn State>,
node_tx: crossbeam::channel::Sender<Envelope>,
Expand All @@ -150,7 +149,6 @@ impl<R: Role> RawNode<R> {
RawNode {
id: self.id,
peers: self.peers,
term: self.term,
log: self.log,
state: self.state,
node_tx: self.node_tx,
Expand Down Expand Up @@ -207,7 +205,7 @@ impl<R: Role> RawNode<R> {

/// Sends a message.
fn send(&self, to: NodeID, message: Message) -> Result<()> {
let msg = Envelope { from: self.id, to, term: self.term, message };
let msg = Envelope { from: self.id, to, term: self.log.get_term(), message };
debug!("Sending {msg:?}");
Ok(self.node_tx.send(msg)?)
}
Expand All @@ -226,12 +224,6 @@ impl<R: Role> RawNode<R> {
rand::thread_rng().gen_range(self.opts.election_timeout_range.clone())
}

/// Asserts common node invariants.
fn assert_node(&mut self) -> Result<()> {
debug_assert_eq!(self.term, self.log.get_term(), "Term does not match log");
Ok(())
}

/// Asserts message invariants when stepping.
fn assert_step(&self, msg: &Envelope) {
// Messages must be addressed to the local node.
Expand Down Expand Up @@ -268,9 +260,7 @@ impl Role for Candidate {}
impl RawNode<Candidate> {
/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
self.assert_node()?;

assert_ne!(self.term, 0, "Candidates can't have term 0");
assert_ne!(self.log.get_term(), 0, "Candidates can't have term 0");
assert!(self.role.votes.contains(&self.id), "Candidate did not vote for self");
debug_assert_eq!(Some(self.id), self.log.get_term_vote().1, "Log vote does not match self");

Expand All @@ -286,28 +276,27 @@ impl RawNode<Candidate> {
/// follow the winner, or we discovered a new term in which case we step
/// into it as a leaderless follower.
fn into_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);
assert!(term >= self.log.get_term(), "term regression {} {}", self.log.get_term(), term);

let election_timeout = self.gen_election_timeout();
if let Some(leader) = leader {
// We lost the election, follow the winner.
assert_eq!(term, self.term, "Can't follow leader in different term");
assert_eq!(term, self.log.get_term(), "can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
Ok(self.into_role(Follower::new(Some(leader), election_timeout)))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
assert_ne!(term, self.log.get_term(), "can't be leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
Ok(self.into_role(Follower::new(None, election_timeout)))
}
}

/// Transitions the candidate to a leader. We won the election.
fn into_leader(self) -> Result<RawNode<Leader>> {
info!("Won election for term {}, becoming leader", self.term);
info!("Won election for term {}, becoming leader", self.log.get_term());
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let mut node = self.into_role(Leader::new(peers, last_index));
Expand All @@ -330,15 +319,15 @@ impl RawNode<Candidate> {
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term {
if msg.term < self.log.get_term() {
debug!("Dropping message from past term ({:?})", msg);
return Ok(self.into());
}

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
if msg.term > self.log.get_term() {
return self.into_follower(msg.term, None)?.step(msg);
}

Expand Down Expand Up @@ -394,11 +383,10 @@ impl RawNode<Candidate> {
/// Campaign for leadership by increasing the term, voting for ourself, and
/// soliciting votes from all peers.
fn campaign(&mut self) -> Result<()> {
let term = self.term + 1;
info!("Starting new election for term {}", term);
let term = self.log.get_term() + 1;
info!("Starting new election for term {term}");
self.role = Candidate::new(self.gen_election_timeout());
self.role.votes.insert(self.id); // vote for ourself
self.term = term;
self.log.set_term(term, Some(self.id))?;

let (last_index, last_term) = self.log.get_last_index();
Expand Down Expand Up @@ -439,21 +427,18 @@ impl RawNode<Follower> {
node_tx: crossbeam::channel::Sender<Envelope>,
opts: Options,
) -> Result<Self> {
let term = log.get_term();
let role = Follower::new(None, 0);
let mut node = Self { id, peers, term, log, state, node_tx, opts, role };
let mut node = Self { id, peers, log, state, node_tx, opts, role };
node.role.election_timeout = node.gen_election_timeout();
Ok(node)
}

/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
self.assert_node()?;

if let Some(leader) = self.role.leader {
assert_ne!(leader, self.id, "Can't follow self");
assert!(self.peers.contains(&leader), "Leader not in peers");
assert_ne!(self.term, 0, "Followers with leaders can't have term 0");
assert_ne!(self.log.get_term(), 0, "Followers with leaders can't have term 0");
} else {
assert!(self.role.forwarded.is_empty(), "Leaderless follower has forwarded requests");
}
Expand Down Expand Up @@ -485,23 +470,22 @@ impl RawNode<Follower> {
/// in a new term (e.g. if someone holds a new election) or following a
/// leader in the current term once someone wins the election.
fn into_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);
assert!(term >= self.log.get_term(), "term regression {} {}", self.log.get_term(), term);

// Abort any forwarded requests. These must be retried with new leader.
self.abort_forwarded()?;

if let Some(leader) = leader {
// We found a leader in the current term.
assert_eq!(self.role.leader, None, "Already have leader in term");
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Following leader {} in term {}", leader, term);
assert_eq!(term, self.log.get_term(), "Can't follow leader in different term");
info!("Following leader {leader} in term {term}");
self.role = Follower::new(Some(leader), self.role.election_timeout);
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
assert_ne!(term, self.log.get_term(), "can't be leaderless follower in current term");
info!("Discovered new term {term}");
self.log.set_term(term, None)?;
self.role = Follower::new(None, self.gen_election_timeout());
}
Expand All @@ -514,15 +498,15 @@ impl RawNode<Follower> {
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term {
if msg.term < self.log.get_term() {
debug!("Dropping message from past term ({:?})", msg);
return Ok(self.into());
}

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
if msg.term > self.log.get_term() {
return self.into_follower(None, msg.term)?.step(msg);
}

Expand Down Expand Up @@ -603,8 +587,8 @@ impl RawNode<Follower> {
}

// Grant the vote.
info!("Voting for {} in term {} election", msg.from, self.term);
self.log.set_term(self.term, Some(msg.from))?;
info!("Voting for {} in term {} election", msg.from, msg.term);
self.log.set_term(msg.term, Some(msg.from))?;
self.send(msg.from, Message::CampaignResponse { vote: true })?;
}

Expand Down Expand Up @@ -787,22 +771,19 @@ impl Role for Leader {}
impl RawNode<Leader> {
/// Asserts internal invariants.
fn assert(&mut self) -> Result<()> {
self.assert_node()?;

assert_ne!(self.term, 0, "Leaders can't have term 0");
debug_assert_eq!(Some(self.id), self.log.get_term_vote().1, "Log vote does not match self");

assert_ne!(self.log.get_term(), 0, "leaders can't have term 0");
debug_assert_eq!(Some(self.id), self.log.get_term_vote().1, "vote does not match self");
Ok(())
}

/// Transitions the leader into a follower. This can only happen if we
/// discover a new term, so we become a leaderless follower. Subsequently
/// stepping the received message may discover the leader, if there is one.
fn into_follower(mut self, term: Term) -> Result<RawNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);
assert!(term > self.term, "Can only become follower in later term");
assert!(term >= self.log.get_term(), "term regression {} {}", self.log.get_term(), term);
assert!(term > self.log.get_term(), "can only become follower in later term");

info!("Discovered new term {}", term);
info!("Discovered new term {term}");

// Cancel in-flight requests.
for write in std::mem::take(&mut self.role.writes).into_values().sorted_by_key(|w| w.id) {
Expand All @@ -818,7 +799,6 @@ impl RawNode<Leader> {
)?;
}

self.term = term;
self.log.set_term(term, None)?;
let election_timeout = self.gen_election_timeout();
Ok(self.into_role(Follower::new(None, election_timeout)))
Expand All @@ -830,15 +810,15 @@ impl RawNode<Leader> {
self.assert_step(&msg);

// Drop messages from past terms.
if msg.term < self.term {
if msg.term < self.log.get_term() {
debug!("Dropping message from past term ({:?})", msg);
return Ok(self.into());
}

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
if msg.term > self.log.get_term() {
return self.into_follower(msg.term)?.step(msg);
}

Expand Down Expand Up @@ -953,7 +933,7 @@ impl RawNode<Leader> {
Message::ClientRequest { id, request: Request::Status } => {
let status = Status {
leader: self.id,
term: self.term,
term: self.log.get_term(),
match_index: self
.role
.progress
Expand Down Expand Up @@ -1005,7 +985,7 @@ impl RawNode<Leader> {
let (commit_index, _) = self.log.get_commit_index();
let read_seq = self.role.read_seq;

assert_eq!(last_term, self.term, "leader has stale last_term");
assert_eq!(last_term, self.log.get_term(), "leader has stale last_term");

self.broadcast(Message::Heartbeat { last_index, commit_index, read_seq })?;
// NB: We don't reset self.since_heartbeat here, because we want to send
Expand Down Expand Up @@ -1059,7 +1039,7 @@ impl RawNode<Leader> {
// We can only safely commit an entry from our own term (see figure 8 in
// Raft paper).
commit_index = match self.log.get(quorum_index)? {
Some(entry) if entry.term == self.term => quorum_index,
Some(entry) if entry.term == self.log.get_term() => quorum_index,
Some(_) => return Ok(commit_index),
None => panic!("missing commit index {quorum_index} missing"),
};
Expand All @@ -1068,13 +1048,14 @@ impl RawNode<Leader> {
self.log.commit(commit_index)?;

// Apply entries and respond to client writers.
let term = self.log.get_term();
Self::maybe_apply_with(&mut self.log, &mut self.state, |index, result| -> Result<()> {
if let Some(write) = self.role.writes.remove(&index) {
// TODO: use self.send() or something.
self.node_tx.send(Envelope {
from: self.id,
to: write.from,
term: self.term,
term,
message: Message::ClientResponse {
id: write.id,
response: result.map(Response::Write),
Expand All @@ -1086,7 +1067,7 @@ impl RawNode<Leader> {

// If the commit term changed, there may be pending reads waiting for us
// to commit an entry from our own term. Execute them.
if old_commit_term != self.term {
if old_commit_term != self.log.get_term() {
self.maybe_read()?;
}

Expand All @@ -1106,7 +1087,7 @@ impl RawNode<Leader> {
// linearizability.
let (commit_index, commit_term) = self.log.get_commit_index();
let applied_index = self.state.get_applied_index();
if commit_term < self.term || applied_index < commit_index {
if commit_term < self.log.get_term() || applied_index < commit_index {
return Ok(());
}

Expand Down

0 comments on commit 1a6bb76

Please sign in to comment.