Skip to content

Commit

Permalink
raft: add and use Log.scan_apply()
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 9, 2024
1 parent 685a9e2 commit b996472
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 59 deletions.
53 changes: 45 additions & 8 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,8 @@ impl Log {
Ok(self.get(index)?.map(|e| e.term == term).unwrap_or(false))
}

/// Iterates over log entries in the given index range.
pub fn scan(
&mut self,
range: impl std::ops::RangeBounds<Index>,
) -> Result<impl Iterator<Item = Result<Entry>> + '_> {
/// Returns an iterator over log entries in the given index range.
pub fn scan(&mut self, range: impl std::ops::RangeBounds<Index>) -> Result<Iterator> {
use std::ops::Bound;
let from = match range.start_bound() {
Bound::Excluded(&index) => Bound::Excluded(Key::Entry(index).encode()?),
Expand All @@ -235,7 +232,19 @@ impl Log {
Bound::Included(&index) => Bound::Included(Key::Entry(index).encode()?),
Bound::Unbounded => Bound::Included(Key::Entry(Index::MAX).encode()?),
};
Ok(self.engine.scan_dyn((from, to)).map(|r| r.and_then(|(_, v)| Entry::decode(&v))))
Ok(Iterator::new(self.engine.scan_dyn((from, to))))
}

/// Returns an iterator over entries that are ready to apply, starting after
/// the current applied index up to the commit index.
pub fn scan_apply(&mut self, applied_index: Index) -> Result<Iterator> {
// NB: we don't assert that commit_index >= applied_index, because the
// local commit index is not flushed to durable storage -- if lost on
// restart, it can be recovered from a quorum of logs.
if applied_index >= self.commit_index {
return Ok(Iterator::new(Box::new(std::iter::empty())));
}
self.scan(applied_index + 1..=self.commit_index)
}

/// Splices a set of entries into the log and flushes it to disk. The
Expand Down Expand Up @@ -317,6 +326,25 @@ impl Log {
}
}

/// A log entry iterator.
pub struct Iterator<'a> {
inner: Box<dyn storage::ScanIterator + 'a>,
}

impl<'a> Iterator<'a> {
fn new(inner: Box<dyn storage::ScanIterator + 'a>) -> Self {
Self { inner }
}
}

