From b8175c44cbb6d815bfd310e35a1130f65581940d Mon Sep 17 00:00:00 2001 From: Andrew Gallant Date: Wed, 24 May 2017 22:27:56 -0400 Subject: [PATCH] csv/writer: optimize writing. This commit optimizes the writer in the CSV crate. In particular, it adds a fast path that completely bypasses the CoreWriter from csv-core. It's faster because csv-core (necessarily) does a bunch of book-keeping to maintain the state of the writer to allow for incremental writing. The fast path works by determining whether there is enough space left in the buffer to write the current record, based on a comparison with the upper bound of the space required for that record. If it fits, then we write the record to the buffer directly without all of the state management. The downside is that this requires an extra API method, write_byte_record. It's necessary because our heuristic needs to be able to cheaply compute the number of fields and the total number of bytes in the record, which isn't available in generic iterator APIs (including ExactSizeIterator). --- benches/bench.rs | 52 +++++++++++++- src/writer.rs | 181 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 219 insertions(+), 14 deletions(-) diff --git a/benches/bench.rs b/benches/bench.rs index 7e2c25f..637771b 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -11,7 +11,7 @@ use std::io; use serde::de::DeserializeOwned; use test::Bencher; -use csv::{ByteRecord, Reader, ReaderBuilder, StringRecord}; +use csv::{ByteRecord, Reader, ReaderBuilder, StringRecord, Writer}; static NFL: &'static str = include_str!("../examples/data/bench/nfl.csv"); @@ -267,6 +267,47 @@ bench!(count_mbta_iter_str, MBTA, count_iter_str, 90000); bench!(count_mbta_read_bytes, MBTA, count_read_bytes, 90000); bench!(count_mbta_read_str, MBTA, count_read_str, 90000); +macro_rules! 
bench_write { + ($name:ident, $data:ident) => { + #[bench] + fn $name(b: &mut Bencher) { + let data = $data.as_bytes(); + b.bytes = data.len() as u64; + let records = collect_records(data); + + b.iter(|| { + let mut wtr = Writer::from_writer(vec![]); + for r in &records { + wtr.write_record(r).unwrap(); + } + assert!(wtr.flush().is_ok()); + }) + } + } +} + +macro_rules! bench_write_bytes { + ($name:ident, $data:ident) => { + #[bench] + fn $name(b: &mut Bencher) { + let data = $data.as_bytes(); + b.bytes = data.len() as u64; + let records = collect_records(data); + + b.iter(|| { + let mut wtr = Writer::from_writer(vec![]); + for r in &records { + wtr.write_byte_record(r).unwrap(); + } + assert!(wtr.flush().is_ok()); + }) + } + } +} + +bench_write!(write_nfl_record, NFL); +bench_write_bytes!(write_nfl_bytes, NFL); + fn count_deserialize_owned_bytes(rdr: &mut Reader) -> u64 where R: io::Read, D: DeserializeOwned { @@ -323,3 +364,12 @@ fn count_read_str(rdr: &mut Reader) -> u64 { } count } + +fn collect_records(data: &[u8]) -> Vec { + let mut rdr = ReaderBuilder::new() + .has_headers(false) + .from_reader(data); + rdr.byte_records() + .collect::, _>>() + .unwrap() +} diff --git a/src/writer.rs b/src/writer.rs index f3be142..6dae171 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -4,7 +4,10 @@ use std::path::Path; use std::result; use csv_core::{ - Writer as CoreWriter, WriterBuilder as CoreWriterBuilder, WriteResult, + self, + Writer as CoreWriter, + WriterBuilder as CoreWriterBuilder, + WriteResult, }; use serde::Serialize; @@ -888,11 +891,99 @@ impl Writer { where I: IntoIterator, T: AsRef<[u8]> { for field in record.into_iter() { - self.write_field(field)?; + self.write_field_impl(field)?; } self.write_terminator() } + /// Write a single `ByteRecord`. + /// + /// This method accepts a borrowed `ByteRecord` and writes its contents + /// to the underlying writer. + /// + /// This is similar to `write_record` except that it specifically requires + /// a `ByteRecord`. 
This permits the writer to possibly write the record + /// more quickly than the more generic `write_record`. + /// + /// This may be called with an empty record, which will cause a record + /// terminator to be written. If no fields had been written, then a single + /// empty field is written before the terminator. + /// + /// # Example + /// + /// ``` + /// extern crate csv; + /// + /// use std::error::Error; + /// use csv::{ByteRecord, Writer}; + /// + /// # fn main() { example().unwrap(); } + /// fn example() -> Result<(), Box> { + /// let mut wtr = Writer::from_writer(vec![]); + /// wtr.write_byte_record(&ByteRecord::from(&["a", "b", "c"][..]))?; + /// wtr.write_byte_record(&ByteRecord::from(&["x", "y", "z"][..]))?; + /// + /// let data = String::from_utf8(wtr.into_inner()?)?; + /// assert_eq!(data, "a,b,c\nx,y,z\n"); + /// Ok(()) + /// } + /// ``` + #[inline(never)] + pub fn write_byte_record(&mut self, record: &ByteRecord) -> Result<()> { + if record.as_slice().is_empty() { + return self.write_record(record); + } + // The idea here is to find a fast path for shuffling our record into + // our buffer as quickly as possible. We do this because the underlying + // "core" CSV writer does a lot of book-keeping to maintain its state + // oriented API. + // + // The fast path occurs when we know our record will fit in whatever + // space we have left in our buffer. We can actually quickly compute + // the upper bound on the space required: + let upper_bound = + // The data itself plus the worst case: every byte is a quote. + (2 * record.as_slice().len()) + // The number of field delimiters. + + (record.len().saturating_sub(1)) + // The maximum number of quotes inserted around each field. + + (2 * record.len()) + // The maximum number of bytes for the terminator. 
+ + 2; + if self.buf.writable().len() < upper_bound { + return self.write_record(record); + } + let mut first = true; + for field in record.iter() { + if !first { + self.buf.writable()[0] = self.core.get_delimiter(); + self.buf.written(1); + } + first = false; + + if !self.core.should_quote(field) { + self.buf.writable()[..field.len()].copy_from_slice(field); + self.buf.written(field.len()); + } else { + self.buf.writable()[0] = self.core.get_quote(); + self.buf.written(1); + let (res, nin, nout) = csv_core::quote( + field, + self.buf.writable(), + self.core.get_quote(), + self.core.get_escape(), + self.core.get_double_quote()); + debug_assert!(res == WriteResult::InputEmpty); + debug_assert!(nin == field.len()); + self.buf.written(nout); + self.buf.writable()[0] = self.core.get_quote(); + self.buf.written(1); + } + } + self.state.fields_written = record.len() as u64; + self.write_terminator_into_buffer() + } + /// Write a single field. /// /// One should prefer using `write_record` over this method. It is provided @@ -928,6 +1019,15 @@ impl Writer { /// } /// ``` pub fn write_field>(&mut self, field: T) -> Result<()> { + self.write_field_impl(field) + } + + /// Implementation of write_field. + /// + /// This is a separate method so we can force the compiler to inline it + /// into write_record. + #[inline(always)] + fn write_field_impl>(&mut self, field: T) -> Result<()> { if self.state.fields_written > 0 { self.write_delimiter()?; } @@ -987,6 +1087,42 @@ impl Writer { /// Write a CSV terminator. fn write_terminator(&mut self) -> Result<()> { + self.check_field_count()?; + loop { + let (res, nout) = self.core.terminator(self.buf.writable()); + self.buf.written(nout); + match res { + WriteResult::InputEmpty => { + self.state.fields_written = 0; + return Ok(()); + } + WriteResult::OutputFull => self.flush()?, + } + } + } + + /// Write a CSV terminator that is guaranteed to fit into the current + /// buffer. 
+ #[inline(never)] + fn write_terminator_into_buffer(&mut self) -> Result<()> { + self.check_field_count()?; + match self.core.get_terminator() { + csv_core::Terminator::CRLF => { + self.buf.writable()[0] = b'\r'; + self.buf.writable()[1] = b'\n'; + self.buf.written(2); + } + csv_core::Terminator::Any(b) => { + self.buf.writable()[0] = b; + self.buf.written(1); + } + _ => unreachable!(), + } + self.state.fields_written = 0; + Ok(()) + } + + fn check_field_count(&mut self) -> Result<()> { if !self.state.flexible { match self.state.first_field_count { None => { @@ -1003,17 +1139,7 @@ impl Writer { Some(_) => {} } } - loop { - let (res, nout) = self.core.terminator(self.buf.writable()); - self.buf.written(nout); - match res { - WriteResult::InputEmpty => { - self.state.fields_written = 0; - return Ok(()); - } - WriteResult::OutputFull => self.flush()?, - } - } + Ok(()) } } @@ -1021,6 +1147,7 @@ impl Buffer { /// Returns a slice of the buffer's current contents. /// /// The slice returned may be empty. + #[inline] fn readable(&self) -> &[u8] { &self.buf[..self.len] } @@ -1028,16 +1155,19 @@ impl Buffer { /// Returns a mutable slice of the remaining space in this buffer. /// /// The slice returned may be empty. + #[inline] fn writable(&mut self) -> &mut [u8] { &mut self.buf[self.len..] } /// Indicates that `n` bytes have been written to this buffer. + #[inline] fn written(&mut self, n: usize) { self.len += n; } /// Clear the buffer. 
+ #[inline] fn clear(&mut self) { self.len = 0; } @@ -1079,6 +1209,14 @@ mod tests { assert_eq!(wtr_as_string(wtr), "a,b,c\n"); } + #[test] + fn raw_one_byte_record() { + let mut wtr = WriterBuilder::new().from_writer(vec![]); + wtr.write_byte_record(&ByteRecord::from(vec!["a", "b", "c"])).unwrap(); + + assert_eq!(wtr_as_string(wtr), "a,b,c\n"); + } + #[test] fn one_empty_record() { let mut wtr = WriterBuilder::new().from_writer(vec![]); @@ -1087,6 +1225,14 @@ mod tests { assert_eq!(wtr_as_string(wtr), "\"\"\n"); } + #[test] + fn raw_one_empty_record() { + let mut wtr = WriterBuilder::new().from_writer(vec![]); + wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap(); + + assert_eq!(wtr_as_string(wtr), "\"\"\n"); + } + #[test] fn two_empty_records() { let mut wtr = WriterBuilder::new().from_writer(vec![]); @@ -1096,6 +1242,15 @@ mod tests { assert_eq!(wtr_as_string(wtr), "\"\"\n\"\"\n"); } + #[test] + fn raw_two_empty_records() { + let mut wtr = WriterBuilder::new().from_writer(vec![]); + wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap(); + wtr.write_byte_record(&ByteRecord::from(vec![""])).unwrap(); + + assert_eq!(wtr_as_string(wtr), "\"\"\n\"\"\n"); + } + #[test] fn unequal_records_bad() { let mut wtr = WriterBuilder::new().from_writer(vec![]);