Skip to content

Commit

Permalink
raft: add and use AppendResponse.reject_index for rejections
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 4, 2024
1 parent 1bdc337 commit 3619a3b
Show file tree
Hide file tree
Showing 27 changed files with 139 additions and 146 deletions.
34 changes: 20 additions & 14 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,34 @@ pub enum Message {
vote: bool,
},

/// Leaders replicate log entries to followers by appending to their logs.
/// Leaders replicate log entries to followers by appending to their logs
/// after the given base entry. If the base entry matches the follower's log
/// then their logs are identical up to it (see section 5.3 in the Raft
/// paper), and the entries can be appended. Otherwise, the append is
/// rejected and the leader must retry an earlier base index until a common
/// base is found.
Append {
/// The index of the log entry immediately preceding the submitted commands.
///
/// TODO: this isn't needed -- determine it from the first entry, and
/// require it to be included.
/// The index of the log entry to append after.
base_index: Index,
/// The term of the log entry immediately preceding the submitted commands.
/// The term of the log entry to append after.
base_term: Term,
/// Commands to replicate.
/// Log entries to append.
entries: Vec<Entry>,
},

/// Followers may accept or reject appending entries from the leader.
/// Followers accept or reject appends from the leader depending on whether
/// the base entry matches in both logs.
AppendResponse {
/// If true, the follower rejected the leader's entries.
reject: bool,
/// If non-zero, the follower rejected an append at this base index
/// because the base index/term did not match its log.
reject_index: Index,
/// The index of the follower's last log entry.
///
/// NB: if the entries already existed in the follower's log, the append
/// is a noop and the last_index may be greater than the latest entry in
/// the append. That's fine -- if we're still the leader then our logs
/// must be consistent, otherwise the follower will have stopped
/// accepting our appends anyway.
last_index: Index,
/// The term of the follower's last log entry.
last_term: Term,
Expand Down Expand Up @@ -146,10 +156,6 @@ pub enum Request {
impl encoding::Value for Request {}

/// A client response. This will be wrapped in a Result to handle errors.
///
/// TODO: consider a separate error kind here, or a wrapped Result, to separate
/// fallible state machine operations (returned to the caller) from apply errors
/// (fatal).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Response {
/// A state machine read result.
Expand Down
83 changes: 42 additions & 41 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl RawNode<Candidate> {

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.into_follower(msg.term, None)?.step(msg);
}
Expand Down Expand Up @@ -529,7 +529,7 @@ impl RawNode<Follower> {

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.into_follower(None, msg.term)?.step(msg);
}
Expand Down Expand Up @@ -566,23 +566,29 @@ impl RawNode<Follower> {
}
}

// Replicate entries from the leader. If we don't have a leader in
// this term yet, follow it.
// Append log entries from the leader to the local log.
Message::Append { base_index, base_term, entries } => {
// Check that the entries are from our leader.
let from = msg.from;
let Some(first) = entries.first() else { panic!("empty append message") };
assert_eq!(base_index, first.index - 1, "base index mismatch");

// Make sure the message comes from our leader.
match self.role.leader {
Some(leader) => assert_eq!(from, leader, "multiple leaders in term"),
None => self = self.into_follower(Some(from), msg.term)?,
Some(leader) => assert_eq!(msg.from, leader, "multiple leaders in term"),
None => self = self.into_follower(Some(msg.from), msg.term)?,
}

// Append the entries, if possible.
let reject = base_index > 0 && !self.log.has(base_index, base_term)?;
if !reject {
// If the base entry is in our log, append the entries.
let mut reject_index = 0;
if base_index == 0 || self.log.has(base_index, base_term)? {
self.log.splice(entries)?;
} else {
reject_index = base_index;
}
let (last_index, last_term) = self.log.get_last_index();
self.send(msg.from, Message::AppendResponse { reject, last_index, last_term })?;
self.send(
msg.from,
Message::AppendResponse { reject_index, last_index, last_term },
)?;
}

// A candidate in this term is requesting our vote.
Expand Down Expand Up @@ -814,7 +820,7 @@ impl RawNode<Leader> {

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// AppendEntries from the leader, stepping it will follow the leader.
// Append from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.into_follower(msg.term)?.step(msg);
}
Expand Down Expand Up @@ -848,41 +854,36 @@ impl RawNode<Leader> {
}
}

// A follower appended log entries we sent it. Record its progress
// and attempt to commit new entries.
Message::AppendResponse { reject: false, last_index, last_term } => {
assert!(
last_index <= self.log.get_last_index().0,
"follower accepted entries after last index"
);
assert!(
last_term <= self.log.get_last_index().1,
"follower accepted entries after last term"
);
// A follower appended our log entries. Record its progress and
// attempt to commit.
Message::AppendResponse { reject_index: 0, last_index, last_term } => {
let (li, lt) = self.log.get_last_index();
assert!(last_index <= li && last_term <= lt, "follower appended future entries");

let progress = self.role.progress.get_mut(&msg.from).unwrap();
let progress = self.role.progress.get_mut(&msg.from).expect("unknown node");
if last_index > progress.last {
progress.last = last_index;
progress.next = last_index + 1;
self.maybe_commit_and_apply()?;
}
}

// A follower rejected log entries we sent it, typically because it
// does not have the base index in its log. Try to replicate from
// the previous entry.
// A follower rejected the log entries because the base entry at
// reject_index did not match its log. Try the previous entry until
// we find a common base.
//
// This linear probing, as described in the Raft paper, can be very
// slow with long divergent logs, but we keep it simple.
//
// TODO: make use of last_index and last_term here.
Message::AppendResponse { reject: true, last_index: _, last_term: _ } => {
self.role.progress.entry(msg.from).and_modify(|p| {
if p.next > 1 {
p.next -= 1
}
});
self.send_log(msg.from)?;
// This linear probing can be slow with long divergent logs, but we
// keep it simple.
Message::AppendResponse { reject_index, last_index, last_term: _ } => {
let progress = self.role.progress.get_mut(&msg.from).expect("unknown node");

// If the rejected base is the one we are currently probing (i.e.
// next - 1), retry with the previous entry as the base. Otherwise,
// the rejection is stale and can be ignored. If the follower's log
// is shorter than that, skip straight to its last index as the base.
if progress.next == reject_index + 1 {
progress.next = std::cmp::min(progress.next - 1, last_index + 1);
self.send_log(msg.from)?;
}
}

// A client submitted a read command. To ensure linearizability, we
Expand Down Expand Up @@ -1859,8 +1860,8 @@ mod tests {
entries.iter().map(|e| format!("{}@{}", e.index, e.term)).join(" ")
)
}
Message::AppendResponse { reject, last_index, last_term } => {
format!("AppendResponse last={last_index}@{last_term} reject={reject}")
Message::AppendResponse { reject_index, last_index, last_term } => {
format!("AppendResponse last={last_index}@{last_term} reject={reject_index}")
}
Message::ClientRequest { id, request } => {
format!(
Expand Down
8 changes: 4 additions & 4 deletions src/raft/testscripts/node/append
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ n1@1 ⇥ n3 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶]̶
stabilize
---
n2@1 append 2@1 put foo=bar
n2@1 → n1 AppendResponse last=2@1 reject=false
n2@1 → n1 AppendResponse last=2@1 reject=0
n1@1 commit 2@1
n1@1 apply 2@1 put foo=bar
n1@1 → c1 ClientResponse id=0x01 write 0x0102
Expand All @@ -39,7 +39,7 @@ n1@1 ⇥ n3 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶ ̶3̶@
stabilize
---
n2@1 append 3@1 put a=1
n2@1 → n1 AppendResponse last=3@1 reject=false
n2@1 → n1 AppendResponse last=3@1 reject=0
n1@1 commit 3@1
n1@1 apply 3@1 put a=1
n1@1 → c1 ClientResponse id=0x02 write 0x0103
Expand All @@ -61,11 +61,11 @@ n1@1 → n3 Append base=1@1 [2@1 3@1 4@1]
stabilize
---
n2@1 append 4@1 put b=2
n2@1 → n1 AppendResponse last=4@1 reject=false
n2@1 → n1 AppendResponse last=4@1 reject=0
n3@1 append 2@1 put foo=bar
n3@1 append 3@1 put a=1
n3@1 append 4@1 put b=2
n3@1 → n1 AppendResponse last=4@1 reject=false
n3@1 → n1 AppendResponse last=4@1 reject=0
n1@1 commit 4@1
n1@1 apply 4@1 put b=2
n1@1 → c1 ClientResponse id=0x03 write 0x0104
Expand Down
22 changes: 4 additions & 18 deletions src/raft/testscripts/node/append_base_missing
Original file line number Diff line number Diff line change
Expand Up @@ -44,23 +44,9 @@ n2@2 → n3 Append base=3@1 [4@2 5@2]
deliver 3
---
n3@1 follower(n1) ⇨ n3@2 follower(n2)
n3@2 → n2 AppendResponse last=1@1 reject=true
n3@2 → n2 AppendResponse last=1@1 reject=3

# n2 reduces the base by 1 and tries again, but this is also rejected.
#
# TODO: this should retry at 1@1 immediately, since the leader knows the
# follower's last index. The progress also isn't updated.
deliver 2
status 2
---
n2@2 → n3 Append base=2@1 [3@1 4@2 5@2]
n2@2 leader last=5@2 commit=4@2 apply=4 progress={1:4→5 3:0→3}

deliver 3
---
n3@2 → n2 AppendResponse last=1@1 reject=true

# Finally, n2 sends base 1 which succeeds.
# n2 tries replicating using n3's last index as a base, which succeeds.
deliver 2
status 2
---
Expand All @@ -73,7 +59,7 @@ n3@2 append 2@1 put a=1
n3@2 append 3@1 put b=2
n3@2 append 4@2 None
n3@2 append 5@2 put c=3
n3@2 → n2 AppendResponse last=5@2 reject=false
n3@2 → n2 AppendResponse last=5@2 reject=0

# When n2 receives the ack, it commits and applies the write.
deliver 2
Expand All @@ -83,7 +69,7 @@ n2@2 apply 5@2 put c=3
n2@2 → c2 ClientResponse id=0x03 write 0x0105
c2@2 put c=3 ⇒ 5

# The progress is also updated (finally).
# The progress is also updated.
status
---
n1@2 follower(n2) last=4@2 commit=4@2 apply=4
Expand Down
20 changes: 10 additions & 10 deletions src/raft/testscripts/node/append_commit_quorum
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ n1@1 ⇥ n4 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶]̶
n1@1 ⇥ n5 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶]̶
n1@1 ⇥ n6 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶]̶
n2@1 append 2@1 put a=1
n2@1 → n1 AppendResponse last=2@1 reject=false
n2@1 → n1 AppendResponse last=2@1 reject=0

status
---
Expand Down Expand Up @@ -59,7 +59,7 @@ n1@1 ⇥ n5 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶ ̶3̶@
n1@1 ⇥ n6 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶ ̶3̶@̶1̶]̶
n3@1 append 2@1 put a=1
n3@1 append 3@1 put b=2
n3@1 → n1 AppendResponse last=3@1 reject=false
n3@1 → n1 AppendResponse last=3@1 reject=0

