Skip to content

Commit

Permalink
Remove async for Raft server processing
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Apr 12, 2024
1 parent 56c8084 commit 542ef27
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 472 deletions.
282 changes: 11 additions & 271 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 0 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ config = "~0.14.0"
crossbeam = { version = "0.8.4", features = ["crossbeam-channel"] }
derivative = "~2.2.0"
fs4 = "~0.8.1"
futures = "~0.3.15"
futures-util = "~0.3.15"
hdrhistogram = "~7.5.4"
hex = "~0.4.3"
itertools = "0.12.1"
Expand All @@ -31,18 +29,6 @@ serde = "~1.0.126"
serde_bytes = "~0.11.12"
serde_derive = "~1.0.126"
simplelog = "~0.12.1"
tokio = { version = "~1.37.0", features = [
"macros",
"rt",
"rt-multi-thread",
"net",
"io-util",
"time",
"sync",
] }
tokio-serde = { version = "~0.9.0", features = ["bincode"] }
tokio-stream = { version = "~0.1.6", features = ["net"] }
tokio-util = { version = "~0.7.8", features = ["codec"] }
uuid = { version = "~1.8.0", features = ["v4"] }

[dev-dependencies]
Expand Down
8 changes: 3 additions & 5 deletions src/bin/toydb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

use serde_derive::Deserialize;
use std::collections::HashMap;
use tokio::net::TcpListener;
use toydb::error::{Error, Result};
use toydb::raft;
use toydb::sql;
Expand All @@ -17,8 +16,7 @@ use toydb::Server;

const COMPACT_MIN_BYTES: u64 = 1024 * 1024;

