Skip to content

Commit

Permalink
csv: add SingleFieldWriter for progressive (== faster) serialization
Browse files Browse the repository at this point in the history
This struct is an implementation detail that allows for progressive
writing of a field into a writer's internal buffer, which means it can
be used as a fmt::Write-r (i.e. with write!), and thus avoid more
temporary allocations with integers and arbitrary user structures (if
they serialize themselves through `Serializer::collect_str`).

This improves the benchmarks:

     name                              before.txt ns/iter     after.txt ns/iter      diff ns/iter   diff %  speedup
     count_game_serialize_owned_bytes  14,281,542 (154 MB/s)  13,410,592 (164 MB/s)      -870,950   -6.10%   x 1.06
     count_game_serialize_owned_str    14,325,692 (153 MB/s)  13,572,390 (162 MB/s)      -753,302   -5.26%   x 1.06
     count_mbta_serialize_owned_bytes  3,238,056 (192 MB/s)   2,645,566 (235 MB/s)       -592,490  -18.30%   x 1.22
     count_mbta_serialize_owned_str    3,251,586 (191 MB/s)   2,648,981 (235 MB/s)       -602,605  -18.53%   x 1.23
     count_nfl_serialize_owned_bytes   6,635,845 (205 MB/s)   5,045,906 (270 MB/s)     -1,589,939  -23.96%   x 1.32
     count_nfl_serialize_owned_str     6,662,848 (204 MB/s)   5,341,465 (255 MB/s)     -1,321,383  -19.83%   x 1.25
     count_pop_serialize_owned_bytes   10,289,241 (92 MB/s)   7,750,243 (123 MB/s)     -2,538,998  -24.68%   x 1.33
     count_pop_serialize_owned_str     10,146,352 (94 MB/s)   7,587,681 (125 MB/s)     -2,558,671  -25.22%   x 1.34

Notably the improvement is smaller for the 'game' benchmarks, where
the serialized values are mostly strings, and much larger for the
others which have mostly integers/floats.
  • Loading branch information
huonw authored and BurntSushi committed Aug 23, 2018
1 parent cddabeb commit 16a67c4
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 29 deletions.
32 changes: 19 additions & 13 deletions src/serializer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt;
use std::fmt::{self, Write};
use std::io;
use std::mem;

Expand All @@ -11,7 +11,7 @@ use serde::ser::{
};

use error::{Error, ErrorKind, new_error};
use writer::Writer;
use writer::{Writer, SingleFieldWriter};