status
---
Expand Down Expand Up @@ -90,7 +90,7 @@ n1@1 ⇥ n6 A̶p̶p̶e̶n̶d̶ ̶b̶a̶s̶e̶=̶1̶@̶1̶ ̶[̶2̶@̶1̶ ̶3̶@
n4@1 append 2@1 put a=1
n4@1 append 3@1 put b=2
n4@1 append 4@1 put c=3
n4@1 → n1 AppendResponse last=4@1 reject=false
n4@1 → n1 AppendResponse last=4@1 reject=0
n1@1 commit 2@1
n1@1 apply 2@1 put a=1
n1@1 → c1 ClientResponse id=0x01 write 0x0102
Expand Down Expand Up @@ -126,7 +126,7 @@ n5@1 append 2@1 put a=1
n5@1 append 3@1 put b=2
n5@1 append 4@1 put c=3
n5@1 append 5@1 put d=4
n5@1 → n1 AppendResponse last=5@1 reject=false
n5@1 → n1 AppendResponse last=5@1 reject=0
n1@1 commit 3@1
n1@1 apply 3@1 put b=2
n1@1 → c1 ClientResponse id=0x02 write 0x0103
Expand Down Expand Up @@ -163,7 +163,7 @@ n6@1 append 3@1 put b=2
n6@1 append 4@1 put c=3
n6@1 append 5@1 put d=4
n6@1 append 6@1 put e=5
n6@1 → n1 AppendResponse last=6@1 reject=false
n6@1 → n1 AppendResponse last=6@1 reject=0
n1@1 commit 4@1
n1@1 apply 4@1 put c=3
n1@1 → c1 ClientResponse id=0x03 write 0x0104
Expand Down Expand Up @@ -199,21 +199,21 @@ n2@1 append 4@1 put c=3
n2@1 append 5@1 put d=4
n2@1 append 6@1 put e=5
n2@1 append 7@1 put f=6
n2@1 → n1 AppendResponse last=7@1 reject=false
n2@1 → n1 AppendResponse last=7@1 reject=0
n3@1 append 4@1 put c=3
n3@1 append 5@1 put d=4
n3@1 append 6@1 put e=5
n3@1 append 7@1 put f=6
n3@1 → n1 AppendResponse last=7@1 reject=false
n3@1 → n1 AppendResponse last=7@1 reject=0
n4@1 append 5@1 put d=4
n4@1 append 6@1 put e=5
n4@1 append 7@1 put f=6
n4@1 → n1 AppendResponse last=7@1 reject=false
n4@1 → n1 AppendResponse last=7@1 reject=0
n5@1 append 6@1 put e=5
n5@1 append 7@1 put f=6
n5@1 → n1 AppendResponse last=7@1 reject=false
n5@1 → n1 AppendResponse last=7@1 reject=0
n6@1 append 7@1 put f=6
n6@1 → n1 AppendResponse last=7@1 reject=false
n6@1 → n1 AppendResponse last=7@1 reject=0
n1@1 commit 5@1
n1@1 apply 5@1 put d=4
n1@1 → c1 ClientResponse id=0x04 write 0x0105
Expand Down
Loading

0 comments on commit 3619a3b

Please sign in to comment.