#[tokio::main]
async fn main() -> Result<()> {
fn main() -> Result<()> {
let args = clap::command!()
.arg(
clap::Arg::new("config")
Expand Down Expand Up @@ -68,10 +66,10 @@ async fn main() -> Result<()> {

let srv = Server::new(cfg.id, cfg.peers, raft_log, raft_state)?;

let raft_listener = TcpListener::bind(&cfg.listen_raft).await?;
let raft_listener = std::net::TcpListener::bind(&cfg.listen_raft)?;
let sql_listener = std::net::TcpListener::bind(&cfg.listen_sql)?;

srv.serve(raft_listener, sql_listener).await
srv.serve(raft_listener, sql_listener)
}

#[derive(Debug, Deserialize)]
Expand Down
30 changes: 0 additions & 30 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,33 +168,3 @@ impl<T> From<std::sync::PoisonError<T>> for Error {
Error::Internal(err.to_string())
}
}

impl From<tokio::task::JoinError> for Error {
fn from(err: tokio::task::JoinError) -> Self {
Error::Internal(err.to_string())
}
}

impl From<tokio::sync::mpsc::error::TryRecvError> for Error {
fn from(err: tokio::sync::mpsc::error::TryRecvError) -> Self {
Error::Internal(err.to_string())
}
}

impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(err: tokio::sync::mpsc::error::SendError<T>) -> Self {
Error::Internal(err.to_string())
}
}

impl<T> From<tokio::sync::mpsc::error::TrySendError<T>> for Error {
fn from(err: tokio::sync::mpsc::error::TrySendError<T>) -> Self {
Error::Internal(err.to_string())
}
}

impl From<tokio::sync::oneshot::error::RecvError> for Error {
fn from(err: tokio::sync::oneshot::error::RecvError) -> Self {
Error::Internal(err.to_string())
}
}
5 changes: 2 additions & 3 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ mod tests {
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(RawNode<Candidate>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
fn setup() -> Result<(RawNode<Candidate>, crossbeam::channel::Receiver<Message>)> {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::Memory::new(), false)?;

Expand Down
10 changes: 4 additions & 6 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::error::{Error, Result};

use ::log::{debug, info};
use std::collections::HashSet;
use tokio::sync::mpsc;

// A follower replicates state from a leader.
#[derive(Clone, Debug, PartialEq)]
Expand Down Expand Up @@ -44,7 +43,7 @@ impl RawNode<Follower> {
peers: HashSet<NodeID>,
mut log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
node_tx: crossbeam::channel::Sender<Message>,
) -> Result<Self> {
let (term, voted_for) = log.get_term()?;
let role = Follower::new(None, voted_for);
Expand Down Expand Up @@ -288,11 +287,10 @@ pub mod tests {
use super::*;
use crate::error::Error;
use crate::storage;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(RawNode<Follower>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
fn setup() -> Result<(RawNode<Follower>, crossbeam::channel::Receiver<Message>)> {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::Memory::new(), false)?;
log.append(1, Some(vec![0x01]))?;
Expand Down Expand Up @@ -590,7 +588,7 @@ pub mod tests {
// AppendEntries accepts some entries at base 0 without changes
fn step_appendentries_base0() -> Result<()> {
// TODO: Move this into a setup function.
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
let (node_tx, mut node_rx) = crossbeam::channel::unbounded();
let mut log = Log::new(storage::Memory::new(), false)?;
log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
Expand Down
7 changes: 3 additions & 4 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,11 +407,10 @@ mod tests {
use super::*;
use crate::storage;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(RawNode<Leader>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
fn setup() -> Result<(RawNode<Leader>, crossbeam::channel::Receiver<Message>)> {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
let peers = HashSet::from([2, 3, 4, 5]);
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::Memory::new(), false)?;
Expand Down Expand Up @@ -688,7 +687,7 @@ mod tests {
#[test]
// Sending a mutate request should append it to log, replicate it to peers, and register notification.
fn step_clientrequest_mutate() -> Result<()> {
let (leader, mut node_rx) = setup()?;
let (leader, node_rx) = setup()?;
let peers = leader.peers.clone();
let mut node: Node = leader.into();

Expand Down
18 changes: 8 additions & 10 deletions src/raft/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use leader::Leader;
use ::log::debug;
use rand::Rng as _;
use std::collections::HashSet;
use tokio::sync::mpsc;

/// A node ID.
pub type NodeID = u8;
Expand Down Expand Up @@ -56,7 +55,7 @@ impl Node {
peers: HashSet<NodeID>,
log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
node_tx: crossbeam::channel::Sender<Message>,
) -> Result<Self> {
let node = RawNode::new(id, peers, log, state, node_tx)?;
if node.peers.is_empty() {
Expand Down Expand Up @@ -126,7 +125,7 @@ pub struct RawNode<R: Role = Follower> {
term: Term,
log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
node_tx: crossbeam::channel::Sender<Message>,
role: R,
}

Expand Down Expand Up @@ -258,11 +257,10 @@ mod tests {
use crate::storage;
use pretty_assertions::assert_eq;
use std::collections::HashSet;
use tokio::sync::mpsc;

#[track_caller]
pub fn assert_messages<T: std::fmt::Debug + PartialEq>(
rx: &mut mpsc::UnboundedReceiver<T>,
rx: &mut crossbeam::channel::Receiver<T>,
msgs: Vec<T>,
) {
let mut actual = Vec::new();
Expand Down Expand Up @@ -432,14 +430,14 @@ mod tests {
NodeAsserter::new(node)
}

fn setup_rolenode() -> Result<(RawNode<Follower>, mpsc::UnboundedReceiver<Message>)> {
fn setup_rolenode() -> Result<(RawNode<Follower>, crossbeam::channel::Receiver<Message>)> {
setup_rolenode_peers(vec![2, 3])
}

fn setup_rolenode_peers(
peers: Vec<NodeID>,
) -> Result<(RawNode<Follower>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
) -> Result<(RawNode<Follower>, crossbeam::channel::Receiver<Message>)> {
let (node_tx, node_rx) = crossbeam::channel::unbounded();
let node = RawNode {
role: Follower::new(None, None),
id: 1,
Expand All @@ -454,7 +452,7 @@ mod tests {

#[test]
fn new() -> Result<()> {
let (node_tx, _) = mpsc::unbounded_channel();
let (node_tx, _) = crossbeam::channel::unbounded();
let node = Node::new(
1,
HashSet::from([2, 3]),
Expand All @@ -475,7 +473,7 @@ mod tests {

#[test]
fn new_single() -> Result<()> {
let (node_tx, _node_rx) = mpsc::unbounded_channel();
let (node_tx, _node_rx) = crossbeam::channel::unbounded();
let node = Node::new(
1,
HashSet::new(),
Expand Down
Loading

0 comments on commit 542ef27

Please sign in to comment.