/// Serialize the given value to the given writer, and return an error if
/// anything went wrong.
Expand All @@ -34,6 +34,12 @@ impl<'a, 'w, W: io::Write> Serializer for &'a mut SeRecord<'w, W> {
type SerializeStruct = Self;
type SerializeStructVariant = Self;

/// Serializes any `Display` value as a single field by formatting it
/// straight into the underlying writer, with no intermediate `String`.
///
/// The result of the formatting call is deliberately dropped; the outcome
/// reported to the caller is whatever the field writer hands back from
/// `take_formatting_error`.
///
/// NOTE(review): if `v`'s `Display` impl itself returns `fmt::Error` without
/// the writer having failed, that error is discarded here and `Ok(())` is
/// returned -- confirm this is intended.
fn collect_str<T: ?Sized + fmt::Display>(self, v: &T) -> Result<(), Error> {
    let mut field = SingleFieldWriter::start_field(self.wtr)?;
    let _ = field.write_fmt(format_args!("{}", v));
    field.take_formatting_error()
}

fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
if v {
self.wtr.write_field("true")
Expand All @@ -43,47 +49,47 @@ impl<'a, 'w, W: io::Write> Serializer for &'a mut SeRecord<'w, W> {
}

/// Serializes an `i8` as one field. Both sides of the diff are shown below.
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes an `i16` as one field. Both sides of the diff are shown below.
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes an `i32` as one field. Both sides of the diff are shown below.
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes an `i64` as one field. Both sides of the diff are shown below.
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes a `u8` as one field. Both sides of the diff are shown below.
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes a `u16` as one field. Both sides of the diff are shown below.
fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes a `u32` as one field. Both sides of the diff are shown below.
fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes a `u64` as one field. Both sides of the diff are shown below.
fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes an `f32` as one field. Both sides of the diff are shown below.
fn serialize_f32(self, v: f32) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes an `f64` as one field. Both sides of the diff are shown below.
fn serialize_f64(self, v: f64) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

/// Serializes a `char` as one field. Both sides of the diff are shown below.
fn serialize_char(self, v: char) -> Result<Self::Ok, Self::Error> {
// Old body (removed by this commit): builds a temporary `String` first.
self.wtr.write_field(v.to_string().as_bytes())
// New body (added by this commit): formats `v` through `collect_str`.
self.collect_str(&v)
}

fn serialize_str(self, value: &str) -> Result<Self::Ok, Self::Error> {
Expand Down
81 changes: 65 additions & 16 deletions src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::fmt;
use std::fs::File;
use std::io;
use std::mem;
use std::path::Path;
use std::result;

Expand Down Expand Up @@ -1083,22 +1085,8 @@ impl<W: io::Write> Writer<W> {
/// into write_record.
#[inline(always)]
fn write_field_impl<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
// Old body (removed by this commit): writes the delimiter when this is not
// the first field, then feeds `field` to the core writer, flushing whenever
// the output buffer fills up. `fields_written` is bumped only on
// `WriteResult::InputEmpty`, i.e. after the whole field was accepted.
if self.state.fields_written > 0 {
self.write_delimiter()?;
}
let mut field = field.as_ref();
loop {
let (res, nin, nout) = self.core.field(field, self.buf.writable());
field = &field[nin..];
self.buf.written(nout);
match res {
WriteResult::InputEmpty => {
self.state.fields_written += 1;
return Ok(());
}
WriteResult::OutputFull => self.flush()?,
}
}
// New body (added by this commit): the same delimiter and write/flush logic
// now lives in `SingleFieldWriter::start_field` and `append`; the field
// counter is incremented by `SingleFieldWriter`'s `Drop` impl instead.
let mut sfw = SingleFieldWriter::start_field(self)?;
sfw.append(field.as_ref())
}

/// Flush the contents of the internal buffer to the underlying writer.
Expand Down Expand Up @@ -1198,6 +1186,67 @@ impl<W: io::Write> Writer<W> {
}
}

/// Incrementally writes the bytes of one field into a `Writer`'s internal
/// buffer.
///
/// Because this type implements `fmt::Write`, values implementing the
/// `fmt::*` traits can be rendered directly into that buffer instead of
/// going through a temporary allocation first.
///
/// The type is `pub` so that other modules of this crate (serialization,
/// in particular) can use it, but it isn't exposed for public use.
///
/// Errors raised while going through the `fmt` interface are sticky: the
/// `csv::Error` is stored in this value, later formatting calls do nothing
/// and return `fmt::Error` as well, and `take_formatting_error` returns the
/// stored error and clears it.
pub struct SingleFieldWriter<'a, W>
where
    W: io::Write + 'a,
{
    /// The writer whose buffer receives the field's bytes.
    wtr: &'a mut Writer<W>,
    /// Result of the `fmt::Write` calls so far; stays `Err` until taken.
    error: Result<()>,
}

impl<'a, W: io::Write> SingleFieldWriter<'a, W> {
pub fn start_field(wtr: &'a mut Writer<W>) -> Result<Self> {
if wtr.state.fields_written > 0 {
wtr.write_delimiter()?;
}

Ok(SingleFieldWriter { wtr: wtr, error: Ok(()) })
}

pub fn append(&mut self, mut data: &[u8]) -> Result<()> {
loop {
let (res, nin, nout) = self.wtr.core.field(data, self.wtr.buf.writable());
data = &data[nin..];
self.wtr.buf.written(nout);
match res {
WriteResult::InputEmpty => {
return Ok(());
}
WriteResult::OutputFull => self.wtr.flush()?,
}
}
}

pub fn take_formatting_error(&mut self) -> Result<()> {
mem::replace(&mut self.error, Ok(()))
}
}
impl<'a, W: io::Write> fmt::Write for SingleFieldWriter<'a, W> {
    /// Appends `s` to the current field.
    ///
    /// Once an append has failed, the error stays stored and every further
    /// call returns `fmt::Error` without writing anything.
    fn write_str(&mut self, s: &str) -> fmt::Result {
        // Only attempt the write while no earlier error is pending.
        if self.error.is_ok() {
            self.error = self.append(s.as_bytes());
        }

        match self.error {
            Ok(()) => Ok(()),
            Err(_) => Err(fmt::Error),
        }
    }
}
impl<'a, W: io::Write> Drop for SingleFieldWriter<'a, W> {
    /// Counts the field on the underlying writer when the field writer goes
    /// out of scope.
    ///
    /// NOTE(review): this runs unconditionally, so the field is counted even
    /// if an append or formatting call failed -- confirm that is intended.
    fn drop(&mut self) {
        let state = &mut self.wtr.state;
        state.fields_written += 1;
    }
}

impl Buffer {
/// Returns a slice of the buffer's current contents.
///
Expand Down

0 comments on commit 16a67c4

Please sign in to comment.