Skip to content

Commit

Permalink
Add SenderBuilder.
Browse files Browse the repository at this point in the history
  • Loading branch information
4lDO2 committed Mar 7, 2019
1 parent 4d0eecb commit 0c3e924
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 13 deletions.
23 changes: 23 additions & 0 deletions src/endian.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use bincode::Config;
use byteorder::{BigEndian, ByteOrder, LittleEndian};

/// This trait only exists because bincode wasn't compatible with byteorder.
pub(crate) trait Endian: ByteOrder {
fn config() -> Config;
}
impl Endian for BigEndian {
fn config() -> Config {
let mut config = bincode::config();
config.big_endian();
config
}
}
impl Endian for LittleEndian {
fn config() -> Config {
let mut config = bincode::config();
config.little_endian();
config
}
}


4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ extern crate byteorder;
extern crate serde;

mod channel;
mod endian;
mod error;
mod receiver;
mod sender;

pub use channel::{ChannelRecv, ChannelSend};
pub(crate) use endian::Endian;
pub use error::{RecvError, SendError};
pub use receiver::{Receiver, ReceiverBuilder};
pub use sender::{Sender, SenderBuilder};

pub use byteorder::{BigEndian, LittleEndian};
85 changes: 72 additions & 13 deletions src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,95 @@ use std::io::{BufWriter, Write};
use std::marker::PhantomData;
use std::net::{TcpStream, ToSocketAddrs};

use byteorder::{BigEndian, WriteBytesExt};
use bincode::Config;
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use serde::Serialize;

use crate::{ChannelSend, SendError};
use crate::{ChannelSend, Endian, SendError};

pub struct Sender<T: Serialize, W: Write = BufWriter<TcpStream>> {
/// The sending side of a channel.
pub struct Sender<T: Serialize, E: ByteOrder, W: Write = BufWriter<TcpStream>> {
writer: W,
_marker: PhantomData<T>,
config: Config,
_marker: PhantomData<(T, E)>,
}

/// A more convenient way of initializing senders.
pub struct SenderBuilder<T: Serialize, W: Write, E: ByteOrder> {
_marker: PhantomData<(T, W, E)>,
}

impl<T: Serialize> Sender<T> {
pub fn connect<A: ToSocketAddrs>(address: A) -> std::io::Result<Self> {
Ok(Sender::new(BufWriter::new(TcpStream::connect(address)?)))
impl<T: Serialize, W: Write, E: ByteOrder> SenderBuilder<T, W, E> {
/// Begin building a new, buffered channel.
pub fn new() -> SenderBuilder<T, BufWriter<TcpStream>, BigEndian> {
SenderBuilder::buffered()
}
/// Begin building a new, buffered channel.
pub fn buffered() -> SenderBuilder<T, BufWriter<TcpStream>, BigEndian> {
SenderBuilder {
_marker: PhantomData,
}
}
/// Begin building a new, non-buffered channel.
pub fn realtime() -> SenderBuilder<T, TcpStream, BigEndian> {
SenderBuilder {
_marker: PhantomData,
}
}
pub fn connect_realtime<A: ToSocketAddrs>(address: A) -> std::io::Result<Sender<T, TcpStream>> {
Ok(Sender::new(TcpStream::connect(address)?))
/// Specify the endianness.
///
/// *NOTE* This has to be either BigEndian or LittleEndian,
/// since bincode doesn't use ByteOrder, but instead has big_endian() and little_endian() in
/// its Config.
pub fn with_endianness<F: ByteOrder>(self) -> SenderBuilder<T, W, F> {
SenderBuilder {
_marker: PhantomData,
}
}
pub fn new<W: Write>(writer: W) -> Sender<T, W> {
}
impl<T: Serialize, W: Write, E: Endian> SenderBuilder<T, W, E> {
/// Initialize the sender with the current variables.
pub fn build(self, writer: W) -> Sender<T, BigEndian, W> {
Sender {
writer,
_marker: PhantomData,
writer,
config: E::config(),
}
}
}
impl<T: Serialize, W: Write> ChannelSend<T> for Sender<T, W> {
impl<T: Serialize, E: Endian> SenderBuilder<T, BufWriter<TcpStream>, E> {
/// Connect to a listening receiver, at a specified address.
pub fn connect<A: ToSocketAddrs>(self, address: A) -> std::io::Result<Sender<T, E, BufWriter<TcpStream>>> {
let mut stream = TcpStream::connect(address)?;
stream.set_nodelay(false);
stream.set_nonblocking(false);

Ok(Sender {
writer: BufWriter::new(stream),
_marker: PhantomData,
config: E::config(),
})
}
}
impl<T: Serialize, E: Endian> SenderBuilder<T, TcpStream, E> {
/// Connect to a listening receiver, at a specified address.
pub fn connect<A: ToSocketAddrs>(self, address: A) -> std::io::Result<Sender<T, E, TcpStream>> {
let mut stream = TcpStream::connect(address)?;
stream.set_nodelay(true);
stream.set_nonblocking(false);

Ok(Sender {
writer: stream,
_marker: PhantomData,
config: E::config(),
})
}
}
impl<T: Serialize, W: Write, E: ByteOrder> ChannelSend<T> for Sender<T, E, W> {
type Error = SendError;
fn send(&mut self, value: &T) -> Result<(), SendError> {
let bytes = bincode::serialize(value)?;
self.writer.write_u64::<BigEndian>(bytes.len() as u64)?;
self.writer.write_u64::<E>(bytes.len() as u64)?;
self.writer.write(&bytes)?;

Ok(())
Expand Down

0 comments on commit 0c3e924

Please sign in to comment.