Skip to content

Commit

Permalink
raft: clean up message module
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent e1c3d9a commit 3cd31cb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 39 deletions.
90 changes: 52 additions & 38 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,50 @@ use crate::error::Result;
use crate::storage;

use serde_derive::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// A message envelope sent between Raft nodes.
/// A message envelope specifying the sender and receiver.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Envelope {
/// The sender.
pub from: NodeID,
/// The recipient.
pub to: NodeID,
/// The sender's current term.
pub term: Term,
/// The recipient.
pub to: NodeID,
/// The message.
pub message: Message,
}

impl encoding::Value for Envelope {}

/// A message sent between Raft nodes.
/// A message sent between Raft nodes. Messages are sent asynchronously (i.e.
/// they are not request/response) and may be dropped or reordered.
///
/// In practice, they are sent across a TCP connection and crossbeam channels
/// ensuring messages are not dropped or reordered as long as the connection
/// remains intact. A message and its response are sent across separate TCP
/// connections (outbound from their respective senders).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Message {
/// Candidates campaign for leadership by soliciting votes from peers.
/// Votes will only be granted if the candidate's log is at least as
/// up-to-date as the voter.
Campaign {
/// The index of the candidate's last log entry.
last_index: Index,
/// The term of the candidate's last log entry.
last_term: Term,
},

/// Followers may vote for a single candidate per term, but only if the
/// candidate's log is at least as up-to-date as the follower. Candidates
/// implicitly vote for themselves.
CampaignResponse {
/// If true, the follower granted the candidate a vote. A false response
/// isn't necessary, but is emitted for clarity.
vote: bool,
},

/// Leaders send heartbeats periodically, and on client read requests. This
/// serves several purposes:
///
Expand All @@ -41,9 +65,12 @@ pub enum Message {
/// separation of concerns.
Heartbeat {
/// The index of the leader's last log entry. The term is the leader's
/// current term, since it appends a noop entry on election win.
/// current term, since it appends a noop entry on election win. The
/// follower compares this to its own log to determine if it's
/// up-to-date.
last_index: Index,
/// The index of the leader's last committed log entry. It's only safe
/// The index of the leader's last committed log entry. Followers use
/// this to advance their commit index and apply entries. It's only safe
/// to commit this if the local log matches last_index, such that the
/// follower's log is identical to the leader at the commit index.
commit_index: Index,
Expand All @@ -62,21 +89,6 @@ pub enum Message {
read_seq: ReadSequence,
},

/// Candidates campaign for leadership by soliciting votes from peers.
Campaign {
/// The index of the candidate's last stored log entry
last_index: Index,
/// The term of the candidate's last stored log entry
last_term: Term,
},

/// Followers may vote for a single candidate per term, on a first-come
/// first-serve basis. Candidates implicitly vote for themselves.
CampaignResponse {
/// If true, the sender granted a vote for the candidate.
vote: bool,
},

/// Leaders replicate log entries to followers by appending to their logs
/// after the given base entry.
///
Expand All @@ -94,7 +106,7 @@ pub enum Message {
Append {
/// The index of the log entry to append after.
base_index: Index,
/// The term of the log entry to append after.
/// The term of the base entry.
base_term: Term,
/// Log entries to append. Must start at base_index + 1.
entries: Vec<Entry>,
Expand All @@ -103,13 +115,15 @@ pub enum Message {
/// Followers accept or reject appends from the leader depending on whether
/// the base entry matches their log.
AppendResponse {
/// If non-zero, the follower accepted entries to this index. The entire
/// log up to this index is consistent with the leader.
/// If non-zero, the follower appended entries up to this index. The
/// entire log up to this index is consistent with the leader. If no
/// entries were sent (a probe), this will be the matching base index.
match_index: Index,
/// If non-zero, the follower rejected an append at this base index
/// because the base index/term did not match its log. This may be
/// lowered by the follower if its log is shorter than the base index,
/// to avoid trying each missing entry.
/// because the base index/term did not match its log. If the follower's
/// log is shorter than the base index, the reject index will be lowered
/// to the index after its last local index, to avoid probing each
/// missing index.
reject_index: Index,
},

Expand Down Expand Up @@ -143,19 +157,19 @@ pub type ReadSequence = u64;
/// A client request, typically passed through to the state machine.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Request {
/// A state machine read command. This is not replicated, and only evaluted
/// on the leader.
/// A state machine read command, executed via `State::read`. This is not
/// replicated, and only evaluated on the leader.
Read(Vec<u8>),
/// A state machine write command. This is replicated across all nodes, and
/// must result in a deterministic response.
/// A state machine write command, executed via `State::apply`. This is
/// replicated across all nodes, and must produce a deterministic result.
Write(Vec<u8>),
/// Requests Raft cluster status from the leader.
Status,
}

impl encoding::Value for Request {}

/// A client response. This will be wrapped in a Result to handle errors.
/// A client response. This will be wrapped in a Result for error handling.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Response {
/// A state machine read result.
Expand All @@ -168,20 +182,20 @@ pub enum Response {

impl encoding::Value for Response {}

/// Raft cluster status.
/// Raft cluster status. Generated by the leader.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Status {
/// The current Raft leader, which generated this status.
pub leader: NodeID,
/// The current Raft term.
pub term: Term,
/// The match indexes of all nodes. Use a BTreeMap for deterministic
/// debug output.
pub match_index: BTreeMap<NodeID, Index>,
/// The match indexes of all nodes, indicating replication progress. Uses a
/// BTreeMap for test determinism.
pub match_index: std::collections::BTreeMap<NodeID, Index>,
/// The current commit index.
pub commit_index: Index,
/// The current applied index.
pub apply_index: Index,
/// The log storage engine status.
pub storage: storage::engine::Status,
pub storage: storage::Status,
}
1 change: 0 additions & 1 deletion src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,6 @@ impl RawNode<Leader> {
.iter()
.map(|(id, p)| (*id, p.match_index))
.chain(std::iter::once((self.id, self.log.get_last_index().0)))
.sorted()
.collect(),
commit_index: self.log.get_commit_index().0,
apply_index: self.state.get_applied_index(),
Expand Down

0 comments on commit 3cd31cb

Please sign in to comment.