Skip to content

Commit

Permalink
encoding: add Key and Value traits with decode/encode default impls
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed May 31, 2024
1 parent 763caa8 commit d204a21
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 130 deletions.
11 changes: 6 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::encoding::bincode;
use std::io::Write as _;

use crate::encoding::Value as _;
use crate::error::{Error, Result};
use crate::server::{Request, Response, Status};
use crate::sql::execution::ResultSet;
use crate::sql::schema::Table;

use rand::Rng;
use std::io::Write as _;

/// A toyDB client
pub struct Client {
Expand All @@ -25,9 +26,9 @@ impl Client {

/// Call a server method
fn call(&mut self, request: Request) -> Result<Response> {
bincode::serialize_into(&mut self.writer, &request)?;
request.encode_into(&mut self.writer)?;
self.writer.flush()?;
bincode::deserialize_from(&mut self.reader)?
Result::<Response>::decode_from(&mut self.reader)?
}

/// Executes a query
Expand All @@ -40,7 +41,7 @@ impl Client {
// FIXME We buffer rows for now to avoid lifetime hassles
let mut rows = Vec::new();
loop {
match bincode::deserialize_from::<_, Result<_>>(&mut self.reader)?? {
match Result::<Response>::decode_from(&mut self.reader)?? {
Response::Row(Some(row)) => rows.push(row),
Response::Row(None) => break,
response => {
Expand Down
62 changes: 60 additions & 2 deletions src/encoding/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,64 @@
//! Binary data encodings.
//!
//! - keycode: used for keys in the key/value store.
//! - bincode: used for values in the key/value store and network protocols.
//! * keycode: used for keys in the key/value store.
//! * bincode: used for values in the key/value store and network protocols.

pub mod bincode;
pub mod keycode;

use crate::error::Result;
use serde::{de::DeserializeOwned, Deserialize, Serialize};

/// Adds automatic Keycode encode/decode methods to key enums. These are
/// primarily meant for keys stored in key/value storage engines.
///
/// TODO: consider making this DeserializeOwned instead.
pub trait Key<'de>: Serialize + Deserialize<'de> {
/// Decodes a key from a byte slice using Keycode.
fn decode(bytes: &'de [u8]) -> Result<Self> {
keycode::deserialize(bytes)
}

/// Encodes a key to a byte vector using Keycode.
fn encode(&self) -> Result<Vec<u8>> {
keycode::serialize(self)
}
}

/// Adds automatic Bincode encode/decode methods to value types. These are used
/// not only for values in key/value storage engines, but also for e.g. network
/// protocol messages and other values.
pub trait Value: Serialize + DeserializeOwned {
/// Decodes a value from a byte slice using Bincode.
fn decode(bytes: &[u8]) -> Result<Self> {
bincode::deserialize(bytes)
}

/// Decodes a value from a reader using Bincode.
fn decode_from<R: std::io::Read>(reader: R) -> Result<Self> {
bincode::deserialize_from(reader)
}

/// Decodes a value from a reader using Bincode, or returns None if the
/// reader is closed.
fn maybe_decode_from<R: std::io::Read>(reader: R) -> Result<Option<Self>> {
bincode::maybe_deserialize_from(reader)
}

/// Encodes a value to a byte vector using Bincode.
fn encode(&self) -> Result<Vec<u8>> {
bincode::serialize(self)
}

/// Encodes a value into a writer using Bincode.
fn encode_into<W: std::io::Write>(&self, writer: W) -> Result<()> {
bincode::serialize_into(writer, self)
}
}

/// Blanket implementations for various types wrapping a value type.
impl<V: Value + std::cmp::Eq + std::hash::Hash> Value for std::collections::HashSet<V> {}
impl<V: Value> Value for Option<V> {}
impl<V: Value> Value for Result<V> {}
impl<V: Value> Value for Vec<V> {}
impl<V: Value + std::cmp::Eq + std::hash::Hash> Value for Vec<(V, std::collections::HashSet<V>)> {}
18 changes: 3 additions & 15 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{NodeID, Term};
use crate::encoding::{bincode, keycode};
use crate::encoding::{self, bincode, Key as _};
use crate::error::{Error, Result};
use crate::storage;
use crate::{asserterr, errassert};
Expand Down Expand Up @@ -31,15 +31,7 @@ pub enum Key {
CommitIndex,
}

impl Key {
fn decode(bytes: &[u8]) -> Result<Self> {
keycode::deserialize(bytes)
}

fn encode(&self) -> Result<Vec<u8>> {
keycode::serialize(self)
}
}
impl encoding::Key<'_> for Key {}

/// Log key prefixes, used for prefix scans.
///
Expand All @@ -51,11 +43,7 @@ enum KeyPrefix {
CommitIndex,
}

impl KeyPrefix {
fn encode(&self) -> Result<Vec<u8>> {
keycode::serialize(self)
}
}
impl encoding::Key<'_> for KeyPrefix {}

/// A Raft log.
pub struct Log {
Expand Down
7 changes: 7 additions & 0 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::{Entry, Index, NodeID, Term};
use crate::encoding;
use crate::error::Result;
use crate::storage;

Expand All @@ -18,6 +19,8 @@ pub struct Envelope {
pub message: Message,
}

impl encoding::Value for Envelope {}

/// A message sent between Raft nodes.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Message {
Expand Down Expand Up @@ -140,6 +143,8 @@ pub enum Request {
Status,
}

impl encoding::Value for Request {}

/// A client response. This will be wrapped in a Result to handle errors.
///
/// TODO: consider a separate error kind here, or a wrapped Result, to separate
Expand All @@ -155,6 +160,8 @@ pub enum Response {
Status(Status),
}

impl encoding::Value for Response {}

/// Raft cluster status.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Status {
Expand Down
26 changes: 5 additions & 21 deletions src/raft/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ fn quorum_value<T: Ord + Copy>(mut values: Vec<T>) -> T {
#[cfg(test)]
mod tests {
use super::*;
use crate::encoding::bincode;
use crate::encoding::{self, bincode, Value as _};
use crate::raft::{
Entry, Request, RequestID, Response, ELECTION_TIMEOUT_RANGE, HEARTBEAT_INTERVAL,
};
Expand Down Expand Up @@ -2025,6 +2025,8 @@ mod tests {
Scan,
}

impl encoding::Value for TestCommand {}

impl std::fmt::Display for TestCommand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -2035,16 +2037,6 @@ mod tests {
}
}

impl TestCommand {
fn decode(raw: &[u8]) -> crate::error::Result<Self> {
bincode::deserialize(raw)
}

fn encode(&self) -> crate::error::Result<Vec<u8>> {
bincode::serialize(self)
}
}

/// A TestCommand response.
#[derive(Serialize, Deserialize)]
enum TestResponse {
Expand All @@ -2056,6 +2048,8 @@ mod tests {
Scan(BTreeMap<String, String>),
}

impl encoding::Value for TestResponse {}

impl std::fmt::Display for TestResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -2069,14 +2063,4 @@ mod tests {
Ok(())
}
}

impl TestResponse {
fn decode(raw: &[u8]) -> crate::error::Result<Self> {
bincode::deserialize(raw)
}

fn encode(&self) -> crate::error::Result<Vec<u8>> {
bincode::serialize(self)
}
}
}
17 changes: 11 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::encoding::bincode;
use crate::encoding::{self, Value as _};
use crate::error::{Error, Result};
use crate::raft;
use crate::sql;
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Server {
/// stepping into the Raft node.
fn raft_receive_peer(socket: TcpStream, raft_step_tx: Sender<raft::Envelope>) -> Result<()> {
let mut socket = std::io::BufReader::new(socket);
while let Some(message) = bincode::maybe_deserialize_from(&mut socket)? {
while let Some(message) = raft::Envelope::maybe_decode_from(&mut socket)? {
raft_step_tx.send(message)?;
}
Ok(())
Expand All @@ -156,7 +156,8 @@ impl Server {
}
};
while let Ok(message) = raft_node_rx.recv() {
if let Err(err) = bincode::serialize_into(&mut socket, &message)
if let Err(err) = message
.encode_into(&mut socket)
.and_then(|()| socket.flush().map_err(Error::from))
{
error!("Failed sending to Raft peer {addr}: {err}");
Expand Down Expand Up @@ -285,7 +286,7 @@ impl Server {
let mut reader = std::io::BufReader::new(socket.try_clone()?);
let mut writer = std::io::BufWriter::new(socket);

while let Some(request) = bincode::maybe_deserialize_from(&mut reader)? {
while let Some(request) = Request::maybe_decode_from(&mut reader)? {
// Execute request.
debug!("Received request {request:?}");
let mut response = match request {
Expand Down Expand Up @@ -326,9 +327,9 @@ impl Server {
);
}

bincode::serialize_into(&mut writer, &response)?;
response.encode_into(&mut writer)?;
for row in rows {
bincode::serialize_into(&mut writer, &row)?;
row.encode_into(&mut writer)?;
}
writer.flush()?;
}
Expand All @@ -349,6 +350,8 @@ pub enum Request {
Status,
}

impl encoding::Value for Request {}

/// A SQL server response.
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
Expand All @@ -359,6 +362,8 @@ pub enum Response {
Status(Status),
}

impl encoding::Value for Response {}

/// SQL server status.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct Status {
Expand Down
Loading

0 comments on commit d204a21

Please sign in to comment.