impl<'a> std::iter::Iterator for Iterator<'a> {
type Item = Result<Entry>;

fn next(&mut self) -> Option<Self::Item> {
self.inner.next().map(|r| r.and_then(|(_, v)| Entry::decode(&v)))
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -443,8 +471,17 @@ mod tests {
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", Self::format_entry(&entry)));
}
if output.is_empty() {
output.push_str("<empty>");
}

// scan_apply APPLIED_INDEX
"scan_apply" => {
let mut args = command.consume_args();
let applied_index =
args.next_pos().ok_or("applied index not given")?.parse()?;
args.reject_rest()?;
let mut scan = self.log.scan_apply(applied_index)?;
while let Some(entry) = scan.next().transpose()? {
output.push_str(&format!("{}\n", Self::format_entry(&entry)));
}
}

Expand Down
75 changes: 30 additions & 45 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,35 +162,6 @@ impl<R: Role> RawNode<R> {
self.log.get_term().0
}

/// Applies any pending, committed entries to the state machine. The command
/// responses are discarded, use maybe_apply_with() instead to access them.
fn maybe_apply(&mut self) -> Result<()> {
Self::maybe_apply_with(&mut self.log, &mut self.state, |_, _| Ok(()))
}

/// Like maybe_apply(), but calls the given closure with the result of every
/// applied command. Not a method, so that the closure can mutate the node.
fn maybe_apply_with<F>(log: &mut Log, state: &mut Box<dyn State>, mut on_apply: F) -> Result<()>
where
F: FnMut(Index, Result<Vec<u8>>) -> Result<()>,
{
let applied_index = state.get_applied_index();
let commit_index = log.get_commit_index().0;
// NB: we don't assert that commit_index >= applied_index, because the
// local commit index is not synced to durable storage -- on restart, it
// can be recovered from a quorum of logs.
if applied_index >= commit_index {
return Ok(());
}

let mut scan = log.scan((applied_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
debug!("Applying {:?}", entry);
on_apply(entry.index, state.apply(entry))?;
}
Ok(())
}

/// Returns the cluster size as number of nodes.
fn cluster_size(&self) -> usize {
self.peers.len() + 1
Expand All @@ -210,9 +181,13 @@ impl<R: Role> RawNode<R> {

/// Sends a message.
fn send(&self, to: NodeID, message: Message) -> Result<()> {
let msg = Envelope { from: self.id, to, term: self.term(), message };
Self::send_with(&self.node_tx, Envelope { from: self.id, to, term: self.term(), message })
}

/// Sends a message without borrowing self, to allow partial borrows.
fn send_with(tx: &crossbeam::channel::Sender<Envelope>, msg: Envelope) -> Result<()> {
debug!("Sending {msg:?}");
Ok(self.node_tx.send(msg)?)
Ok(tx.send(msg)?)
}

/// Broadcasts a message to all peers.
Expand Down Expand Up @@ -656,6 +631,19 @@ impl RawNode<Follower> {
Ok(())
}

/// Applies any pending log entries.
fn maybe_apply(&mut self) -> Result<()> {
let mut iter = self.log.scan_apply(self.state.get_applied_index())?;
while let Some(entry) = iter.next().transpose()? {
debug!("Applying {entry:?}");
// Throw away the result, since there is no client waiting for it.
// This includes errors -- any non-deterministic errors (e.g. IO
// errors) must panic instead to avoid replica divergence.
_ = self.state.apply(entry);
}
Ok(())
}

/// Checks if an address is the current leader.
fn is_leader(&self, from: NodeID) -> bool {
self.role.leader == Some(from)
Expand Down Expand Up @@ -1054,21 +1042,18 @@ impl RawNode<Leader> {

// Apply entries and respond to client writers.
let term = self.term();
Self::maybe_apply_with(&mut self.log, &mut self.state, |index, result| -> Result<()> {
if let Some(write) = self.role.writes.remove(&index) {
// TODO: use self.send() or something.
self.node_tx.send(Envelope {
from: self.id,
to: write.from,
term,
message: Message::ClientResponse {
id: write.id,
response: result.map(Response::Write),
},
})?;
let mut iter = self.log.scan_apply(self.state.get_applied_index())?;
while let Some(entry) = iter.next().transpose()? {
debug!("Applying {entry:?}");
let write = self.role.writes.remove(&entry.index);
let result = self.state.apply(entry);

if let Some(Write { id, from: to }) = write {
let message = Message::ClientResponse { id, response: result.map(Response::Write) };
Self::send_with(&self.node_tx, Envelope { from: self.id, term, to, message })?;
}
Ok(())
})?;
}
drop(iter);

// If the commit term changed, there may be pending reads waiting for us
// to commit an entry from our own term. Execute them.
Expand Down
11 changes: 5 additions & 6 deletions src/raft/testscripts/log/scan
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
scan
scan 3..7
---
<empty>
<empty>
ok

# Append a few entries.
set_term 1
Expand Down Expand Up @@ -31,7 +30,7 @@ scan 2..

scan 4..
---
<empty>
ok

scan 0..
---
Expand All @@ -57,11 +56,11 @@ scan "..7"

scan "..1"
---
<empty>
ok

scan "..0"
---
<empty>
ok

# Both bounds.
scan 1..2
Expand All @@ -81,7 +80,7 @@ scan 0..7

scan 1..1
---
<empty>
ok

# Bounds panics.
!scan 1..0
Expand Down
59 changes: 59 additions & 0 deletions src/raft/testscripts/log/scan_apply
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# scan_apply works on an empty engine, even when given an applied index.
scan_apply 0
scan_apply 3
---
ok

# Append a few entries.
set_term 1
append
append foo
set_term 2
append bar
---
append → 1@1 None
append → 2@1 foo
append → 3@2 bar

# Nothing is committed, so scan_applied yields nothing.
scan_apply 0
---
ok

# Commit the first two entries and apply them.
commit 2
scan_apply 0
---
commit → 2@1 foo
1@1 None
2@1 foo

# Passing the commit index yields nothing.
scan_apply 2
---
ok

# Passing an applied_index after the commit index is ok, and yields nothing.
scan_apply 3
scan_apply 10
---
ok

# Committing and applying the last entry works.
commit 3
scan_apply 2
---
commit → 3@2 bar
3@2 bar

# Scanning from a lower commit index again works.
scan_apply 1
---
2@1 foo
3@2 bar

scan_apply 0
---
1@1 None
2@1 foo
3@2 bar

0 comments on commit b996472

Please sign in to comment.