Skip to content

Commit

Permalink
raft: use read/write instead of query/mutate for requests
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Apr 13, 2024
1 parent 1b9d8f2 commit 0b24345
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 39 deletions.
24 changes: 18 additions & 6 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,31 @@ pub type RequestID = Vec<u8>;
/// A read sequence number, used to confirm leadership for linearizable reads.
pub type ReadSequence = u64;

/// A client request.
/// A client request, typically passed through to the state machine.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Request {
Query(Vec<u8>),
Mutate(Vec<u8>),
/// A state machine read command. This is not replicated, and only evaluted
/// on the leader.
Read(Vec<u8>),
/// A state machine write command. This is replicated across all nodes, and
/// must result in a deterministic response.
Write(Vec<u8>),
/// Requests Raft cluster status from the leader.
Status,
}

/// A client response.
/// A client response. This will be wrapped in a Result to handle errors.
///
/// TODO: consider a separate error kind here, or a wrapped Result, to separate
/// fallible state machine operations (returned to the caller) from apply errors
/// (fatal).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Response {
Query(Vec<u8>),
Mutate(Vec<u8>),
/// A state machine read result.
Read(Vec<u8>),
/// A state machine write result.
Write(Vec<u8>),
/// The current Raft leader status.
Status(Status),
}

Expand Down
5 changes: 1 addition & 4 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,7 @@ mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
},
message: Message::ClientRequest { id: vec![0x01], request: Request::Write(vec![0xaf]) },
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(
Expand Down
23 changes: 7 additions & 16 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,10 +868,7 @@ pub mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
},
message: Message::ClientRequest { id: vec![0x01], request: Request::Write(vec![0xaf]) },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).forwarded(vec![vec![0x01]]);
assert_messages(
Expand All @@ -882,7 +879,7 @@ pub mod tests {
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
request: Request::Write(vec![0xaf]),
},
}],
);
Expand All @@ -893,7 +890,7 @@ pub mod tests {
term: 3,
message: Message::ClientResponse {
id: vec![0x01],
response: Ok(Response::Mutate(vec![0xaf])),
response: Ok(Response::Write(vec![0xaf])),
},
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).forwarded(vec![]);
Expand All @@ -905,7 +902,7 @@ pub mod tests {
term: 3,
message: Message::ClientResponse {
id: vec![0x01],
response: Ok(Response::Mutate(vec![0xaf])),
response: Ok(Response::Write(vec![0xaf])),
},
}],
);
Expand All @@ -923,10 +920,7 @@ pub mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
},
message: Message::ClientRequest { id: vec![0x01], request: Request::Write(vec![0xaf]) },
})?;
assert_node(&mut node).is_follower().term(3).leader(None).forwarded(vec![]);
assert_messages(
Expand All @@ -951,10 +945,7 @@ pub mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
},
message: Message::ClientRequest { id: vec![0x01], request: Request::Write(vec![0xaf]) },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).forwarded(vec![vec![0x01]]);
assert_messages(
Expand All @@ -965,7 +956,7 @@ pub mod tests {
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
request: Request::Write(vec![0xaf]),
},
}],
);
Expand Down
15 changes: 6 additions & 9 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl RawNode<Leader> {
// must confirm that we are still the leader by sending a heartbeat
// with the read's sequence number and wait for confirmation from a
// quorum before executing the read.
Message::ClientRequest { id, request: Request::Query(command) } => {
Message::ClientRequest { id, request: Request::Read(command) } => {
self.role.read_seq += 1;
self.role.reads.push_back(Read {
seq: self.role.read_seq,
Expand All @@ -225,7 +225,7 @@ impl RawNode<Leader> {

// A client submitted a write command. Propose it, and track it
// until it's applied and the response is returned to the client.
Message::ClientRequest { id, request: Request::Mutate(command) } => {
Message::ClientRequest { id, request: Request::Write(command) } => {
let index = self.propose(Some(command))?;
self.role.writes.insert(index, Write { from: msg.from, id: id.clone() });
if self.peers.is_empty() {
Expand Down Expand Up @@ -340,7 +340,7 @@ impl RawNode<Leader> {
term: self.term,
message: Message::ClientResponse {
id: write.id,
response: result.map(Response::Mutate),
response: result.map(Response::Write),
},
})?;
}
Expand Down Expand Up @@ -376,7 +376,7 @@ impl RawNode<Leader> {
let result = self.state.query(read.command);
self.send(
read.from,
Message::ClientResponse { id: read.id, response: result.map(Response::Query) },
Message::ClientResponse { id: read.id, response: result.map(Response::Read) },
)?;
}

Expand Down Expand Up @@ -673,7 +673,7 @@ mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest { id: vec![0x01], request: Request::Query(vec![0xaf]) },
message: Message::ClientRequest { id: vec![0x01], request: Request::Read(vec![0xaf]) },
})?;
assert_node(&mut node).is_leader().term(3).committed(2).last(5);
for to in peers.iter().copied().sorted() {
Expand Down Expand Up @@ -701,10 +701,7 @@ mod tests {
from: 1,
to: 1,
term: 3,
message: Message::ClientRequest {
id: vec![0x01],
request: Request::Mutate(vec![0xaf]),
},
message: Message::ClientRequest { id: vec![0x01], request: Request::Write(vec![0xaf]) },
})?;
assert_node(&mut node).is_leader().term(3).committed(2).last(6).entry(Entry {
index: 6,
Expand Down
8 changes: 4 additions & 4 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ impl Client {
/// Mutates the Raft state machine, deserializing the response into the
/// return type.
fn mutate<V: DeserializeOwned>(&self, mutation: Mutation) -> Result<V> {
match self.execute(raft::Request::Mutate(bincode::serialize(&mutation)?))? {
raft::Response::Mutate(response) => Ok(bincode::deserialize(&response)?),
match self.execute(raft::Request::Write(bincode::serialize(&mutation)?))? {
raft::Response::Write(response) => Ok(bincode::deserialize(&response)?),
resp => Err(Error::Internal(format!("Unexpected Raft mutation response {:?}", resp))),
}
}

/// Queries the Raft state machine, deserializing the response into the
/// return type.
fn query<V: DeserializeOwned>(&self, query: Query) -> Result<V> {
match self.execute(raft::Request::Query(bincode::serialize(&query)?))? {
raft::Response::Query(response) => Ok(bincode::deserialize(&response)?),
match self.execute(raft::Request::Read(bincode::serialize(&query)?))? {
raft::Response::Read(response) => Ok(bincode::deserialize(&response)?),
resp => Err(Error::Internal(format!("Unexpected Raft query response {:?}", resp))),
}
}
Expand Down

0 comments on commit 0b24345

Please sign in to comment.