Skip to content

Commit

Permalink
raft: use Uuid type for request IDs
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 8, 2024
1 parent 3cd31cb commit 16dcc86
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde = "1.0.200"
serde_bytes = "0.11.14"
serde_derive = "1.0.200"
simplelog = "0.12.2"
uuid = { version = "1.8.0", features = ["v4"] }
uuid = { version = "1.8.0", features = ["serde", "v4"] }

[dev-dependencies]
escargot = "0.5.10"
Expand Down
14 changes: 8 additions & 6 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,24 +132,26 @@ pub enum Message {
/// leader or term changes, the request is aborted with an Error::Abort
/// ClientResponse and the client must retry.
ClientRequest {
/// The request ID. This is arbitrary, but must be globally unique for
/// the duration of the request.
/// The request ID. Must be globally unique for the request duration.
id: RequestID,
/// The request.
/// The request itself.
request: Request,
},

/// A client response.
ClientResponse {
/// The response ID. This matches the ID of the ClientRequest.
/// The ID of the original ClientRequest.
id: RequestID,
/// The response, or an error.
response: Result<Response>,
},
}

/// A client request ID.
pub type RequestID = Vec<u8>;
/// A client request ID. Must be globally unique for the duration of the
/// request. For simplicity, a random UUIDv4 is used -- the node ID and process
/// ID could be incorporated for further collision avoidance, but it does not
/// matter at this scale.
pub type RequestID = uuid::Uuid;

/// A read sequence number, used to confirm leadership for linearizable reads.
pub type ReadSequence = u64;
Expand Down
22 changes: 9 additions & 13 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,10 +622,9 @@ impl RawNode<Follower> {

// Forward client requests to the leader, or abort them if there is
// none (the client must retry).
Message::ClientRequest { ref id, .. } => {
Message::ClientRequest { id, .. } => {
assert_eq!(msg.from, self.id, "Client request from other node");

let id = id.clone();
if let Some(leader) = self.role.leader {
debug!("Forwarding request to leader {}: {:?}", leader, msg);
self.role.forwarded.insert(id);
Expand Down Expand Up @@ -818,16 +817,13 @@ impl RawNode<Leader> {
info!("Discovered new term {}", term);

// Cancel in-flight requests.
for write in
std::mem::take(&mut self.role.writes).into_values().sorted_by_key(|w| w.id.clone())
{
for write in std::mem::take(&mut self.role.writes).into_values().sorted_by_key(|w| w.id) {
self.send(
write.from,
Message::ClientResponse { id: write.id, response: Err(Error::Abort) },
)?;
}
for read in std::mem::take(&mut self.role.reads).into_iter().sorted_by_key(|r| r.id.clone())
{
for read in std::mem::take(&mut self.role.reads).into_iter().sorted_by_key(|r| r.id) {
self.send(
read.from,
Message::ClientResponse { id: read.id, response: Err(Error::Abort) },
Expand Down Expand Up @@ -960,7 +956,7 @@ impl RawNode<Leader> {
// until it's applied and the response is returned to the client.
Message::ClientRequest { id, request: Request::Write(command) } => {
let index = self.propose(Some(command))?;
self.role.writes.insert(index, Write { from: msg.from, id: id.clone() });
self.role.writes.insert(index, Write { from: msg.from, id });
if self.peers.is_empty() {
self.maybe_commit_and_apply()?;
}
Expand Down Expand Up @@ -1281,7 +1277,7 @@ mod tests {
/// In-flight client requests.
requests: HashMap<RequestID, Request>,
/// The request ID to use for the next client request.
next_request_id: u8,
next_request_id: u64,
}

impl goldenscript::Runner for TestRunner {
Expand Down Expand Up @@ -1687,9 +1683,9 @@ mod tests {
output: &mut String,
) -> Result<(), Box<dyn Error>> {
let node = self.nodes.get(&id).ok_or(format!("unknown node {id}"))?;
let request_id = vec![self.next_request_id];
let request_id = uuid::Uuid::from_u64_pair(0, self.next_request_id);
self.next_request_id += 1;
self.requests.insert(request_id.clone(), request.clone());
self.requests.insert(request_id, request.clone());

let msg = Envelope {
from: node.id(),
Expand Down Expand Up @@ -2004,7 +2000,7 @@ mod tests {
Message::ClientRequest { id, request } => {
format!(
"ClientRequest id=0x{} {}",
hex::encode(id),
hex::encode(id).trim_start_matches("00"),
match request {
Request::Read(v) => format!("read 0x{}", hex::encode(v)),
Request::Write(v) => format!("write 0x{}", hex::encode(v)),
Expand All @@ -2015,7 +2011,7 @@ mod tests {
Message::ClientResponse { id, response } => {
format!(
"ClientResponse id=0x{} {}",
hex::encode(id),
hex::encode(id).trim_start_matches("00"),
match response {
Ok(Response::Read(v)) => format!("read 0x{}", hex::encode(v)),
Ok(Response::Write(v)) => format!("write 0x{}", hex::encode(v)),
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ impl Server {
// Track inbound client requests and step them into the node.
recv(request_rx) -> result => {
let (request, response_tx) = result.expect("request_rx disconnected");
let id = uuid::Uuid::new_v4().into_bytes().to_vec();
let id = uuid::Uuid::new_v4();
let msg = raft::Envelope{
from: node.id(),
to: node.id(),
term: node.term(),
message: raft::Message::ClientRequest{id: id.clone(), request},
message: raft::Message::ClientRequest{id, request},
};
node = node.step(msg).expect("step failed");
response_txs.insert(id, response_tx);
Expand Down

0 comments on commit 16dcc86

Please sign in to comment.