csv/writer: optimize writing.
This commit optimizes the writer in the CSV crate. In particular, it adds
a fast path that completely bypasses the CoreWriter from csv-core. It's
faster because csv-core (necessarily) does a bunch of book-keeping to
maintain the writer's state and allow for incremental writing.

The fast path works by determining whether there is enough space left in
the buffer to write the current record, based on a comparison with an
upper bound on the space required for that record. If the record fits, we
write it to the buffer directly, without any of the state management.
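
For concreteness, here is a sketch of that bound. The helper name and
standalone form are ours for illustration; the arithmetic mirrors the
`upper_bound` computation in the src/writer.rs diff below:

fn upper_bound(record: &csv::ByteRecord) -> usize {
    // Worst case for the data itself: escaping doubles every byte.
    2 * record.as_slice().len()
        // One delimiter between each pair of adjacent fields.
        + record.len().saturating_sub(1)
        // An opening and a closing quote around every field.
        + 2 * record.len()
        // Up to two bytes (CRLF) for the record terminator.
        + 2
}

For example, for the record ["a", "bb", "ccc"], as_slice().len() is 6 and
len() is 3, so the bound is 12 + 2 + 6 + 2 = 22 bytes.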

The downside is that this requires an extra API method, write_byte_record.
It's necessary because our heuristic needs to cheaply compute the number
of fields and the total number of bytes in the record, and that information
isn't available from generic iterator APIs (including ExactSizeIterator).
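
To illustrate the problem (this snippet is ours, not part of the commit):
given only the generic bound that write_record accepts, the two quantities
can only be recovered by consuming the iterator, whereas a ByteRecord
stores its fields contiguously and keeps a field count, so both are O(1):

// Hypothetical helper: sizing a record through the generic iterator API
// burns a full pass (and the iterator itself) just to count.
fn sizes<I, T>(record: I) -> (usize, usize)
    where I: IntoIterator<Item=T>, T: AsRef<[u8]>
{
    let (mut fields, mut bytes) = (0, 0);
    for f in record {
        fields += 1;
        bytes += f.as_ref().len();
    }
    (fields, bytes)
}
// With a ByteRecord, the same numbers are record.len() and
// record.as_slice().len(), both constant-time lookups.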
BurntSushi committed May 25, 2017
1 parent f617f8f commit b8175c4
Showing 2 changed files with 219 additions and 14 deletions.
52 changes: 51 additions & 1 deletion benches/bench.rs
@@ -11,7 +11,7 @@ use std::io;
use serde::de::DeserializeOwned;
use test::Bencher;

use csv::{ByteRecord, Reader, ReaderBuilder, StringRecord};
use csv::{ByteRecord, Reader, ReaderBuilder, StringRecord, Writer};

static NFL: &'static str =
    include_str!("../examples/data/bench/nfl.csv");
@@ -267,6 +267,47 @@ bench!(count_mbta_iter_str, MBTA, count_iter_str, 90000);
bench!(count_mbta_read_bytes, MBTA, count_read_bytes, 90000);
bench!(count_mbta_read_str, MBTA, count_read_str, 90000);

macro_rules! bench_write {
    ($name:ident, $data:ident) => {
        #[bench]
        fn $name(b: &mut Bencher) {
            let data = $data.as_bytes();
            b.bytes = data.len() as u64;
            let records = collect_records(data);

            b.iter(|| {
                let mut wtr = Writer::from_writer(vec![]);
                for r in &records {
                    wtr.write_record(r).unwrap();
                }
                assert!(wtr.flush().is_ok());
            })
        }
    }
}

macro_rules! bench_write_bytes {
    ($name:ident, $data:ident) => {
        #[bench]
        fn $name(b: &mut Bencher) {
            let data = $data.as_bytes();
            b.bytes = data.len() as u64;
            let records = collect_records(data);

            b.iter(|| {
                let mut wtr = Writer::from_writer(vec![]);
                for r in &records {
                    wtr.write_byte_record(r).unwrap();
                }
                assert!(wtr.flush().is_ok());
            })
        }
    }
}

bench_write!(write_nfl_record, NFL);
bench_write_bytes!(write_nfl_bytes, NFL);

fn count_deserialize_owned_bytes<R, D>(rdr: &mut Reader<R>) -> u64
    where R: io::Read, D: DeserializeOwned
{
@@ -323,3 +364,12 @@ fn count_read_str<R: io::Read>(rdr: &mut Reader<R>) -> u64 {
    }
    count
}

fn collect_records(data: &[u8]) -> Vec<ByteRecord> {
    let mut rdr = ReaderBuilder::new()
        .has_headers(false)
        .from_reader(data);
    rdr.byte_records()
        .collect::<Result<Vec<_>, _>>()
        .unwrap()
}
181 changes: 168 additions & 13 deletions src/writer.rs
@@ -4,7 +4,10 @@ use std::path::Path;
use std::result;

use csv_core::{
    Writer as CoreWriter, WriterBuilder as CoreWriterBuilder, WriteResult,
    self,
    Writer as CoreWriter,
    WriterBuilder as CoreWriterBuilder,
    WriteResult,
};
use serde::Serialize;

@@ -888,11 +891,99 @@ impl<W: io::Write> Writer<W> {
        where I: IntoIterator<Item=T>, T: AsRef<[u8]>
    {
        for field in record.into_iter() {
            self.write_field(field)?;
            self.write_field_impl(field)?;
        }
        self.write_terminator()
    }

    /// Write a single `ByteRecord`.
    ///
    /// This method accepts a borrowed `ByteRecord` and writes its contents
    /// to the underlying writer.
    ///
    /// This is similar to `write_record` except that it specifically requires
    /// a `ByteRecord`. This permits the writer to possibly write the record
    /// more quickly than the more generic `write_record`.
    ///
    /// This may be called with an empty record, which will cause a record
    /// terminator to be written. If no fields had been written, then a single
    /// empty field is written before the terminator.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate csv;
    ///
    /// use std::error::Error;
    /// use csv::{ByteRecord, Writer};
    ///
    /// # fn main() { example().unwrap(); }
    /// fn example() -> Result<(), Box<Error>> {
    ///     let mut wtr = Writer::from_writer(vec![]);
    ///     wtr.write_byte_record(&ByteRecord::from(&["a", "b", "c"][..]))?;
    ///     wtr.write_byte_record(&ByteRecord::from(&["x", "y", "z"][..]))?;
    ///
    ///     let data = String::from_utf8(wtr.into_inner()?)?;
    ///     assert_eq!(data, "a,b,c\nx,y,z\n");
    ///     Ok(())
    /// }
    /// ```
    #[inline(never)]
    pub fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> {
        if record.as_slice().is_empty() {
            return self.write_record(record);
        }
        // The idea here is to find a fast path for shuffling our record into
        // our buffer as quickly as possible. We do this because the underlying
        // "core" CSV writer does a lot of book-keeping to maintain its state
        // oriented API.
        //
        // The fast path occurs when we know our record will fit in whatever
        // space we have left in our buffer. We can actually quickly compute
        // the upper bound on the space required:
        let upper_bound =
            // The data itself plus the worst case: every byte is a quote.
            (2 * record.as_slice().len())
            // The number of field delimiters.
            + (record.len().saturating_sub(1))
            // The maximum number of quotes inserted around each field.
            + (2 * record.len())
            // The maximum number of bytes for the terminator.
            + 2;
        if self.buf.writable().len() < upper_bound {
            return self.write_record(record);
        }
        let mut first = true;
        for field in record.iter() {
            if !first {
                self.buf.writable()[0] = self.core.get_delimiter();
                self.buf.written(1);
            }
            first = false;

            if !self.core.should_quote(field) {
                self.buf.writable()[..field.len()].copy_from_slice(field);
                self.buf.written(field.len());
            } else {
                self.buf.writable()[0] = self.core.get_quote();
                self.buf.written(1);
                let (res, nin, nout) = csv_core::quote(
                    field,
                    self.buf.writable(),
                    self.core.get_quote(),
                    self.core.get_escape(),
                    self.core.get_double_quote());
                debug_assert!(res == WriteResult::InputEmpty);
                debug_assert!(nin == field.len());
                self.buf.written(nout);
                self.buf.writable()[0] = self.core.get_quote();
                self.buf.written(1);
            }
        }
        self.state.fields_written = record.len() as u64;
        self.write_terminator_into_buffer()
    }

    /// Write a single field.
    ///
    /// One should prefer using `write_record` over this method. It is provided
@@ -928,6 +1019,15 @@ impl<W: io::Write> Writer<W> {
    /// }
    /// ```
    pub fn write_field<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
        self.write_field_impl(field)
    }

    /// Implementation of write_field.
    ///
    /// This is a separate method so we can force the compiler to inline it
    /// into write_record.
    #[inline(always)]
    fn write_field_impl<T: AsRef<[u8]>>(&mut self, field: T) -> Result<()> {
        if self.state.fields_written > 0 {
            self.write_delimiter()?;
        }
@@ -987,6 +1087,42 @@ impl<W: io::Write> Writer<W> {

    /// Write a CSV terminator.
    fn write_terminator(&mut self) -> Result<()> {
        self.check_field_count()?;
        loop {
            let (res, nout) = self.core.terminator(self.buf.writable());
            self.buf.written(nout);
            match res {
                WriteResult::InputEmpty => {
                    self.state.fields_written = 0;
                    return Ok(());
                }
                WriteResult::OutputFull => self.flush()?,
            }
        }
    }

    /// Write a CSV terminator that is guaranteed to fit into the current
    /// buffer.
    #[inline(never)]
    fn write_terminator_into_buffer(&mut self) -> Result<()> {
        self.check_field_count()?;
        match self.core.get_terminator() {
            csv_core::Terminator::CRLF => {
                self.buf.writable()[0] = b'\r';
                self.buf.writable()[1] = b'\n';
                self.buf.written(2);
            }
            csv_core::Terminator::Any(b) => {
                self.buf.writable()[0] = b;
                self.buf.written(1);
            }
            _ => unreachable!(),
        }
        self.state.fields_written = 0;
        Ok(())
    }

    fn check_field_count(&mut self) -> Result<()> {
        if !self.state.flexible {
            match self.state.first_field_count {
                None => {
@@ -1003,41 +1139,35 @@ impl<W: io::Write> Writer<W> {
                Some(_) => {}
            }
        }
        loop {
            let (res, nout) = self.core.terminator(self.buf.writable());
            self.buf.written(nout);
            match res {
                WriteResult::InputEmpty => {
                    self.state.fields_written = 0;
                    return Ok(());
                }
                WriteResult::OutputFull => self.flush()?,
            }
        }
        Ok(())
    }
}

impl Buffer {
    /// Returns a slice of the buffer's current contents.
    ///
    /// The slice returned may be empty.
    #[inline]
    fn readable(&self) -> &[u8] {
        &self.buf[..self.len]
    }

    /// Returns a mutable slice of the remaining space in this buffer.
    ///
    /// The slice returned may be empty.
    #[inline]
    fn writable(&mut self) -> &mut [u8] {
        &mut self.buf[self.len..]
    }

    /// Indicates that `n` bytes have been written to this buffer.
    #[inline]
    fn written(&mut self, n: usize) {
        self.len += n;
    }

    /// Clear the buffer.
    #[inline]
    fn clear(&mut self) {
        self.len = 0;
    }
@@ -1079,6 +1209,14 @@ mod tests {
        assert_eq!(wtr_as_string(wtr), "a,b,c\n");
    }

    #[test]
    fn raw_one_byte_record() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).unwrap();

        assert_eq!(wtr_as_string(wtr), "a,b,c\n");
    }

    #[test]
    fn one_empty_record() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);
@@ -1087,6 +1225,14 @@
        assert_eq!(wtr_as_string(wtr), "\"\"\n");
    }

    #[test]
    fn raw_one_empty_record() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap();

        assert_eq!(wtr_as_string(wtr), "\"\"\n");
    }

    #[test]
    fn two_empty_records() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);
@@ -1096,6 +1242,15 @@
        assert_eq!(wtr_as_string(wtr), "\"\"\n\"\"\n");
    }

    #[test]
    fn raw_two_empty_records() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);
        wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap();
        wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap();

        assert_eq!(wtr_as_string(wtr), "\"\"\n\"\"\n");
    }

    #[test]
    fn unequal_records_bad() {
        let mut wtr = WriterBuilder::new().from_writer(vec![]);