From c05997d5d5ef18d341f0d6ae837e2dbde7c8f859 Mon Sep 17 00:00:00 2001 From: Andrew Gallant Date: Sat, 4 Apr 2015 21:18:23 -0400 Subject: [PATCH] Refactoring and performance improvements. The major change here is that the zero-allocation reader has doubled its performance. This has resulted in a perf boost to the record/decoder iterators, but not as dramatic. In doing this, I've refactored pieces of the code, which includes some public facing changes. 1. `ByteString` is no longer a newtype because it no longer provided any added benefit over `Vec`. Instead, it is a type alias. Since `ByteString` deref'd to `Vec`, it's possible your code will need no changes. If you used `ByteString` specific things (like its constructor), then you'll need to replace it with standard `Vec` functions 2. Parse errors have been tweaked. Notably, line/column numbers are no longer recorded. Instead, record/field numbers are saved. (This was done for performance reasons.) See the documentation for the error's new structure. 3. The `index` sub-module has received some documentation love and some small naming tweaks. Notably, the `csv` method was removed in favor of `Deref`/`DerefMut` impls on `Indexed`. No changes to the format were made. 4. The `quote` and `escape` methods have had their argument types tweaked. It is currently no longer possible to specify "no quoting" to the parser. [breaking-change] --- .gitignore | 3 +- Cargo.toml | 12 + bench_large/README.md | 34 +- bench_large/huge.go | 10 +- bench_large/huge.rs | 20 +- benches/bench.rs | 2 +- src/borrow_bytes.rs | 41 +++ src/buffered.rs | 125 -------- src/bytestr.rs | 193 ------------ src/decoder.rs | 4 +- src/encoder.rs | 2 +- src/index/mod.rs | 281 ++++++++++++++++- src/lib.rs | 111 ++++--- src/reader.rs | 701 ++++++++++++++++++------------------------ src/tests.rs | 16 +- src/writer.rs | 2 +- 16 files changed, 728 insertions(+), 829 deletions(-) create mode 100644 src/borrow_bytes.rs delete mode 100644 src/buffered.rs delete mode 100644 src/bytestr.rs diff --git a/.gitignore b/.gitignore index b265ff5..d24edbd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,9 @@ .*.swp doc tags -examples/data/ss10pusa.csv +examples/ss10pusa.csv build target Cargo.lock scratch* +bench_large/huge diff --git a/Cargo.toml b/Cargo.toml index 3f6333a..2362305 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,14 @@ license = "Unlicense" [lib] name = "csv" +bench = false + +[[bin]] +name = "bench-large" +path = "bench_large/huge.rs" +test = false +bench = false +doc = false [dependencies] byteorder = "*" @@ -23,3 +31,7 @@ regex = "*" [profile.bench] opt-level = 3 lto = true # this doesn't seem to work... why? + +[profile.release] +opt-level = 3 +lto = true diff --git a/bench_large/README.md b/bench_large/README.md index df901ce..68f3bde 100644 --- a/bench_large/README.md +++ b/bench_large/README.md @@ -14,7 +14,7 @@ Then compile and run: go build -o huge-go time ./huge-go -To run the huge benchmark for Rust, make sure `ss10pusa.csv` is in the same +To run the huge benchmark for Rust, make sure `ss10pusa.csv` is in the same location as above and run: rustc --opt-level=3 -Z lto -L ../target/release/ huge.rs -o huge-rust @@ -23,43 +23,43 @@ location as above and run: To get libraries in `../target/release/`, run `cargo build --release` in the project root directory. -(Please make sure that one CPU is pegged when running this benchmark. If it +(Please make sure that one CPU is pegged when running this benchmark. 
If it isn't, you're probably just testing the speed of your disk.) ### Results -Benchmarks were run on an Intel i3930K. Note that the -'ns/iter' value is computed by each language's microbenchmark facilities. I +Benchmarks were run on an Intel i3930K. Note that the +'ns/iter' value is computed by each language's microbenchmark facilities. I suspect the granularity is big enough that the values are comparable. For rust, --opt-level=3 was used. ``` -Go 41033948 ns/iter -Rust (decode) 24016498 -Rust (string) 17052713 -Rust (byte string) 14876428 -Rust (byte slice) 11932269 +Go 41146322 ns/iter +Rust (decode) 16341720 +Rust (string) 10959665 +Rust (byte string) 9228027 +Rust (byte slice) 5589359 ``` -You'll note that none of the above benchmarks use a particularly large CSV -file. So I've also run a pretty rough benchmark on a huge CSV file (3.6GB). A -single large benchmark isn't exactly definitive, but I think we can use it as a +You'll note that none of the above benchmarks use a particularly large CSV +file. So I've also run a pretty rough benchmark on a huge CSV file (3.6GB). A +single large benchmark isn't exactly definitive, but I think we can use it as a ballpark estimate. -The huge benchmark for both Rust and Go use buffering. The times are wall +The huge benchmark for both Rust and Go use buffering. The times are wall clock times. The file system cache was warm and no disk access occurred during the benchmark. Both use a negligible and constant amount of memory (~1KB). ``` -Go 146 seconds -Rust (byte slice) 32 seconds +Go 190 seconds +Rust (byte slice) 19 seconds ``` -TODO: Fill in the other Rust access patterns for the huge benchmark. (The "byte +TODO: Fill in the other Rust access patterns for the huge benchmark. (The "byte slice" access pattern is the fastest.) -TODO: Benchmark with Python. (Estimate: "byte slice" is faster by around 2x, +TODO: Benchmark with Python. (Estimate: "byte slice" is faster by around 2x, but the other access patterns are probably comparable.) 
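The four rows in the benchmark table above correspond to the four ways of pulling records out of a `Reader`. A minimal sketch of each, assuming the post-patch API in this diff (the `decode` method is not touched by this patch, but it is the access pattern the "decode" row refers to):

```rust
extern crate csv;

fn main() {
    let data = "a,b,1\nc,d,2";

    // "decode": typed decoding into a tuple (or struct).
    let mut rdr = csv::Reader::from_string(data).has_headers(false);
    for row in rdr.decode() {
        let (s1, s2, n): (String, String, u64) = row.unwrap();
        println!("{} {} {}", s1, s2, n);
    }

    // "string": one allocated Vec<String> per record.
    let mut rdr = csv::Reader::from_string(data).has_headers(false);
    for row in rdr.records().map(|r| r.unwrap()) {
        println!("{:?}", row);
    }

    // "byte string": one allocated Vec<Vec<u8>> per record, with no
    // UTF-8 validation.
    let mut rdr = csv::Reader::from_string(data).has_headers(false);
    for row in rdr.byte_records().map(|r| r.unwrap()) {
        println!("{:?}", row);
    }

    // "byte slice": zero allocation; each field is a slice borrowed
    // from the parser's internal buffer.
    let mut rdr = csv::Reader::from_string(data).has_headers(false);
    while !rdr.done() {
        while let Some(field) = rdr.next_bytes().into_iter_result() {
            println!("{:?}", field.unwrap());
        }
    }
}
```

The patterns are listed from slowest to fastest; each one drops a cost that the previous one pays (typed decoding, then per-field UTF-8 validation, then per-field allocation).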
diff --git a/bench_large/huge.go b/bench_large/huge.go index f31f84b..d2a29a5 100644 --- a/bench_large/huge.go +++ b/bench_large/huge.go @@ -2,22 +2,26 @@ package main import ( "encoding/csv" + "fmt" "io" "log" "os" ) -func readAll(r io.Reader) { +func readAll(r io.Reader) int { + fields := 0 csvr := csv.NewReader(r) for { - _, err := csvr.Read() + row, err := csvr.Read() if err != nil { if err == io.EOF { break } log.Fatal(err) } + fields += len(row) } + return fields } func main() { @@ -28,5 +32,5 @@ func main() { if err != nil { log.Fatal(err) } - readAll(f) + fmt.Println(readAll(f)) } diff --git a/bench_large/huge.rs b/bench_large/huge.rs index 0be438e..198b48f 100644 --- a/bench_large/huge.rs +++ b/bench_large/huge.rs @@ -1,16 +1,16 @@ extern crate csv; -use std::path::Path; - fn main() { - let huge = "../examples/data/ss10pusa.csv"; - let mut rdr = csv::Reader::from_file(&Path::new(huge)); - while !rdr.done() { - loop { - match rdr.next_field() { - None => break, - Some(f) => { f.unwrap(); } - } + let huge = ::std::env::args().nth(1).unwrap(); + let mut rdr = csv::Reader::from_file(huge).unwrap(); + let mut count = 0; + loop { + match rdr.next_bytes() { + csv::NextField::Error(err) => panic!("{:?}", err), + csv::NextField::EndOfCsv => break, + csv::NextField::EndOfRecord => {} + csv::NextField::Data(_) => { count += 1; } } } + println!("{}", count); } diff --git a/benches/bench.rs b/benches/bench.rs index b92cd27..aabebf8 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -37,7 +37,7 @@ fn raw_records(b: &mut Bencher) { b.iter(|| { let mut dec = reader(&mut data); while !dec.done() { - while let Some(r) = dec.next_field().into_iter_result() { + while let Some(r) = dec.next_bytes().into_iter_result() { r.unwrap(); } } diff --git a/src/borrow_bytes.rs b/src/borrow_bytes.rs new file mode 100644 index 0000000..952e819 --- /dev/null +++ b/src/borrow_bytes.rs @@ -0,0 +1,41 @@ +use std::borrow::{Cow, ToOwned}; +use ByteString; + +/// A trait that permits borrowing byte vectors. +/// +/// This is useful for providing an API that can abstract over Unicode +/// strings and byte strings. +pub trait BorrowBytes { + /// Borrow a byte vector. + fn borrow_bytes<'a>(&'a self) -> &'a [u8]; +} + +impl BorrowBytes for String { + fn borrow_bytes(&self) -> &[u8] { self.as_bytes() } +} + +impl BorrowBytes for str { + fn borrow_bytes(&self) -> &[u8] { self.as_bytes() } +} + +impl BorrowBytes for ByteString { + fn borrow_bytes(&self) -> &[u8] { &**self } +} + +impl BorrowBytes for [u8] { + fn borrow_bytes(&self) -> &[u8] { self } +} + +impl<'a, B: ?Sized> BorrowBytes for Cow<'a, B> + where B: BorrowBytes + ToOwned, ::Owned: BorrowBytes { + fn borrow_bytes(&self) -> &[u8] { + match *self { + Cow::Borrowed(v) => v.borrow_bytes(), + Cow::Owned(ref v) => v.borrow_bytes(), + } + } +} + +impl<'a, T: ?Sized + BorrowBytes> BorrowBytes for &'a T { + fn borrow_bytes(&self) -> &[u8] { (*self).borrow_bytes() } +} diff --git a/src/buffered.rs b/src/buffered.rs deleted file mode 100644 index 1cee096..0000000 --- a/src/buffered.rs +++ /dev/null @@ -1,125 +0,0 @@ -// This is a copy of the `std::io::BufReader` with one additional -// method: `clear`. It resets the buffer to be empty (thereby losing any -// unread data). -use std::cmp; -use std::fmt; -use std::io::{self, BufRead}; -use std::slice; - -static DEFAULT_BUF_SIZE: usize = 1024 * 1024; - -/// Wraps a `Read` and buffers input from it -/// -/// It can be excessively inefficient to work directly with a `Read` instance. 
-/// For example, every call to `read` on `TcpStream` results in a system call. -/// A `BufReader` performs large, infrequent reads on the underlying `Read` -/// and maintains an in-memory buffer of the results. -pub struct BufReader { - inner: R, - buf: io::Cursor>, -} - -impl BufReader { - /// Creates a new `BufReader` with a default buffer capacity - pub fn new(inner: R) -> BufReader { - BufReader::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufReader` with the specified buffer capacity - pub fn with_capacity(cap: usize, inner: R) -> BufReader { - BufReader { - inner: inner, - buf: io::Cursor::new(Vec::with_capacity(cap)), - } - } - - /// Gets a reference to the underlying reader. - #[allow(dead_code)] pub fn get_ref(&self) -> &R { &self.inner } - - /// Gets a mutable reference to the underlying reader. - /// - /// # Warning - /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_mut(&mut self) -> &mut R { &mut self.inner } - - /// Unwraps this `BufReader`, returning the underlying reader. - /// - /// Note that any leftover data in the internal buffer is lost. - #[allow(dead_code)] pub fn into_inner(self) -> R { self.inner } - - pub fn clear(&mut self) { - self.buf.set_position(0); - self.buf.get_mut().truncate(0); - } -} - -impl io::Read for BufReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - // If we don't have any buffered data and we're doing a massive read - // (larger than our internal buffer), bypass our internal buffer - // entirely. - if self.buf.get_ref().len() == self.buf.position() as usize && - buf.len() >= self.buf.get_ref().capacity() { - return self.inner.read(buf); - } - try!(self.fill_buf()); - self.buf.read(buf) - } -} - -impl io::BufRead for BufReader { - fn fill_buf(&mut self) -> io::Result<&[u8]> { - // If we've reached the end of our internal buffer then we need to - // fetch some more data from the underlying reader. - if self.buf.position() as usize == self.buf.get_ref().len() { - self.buf.set_position(0); - let v = self.buf.get_mut(); - v.truncate(0); - let inner = &mut self.inner; - try!(with_end_to_cap(v, |b| inner.read(b))); - } - self.buf.fill_buf() - } - - fn consume(&mut self, amt: usize) { - self.buf.consume(amt) - } -} - -impl fmt::Debug for BufReader where R: fmt::Debug { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "BufReader {{ reader: {:?}, buffer: {}/{} }}", - self.inner, self.buf.position(), self.buf.get_ref().len()) - } -} - -// Acquires a slice of the vector `v` from its length to its capacity -// (uninitialized data), reads into it, and then updates the length. -// -// This function is leveraged to efficiently read some bytes into a destination -// vector without extra copying and taking advantage of the space that's -// already in `v`. -// -// The buffer we're passing down, however, is pointing at uninitialized data -// (the end of a `Vec`), and many operations will be *much* faster if we don't -// have to zero it out. In order to prevent LLVM from generating an `undef` -// value when reads happen from this uninitialized memory, we force LLVM to -// think it's initialized by sending it through a black box. This should -// prevent actual undefined behavior after optimizations. 
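For context: the one reason this fork of `std::io::BufReader` existed was the extra `clear` method, which the old `Reader::seek` (also removed later in this patch) needed in order to discard stale buffered bytes after repositioning the raw reader. A sketch of that pattern only, using the deleted type above:

```rust
use std::io;

// Sketch: after moving the raw reader, anything still sitting in the
// buffer belongs to the old position and must be thrown away.
fn seek_to<R: io::Read + io::Seek>(buf: &mut BufReader<R>, pos: u64)
          -> io::Result<()> {
    buf.clear();                                        // drop stale data
    try!(buf.get_mut().seek(io::SeekFrom::Start(pos))); // move raw reader
    Ok(())
}
```

The new reader (later in this patch) gets the same effect without the fork by managing a plain `Vec<u8>` buffer directly and invalidating it with `self.bufi = self.buf.len()`.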
-fn with_end_to_cap(v: &mut Vec, f: F) -> io::Result - where F: FnOnce(&mut [u8]) -> io::Result { - unsafe { - let n = try!(f({ - let base = v.as_mut_ptr().offset(v.len() as isize); - slice::from_raw_parts_mut(base, v.capacity() - v.len()) - })); - - // If the closure (typically a `read` implementation) reported that it - // read a larger number of bytes than the vector actually has, we need - // to be sure to clamp the vector to at most its capacity. - let new_len = cmp::min(v.capacity(), v.len() + n); - v.set_len(new_len); - return Ok(n); - } -} diff --git a/src/bytestr.rs b/src/bytestr.rs deleted file mode 100644 index 95ac1be..0000000 --- a/src/bytestr.rs +++ /dev/null @@ -1,193 +0,0 @@ -use std::borrow::{Borrow, Cow, ToOwned}; -use std::fmt; -use std::hash; -use std::iter::{FromIterator, IntoIterator}; -use std::ops; - -/// A trait that permits borrowing byte vectors. -/// -/// This is useful for providing an API that can abstract over Unicode -/// strings and byte strings. -pub trait BorrowBytes { - /// Borrow a byte vector. - fn borrow_bytes<'a>(&'a self) -> &'a [u8]; -} - -impl BorrowBytes for String { - fn borrow_bytes(&self) -> &[u8] { self.as_bytes() } -} - -impl BorrowBytes for str { - fn borrow_bytes(&self) -> &[u8] { self.as_bytes() } -} - -impl BorrowBytes for Vec { - fn borrow_bytes(&self) -> &[u8] { &**self } -} - -impl BorrowBytes for ByteString { - fn borrow_bytes(&self) -> &[u8] { &**self } -} - -impl BorrowBytes for [u8] { - fn borrow_bytes(&self) -> &[u8] { self } -} - -impl<'a, B: ?Sized> BorrowBytes for Cow<'a, B> - where B: BorrowBytes + ToOwned, ::Owned: BorrowBytes { - fn borrow_bytes(&self) -> &[u8] { - match *self { - Cow::Borrowed(v) => v.borrow_bytes(), - Cow::Owned(ref v) => v.borrow_bytes(), - } - } -} - -impl<'a, T: ?Sized + BorrowBytes> BorrowBytes for &'a T { - fn borrow_bytes(&self) -> &[u8] { (*self).borrow_bytes() } -} - -/// A type that represents unadulterated byte strings. -/// -/// Byte strings represent *any* 8 bit character encoding. There are no -/// restrictions placed on the type of encoding used. (This means that there -/// may be *multiple* encodings in any particular byte string!) -/// -/// Many CSV files in the wild aren't just malformed with respect to RFC 4180, -/// but they are commonly *not* UTF-8 encoded. Even worse, some of them are -/// encoded improperly. Therefore, any useful CSV parser must be flexible with -/// respect to encodings. -/// -/// Thus, this CSV parser uses byte strings internally. This means that -/// quotes and field and record separators *must* be ASCII. Otherwise, -/// the parser places no other restrictions on the content of data in each -/// cell. -/// -/// Note that most of the methods in the encoder/decoder will assume UTF-8 -/// encoding, but they also expose some lower level methods that use byte -/// strings when absolutely necessary. This type is exposed in case you need -/// to deal with the raw bytes directly. -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct ByteString(Vec); - -impl ByteString { - /// Create a new byte string from a vector or slice of bytes. - pub fn from_bytes<'a, S>(bs: S) -> ByteString where S: Into> { - ByteString(bs.into()) - } - - /// Consumes this byte string into a vector of bytes. - pub fn into_bytes(self) -> Vec { - self.0 - } - - /// Returns this byte string as a slice of bytes. - pub fn as_bytes<'a>(&'a self) -> &'a [u8] { - &**self - } - - /// Consumes the byte string and decodes it into a Unicode string. 
If the - /// decoding fails, then the original ByteString is returned. - pub fn into_utf8_string(self) -> Result { - // FIXME: Figure out how to return an error here. - String::from_utf8(self.into_bytes()) - .map_err(|err| ByteString(err.into_bytes())) - } - - /// Return the number of bytes in the string. - pub fn len(&self) -> usize { - self.as_bytes().len() - } - - /// Returns whether the byte string is empty or not. - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl fmt::Debug for ByteString { - /// Writes the underlying bytes as a `&[u8]`. - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - // XXX: Ideally, we could just do this: - // - // f.write(chars[]) - // - // and let the output device figure out how to render it. But it seems - // the formatting infrastructure assumes that the data is UTF-8 - // encodable, which obviously doesn't work with raw byte strings. - // - // For now, we just show the bytes, e.g., `[255, 50, 48, 49, ...]`. - write!(f, "{:?}", &**self) - } -} - -impl ops::Deref for ByteString { - type Target = [u8]; - - fn deref<'a>(&'a self) -> &'a [u8] { - &*self.0 - } -} - -impl ops::Index for ByteString { - type Output = [u8]; - - fn index<'a>(&'a self, _: ops::RangeFull) -> &'a [u8] { - &**self - } -} - -impl ops::Index> for ByteString { - type Output = [u8]; - - fn index<'a>(&'a self, index: ops::RangeFrom) -> &'a [u8] { - &(&**self)[index.start..] - } -} - -impl ops::Index> for ByteString { - type Output = [u8]; - - fn index<'a>(&'a self, index: ops::RangeTo) -> &'a [u8] { - &(&**self)[..index.end] - } -} - -impl ops::Index> for ByteString { - type Output = [u8]; - - fn index<'a>(&'a self, index: ops::Range) -> &'a [u8] { - &(&**self)[index.start..index.end] - } -} - -impl hash::Hash for ByteString { - fn hash(&self, state: &mut H) { - // WHOA. This used to be `(&*self).hash(hasher);`, but it introduced - // a *major* performance regression that got fixed by using - // `self.as_slice().hash(hasher);` instead. I didn't do any profiling, - // but maybe the `(&*self)` was causing a copy somehow through the - // `Deref` trait? No clue. ---AG - // - // TODO: Try `(&*self)` again (maybe when 1.0 hits). If the regression - // remains, create a smaller reproducible example and report it as a - // bug. 
- self.0.hash(state); - } -} - -impl PartialEq for ByteString where S: ops::Deref { - fn eq(&self, other: &S) -> bool { - self.as_bytes() == other.as_bytes() - } -} - -impl FromIterator for ByteString { - fn from_iter>(it: I) -> ByteString { - ByteString::from_bytes(it.into_iter().collect::>()) - } -} - -impl Borrow<[u8]> for ByteString { - fn borrow(&self) -> &[u8] { &*self.0 } -} diff --git a/src/decoder.rs b/src/decoder.rs index 607da84..5f7e544 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -42,7 +42,7 @@ impl Decoded { } fn pop_string(&mut self) -> Result { - {try!(self.pop())}.into_utf8_string().map_err(|bytes| { + String::from_utf8(try!(self.pop())).map_err(|bytes| { Error::Decode( format!("Could not convert bytes '{:?}' to UTF-8.", bytes)) }) @@ -62,7 +62,7 @@ impl Decoded { } fn push_string(&mut self, s: String) { - self.push(ByteString::from_bytes(s.into_bytes())); + self.push(s.into_bytes()); } fn err<'a, T, S>(&self, msg: S) -> Result where S: Into { diff --git a/src/encoder.rs b/src/encoder.rs index c259132..6e3a2a6 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -24,7 +24,7 @@ impl Encoded { fn push_bytes<'a, S>(&mut self, s: S) -> Result<()> where S: Into> { - self.record.push(ByteString::from_bytes(s)); + self.record.push(s.into()); Ok(()) } diff --git a/src/index/mod.rs b/src/index/mod.rs index d1a1502..0ccd61d 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,19 +1,85 @@ -#![allow(missing_docs)] +/*! +This sub-module provides experimental CSV record indexing. +It is simplistic, but once a CSV index is created, you can use it to jump to +any record in the data instantly. In essence, it gives you random access for a +modest upfront cost in time and memory. + +## Simple example + +This example shows how to create an in-memory index and use it to jump to +any record in the data. (The indexing interface works with seekable readers +and writers, so you can use `std::fs::File` for this too.) + +```rust +# extern crate csv; fn main() { +use std::io::{self, Write}; +use csv::index::{Indexed, create_index}; + +let data = " +h1,h2,h3 +a,b,c +d,e,f +g,h,i +"; + +let new_csv_rdr = || csv::Reader::from_string(data); + +let mut index_data = io::Cursor::new(Vec::new()); +create_index(new_csv_rdr(), index_data.by_ref()).unwrap(); +let mut index = Indexed::open(new_csv_rdr(), index_data).unwrap(); + +// Seek to the second record and read its data. This is done *without* +// reading the first record. +index.seek(1).unwrap(); + +// Read the first row at this position (which is the second record). +// Since `Indexed` derefs to a `csv::Reader`, we can call CSV reader methods +// on it directly. +let row = index.records().next().unwrap().unwrap(); + +assert_eq!(row, vec!["d", "e", "f"]); +# } +``` +*/ use std::io; +use std::ops; use byteorder::{ReadBytesExt, WriteBytesExt, BigEndian}; use {Result, Error, Reader, NextField}; +/// A type for representing CSV data with a basic record index. +/// +/// The index is a sequence of 64 bit (8 bytes) integers, where each integer +/// corresponds to the *byte offset* of the start of the corresponding record +/// in the CSV data. This allows one to skip to any record in the CSV data +/// and read it instantly. +/// +/// Note that this type derefs to a `&mut csv::Reader`. 
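As a concrete picture of the file format described above, here is a sketch that decodes an index by hand (assuming `byteorder`'s `io::Result`-returning extension methods; `Indexed::open` finds the record count the same way, by seeking to `End(-8)` and reading one integer):

```rust
extern crate byteorder;

use std::io::{self, Read, Seek, SeekFrom};
use byteorder::{BigEndian, ReadBytesExt};

// An index is a sequence of big-endian u64 byte offsets, one per
// record, followed by one final u64 holding the record count.
fn read_offsets<I: Read + Seek>(mut idx: I) -> io::Result<Vec<u64>> {
    try!(idx.seek(SeekFrom::End(-8)));
    let count = try!(idx.read_u64::<BigEndian>());
    try!(idx.seek(SeekFrom::Start(0)));
    let mut offsets = Vec::with_capacity(count as usize);
    for _ in 0..count {
        offsets.push(try!(idx.read_u64::<BigEndian>()));
    }
    Ok(offsets)
}
```

Since the API only requires `io::Read + io::Seek` (plus `io::Write` for creation), the index can live in a `std::fs::File` just as well as the in-memory `Cursor` used in the example above.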
pub struct Indexed { rdr: Reader, idx: I, count: u64, } -impl Indexed { - pub fn new(mut rdr: Reader, mut idx: I) -> Result> { +impl ops::Deref for Indexed { + type Target = Reader; + fn deref(&self) -> &Reader { &self.rdr } +} + +impl ops::DerefMut for Indexed { + fn deref_mut(&mut self) -> &mut Reader { &mut self.rdr } +} + +impl Indexed where R: io::Read + io::Seek, I: io::Read + io::Seek { + /// Opens a new index corresponding to the CSV reader given. + /// + /// If the CSV reader has headers enabled, they are read first. + /// + /// Note that there are no checks in place to make sure the index + /// accurately represents the CSV reader given. + pub fn open(mut rdr: Reader, mut idx: I) -> Result> { try!(idx.seek(io::SeekFrom::End(-8))); let mut count = try!(idx.read_u64::()); if rdr.has_headers && count > 0 { @@ -27,38 +93,51 @@ impl Indexed { }) } + /// Seeks to `i`th record. + /// + /// This uses zero-based indexing, so seeking to the `0`th record will read + /// the first record. (The first record may not be the first written row + /// in the CSV data if the underlying reader has headers enabled.) + /// + /// An error is returned if the index given is greater than or equal to the + /// number of records in the index. pub fn seek(&mut self, mut i: u64) -> Result<()> { if i >= self.count { return Err(Error::Index(format!( "Record index {} is out of bounds. (There are {} records.)", i, self.count))); } + // If the underlying reader has headers enabled, then we should offset + // the index appropriately. if self.rdr.has_headers { i += 1; } + // 1. Seek the index. + // 2. Read the corresponding offset. + // 3. Seek the CSV reader. try!(self.idx.seek(io::SeekFrom::Start(i * 8))); let offset = try!(self.idx.read_u64::()); self.rdr.seek(offset) } + /// Returns the number of CSV records in the index in `O(1)` time. pub fn count(&self) -> u64 { self.count } - - pub fn csv<'a>(&'a mut self) -> &'a mut Reader { - &mut self.rdr - } } -pub fn create - (mut csv_rdr: Reader, mut idx_wtr: W) -> Result<()> { +/// Creates a new index for the given CSV reader. +/// +/// The CSV data is read from `rdr` and the index is written to `wtr`. +pub fn create_index(mut rdr: Reader, mut wtr: W) -> Result<()> + where R: io::Read + io::Seek, W: io::Write { // Seek to the beginning so that we get everything. 
- try!(csv_rdr.seek(0)); + try!(rdr.seek(0)); let mut count = 0u64; - while !csv_rdr.done() { - try!(idx_wtr.write_u64::(csv_rdr.byte_offset())); + while !rdr.done() { + try!(wtr.write_u64::(rdr.byte_offset())); loop { - match csv_rdr.next_field() { + match rdr.next_bytes() { NextField::EndOfCsv => break, NextField::EndOfRecord => { count += 1; break; }, NextField::Error(err) => return Err(err), @@ -66,5 +145,179 @@ pub fn create } } } - idx_wtr.write_u64::(count).map_err(From::from) + wtr.write_u64::(count).map_err(From::from) +} + +#[cfg(test)] +mod tests { + use std::io::{self, Write}; + use Reader; + + type CsvReader = Reader>>; + type Bytes = io::Cursor>; + type Indexed = super::Indexed; + + fn index>(s: S) -> Indexed { + index_with(s, |rdr| rdr, |rdr| rdr) + } + + fn index_nh>(s: S) -> Indexed { + let then = |rdr: CsvReader| rdr.has_headers(false); + index_with(s, &then, &then) + } + + fn index_with(s: S, create: F, new: G) -> Indexed + where S: Into, + F: FnOnce(CsvReader) -> CsvReader, + G: FnOnce(CsvReader) -> CsvReader { + let data = s.into(); + let mut idx_bytes = io::Cursor::new(vec![]); + super::create_index(create(Reader::from_string(&*data)), + idx_bytes.by_ref()).unwrap(); + super::Indexed::open(new(Reader::from_string(data)), + idx_bytes).unwrap() + } + + fn next(idx: &mut Indexed) -> Vec { + idx.records().next().unwrap().unwrap() + } + + fn nth(idx: &mut Indexed, i: u64) -> Vec { + idx.seek(i).unwrap(); + next(idx) + } + + #[test] + fn headers_one_field() { + let data = "\ +h1 +a +b +c +"; + let mut idx = index(data); + assert_eq!(idx.count(), 3); + + assert_eq!(nth(&mut idx, 0), vec!["a"]); + assert_eq!(nth(&mut idx, 1), vec!["b"]); + assert_eq!(nth(&mut idx, 2), vec!["c"]); + } + + #[test] + fn headers_many_fields() { + let data = "\ +h1,h2,h3 +a,b,c +d,e,f +g,h,i +"; + let mut idx = index(data); + assert_eq!(idx.count(), 3); + + assert_eq!(nth(&mut idx, 0), vec!["a", "b", "c"]); + assert_eq!(nth(&mut idx, 1), vec!["d", "e", "f"]); + assert_eq!(nth(&mut idx, 2), vec!["g", "h", "i"]); + } + + #[test] + fn no_headers_one_field() { + let data = "\ +h1 +a +b +c +"; + let mut idx = index_nh(data); + assert_eq!(idx.count(), 4); + + assert_eq!(nth(&mut idx, 0), vec!["h1"]); + assert_eq!(nth(&mut idx, 1), vec!["a"]); + assert_eq!(nth(&mut idx, 2), vec!["b"]); + assert_eq!(nth(&mut idx, 3), vec!["c"]); + } + + #[test] + fn no_headers_many_fields() { + let data = "\ +h1,h2,h3 +a,b,c +d,e,f +g,h,i +"; + let mut idx = index_nh(data); + assert_eq!(idx.count(), 4); + + assert_eq!(nth(&mut idx, 0), vec!["h1", "h2", "h3"]); + assert_eq!(nth(&mut idx, 1), vec!["a", "b", "c"]); + assert_eq!(nth(&mut idx, 2), vec!["d", "e", "f"]); + assert_eq!(nth(&mut idx, 3), vec!["g", "h", "i"]); + } + + #[test] + fn switch_headers_one_field1() { + let data = "\ +h1 +a +b +c +"; + let mut idx = index_with(data, |r| r.has_headers(false), |r| r); + assert_eq!(idx.count(), 3); + + assert_eq!(nth(&mut idx, 0), vec!["a"]); + assert_eq!(nth(&mut idx, 1), vec!["b"]); + assert_eq!(nth(&mut idx, 2), vec!["c"]); + } + + #[test] + fn switch_headers_one_field2() { + let data = "\ +h1 +a +b +c +"; + let mut idx = index_with(data, |r| r, |r| r.has_headers(false)); + assert_eq!(idx.count(), 4); + + assert_eq!(nth(&mut idx, 0), vec!["h1"]); + assert_eq!(nth(&mut idx, 1), vec!["a"]); + assert_eq!(nth(&mut idx, 2), vec!["b"]); + assert_eq!(nth(&mut idx, 3), vec!["c"]); + } + + #[test] + fn headers_one_field_newlines() { + let data = " + + + + +h1 + +a + + +b + + + + + + +c + + + + + + +"; + let mut idx = index(data); + 
assert_eq!(idx.count(), 3); + + assert_eq!(nth(&mut idx, 0), vec!["a"]); + assert_eq!(nth(&mut idx, 1), vec!["b"]); + assert_eq!(nth(&mut idx, 2), vec!["c"]); + } } diff --git a/src/lib.rs b/src/lib.rs index 609481b..87f5a07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,8 +79,7 @@ //! chocolate,refile,7"; //! //! let mut rdr = csv::Reader::from_string(data).has_headers(false); -//! for row in rdr.records() { -//! let row = row.unwrap(); +//! for row in rdr.records().map(|r| r.unwrap()) { //! println!("{:?}", row); //! } //! ``` @@ -96,8 +95,7 @@ //! chocolate,refile,7"; //! //! let mut rdr = csv::Reader::from_bytes(&data[..]).has_headers(false); -//! for row in rdr.byte_records() { -//! let row = row.unwrap(); +//! for row in rdr.byte_records().map(|r| r.unwrap()) { //! println!("{:?}", row); //! } //! ``` @@ -124,7 +122,7 @@ //! //! let mut rdr = csv::Reader::from_string(data); //! while !rdr.done() { -//! while let Some(r) = rdr.next_field().into_iter_result() { +//! while let Some(r) = rdr.next_bytes().into_iter_result() { //! print!("{:?} ", r.unwrap()); //! } //! println!(""); @@ -134,15 +132,23 @@ //! There is more explanation for how this iterator interface works on the //! `Reader` type. //! +//! ## Indexing +//! +//! This crate has experimental support for CSV record indexing. It's very +//! simplistic, but once the index is created, you can seek a `csv::Reader` +//! to any record instantly. See the +//! [`csv::index`](/rustdoc/csv/index/index.html) +//! sub-module for more details and examples. +//! //! ## Compliance with RFC 4180 //! //! [RFC 4180](http://tools.ietf.org/html/rfc4180) seems to the closest thing -//! to an official specification for CSV. -//! Currently, the parser in this crate will read a strict superset of -//! RFC 4180 while the writer will always write CSV data that conforms to -//! RFC 4180. This approach was taken because CSV data is commonly malformed -//! and there is nothing worse than trying to read busted CSV data with a -//! library that says it can't do it. +//! to an official specification for CSV. Currently, the parser in this crate +//! will read a strict superset of RFC 4180 while the writer will always write +//! CSV data that conforms to RFC 4180 (unless configured to do otherwise). +//! This approach was taken because CSV data is commonly malformed and there is +//! nothing worse than trying to read busted CSV data with a library that says +//! it can't do it. //! //! With that said, a "strict" mode may be added that will only read CSV data //! that conforms to RFC 4180. @@ -153,14 +159,12 @@ //! parser. By default, the encoder uses LF line endings but can be //! instructed to use CRLF with the `crlf` method. //! * The first record is read as a "header" by default, but this can be -//! disabled by calling `no_headers` before reading any records. +//! disabled by calling `has_headers(false)` before reading any records. //! (N.B. The encoder has no explicit support for headers. Simply encode a //! vector of strings instead.) //! * By default, the delimiter is a comma, but it can be changed to any //! **ASCII** byte character with the `delimiter` method (for either //! writing or reading). -//! * The decoder interprets `\"` as an escaped quote in addition to the -//! standard `""`. //! * By default, both the writer and reader will enforce the invariant //! that all records are the same length. (This is what RFC 4180 demands.) //! If a record with a different length is found, an error is returned. 
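Since the feature list above is spread across several builder methods, here is a compact sketch of how the knobs compose (all of these methods appear elsewhere in this patch):

```rust
extern crate csv;

fn main() {
    let data = "a;b;c\nd;e;f";

    // Semicolon-delimited data with no header row, treating only a
    // line feed (never CRLF) as the record terminator.
    let mut rdr = csv::Reader::from_string(data)
                              .delimiter(b';')
                              .has_headers(false)
                              .record_terminator(csv::RecordTerminator::Any(b'\n'));
    for row in rdr.records().map(|r| r.unwrap()) {
        println!("{:?}", row);
    }
}
```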
@@ -186,7 +190,7 @@ use std::fmt; use std::io; use std::result; -pub use bytestr::{BorrowBytes, ByteString}; +pub use borrow_bytes::BorrowBytes; pub use encoder::Encoded; pub use decoder::Decoded; pub use reader::{ @@ -195,11 +199,16 @@ pub use reader::{ }; pub use writer::{Writer, QuoteStyle}; -/// An experimental module for processing CSV data in parallel. +macro_rules! lg { + ($($tt:tt)*) => ({ + use std::io::Write; + writeln!(&mut ::std::io::stderr(), $($tt)*).unwrap(); + }); +} + pub mod index; -mod buffered; -mod bytestr; +mod borrow_bytes; mod encoder; mod decoder; mod reader; @@ -212,6 +221,9 @@ mod tests; /// operations. pub type Result = result::Result; +/// A convenience type for referring to a plain byte string. +pub type ByteString = Vec; + /// An error produced by an operation on CSV data. #[derive(Debug)] pub enum Error { @@ -220,40 +232,41 @@ pub enum Error { /// An error reported by the type-based decoder. Decode(String), /// An error reported by the CSV parser. - Parse(ParseError), + Parse(LocatableError), /// An error originating from reading or writing to the underlying buffer. Io(io::Error), /// An error originating from using a CSV index. Index(String), } -/// A description of a CSV parse error. +/// An error tagged with a location at which it occurred. #[derive(Clone, Copy, Debug)] -pub struct ParseError { - /// The line number of the parse error. - pub line: u64, - /// The column (byte offset) of the parse error. - pub column: u64, - /// The type of parse error. - pub kind: ParseErrorKind, +pub struct LocatableError { + /// The record number (starting at 1). + pub record: u64, + /// The field number (starting at 1). + pub field: u64, + /// The error. + pub err: T, } -/// The different types of parse errors. -/// -/// Currently, the CSV parser is extremely liberal in what it accepts, so -/// there aren't many possible parse errors. (Instead, there are just -/// variations in what the parse returns.) -/// -/// If and when a "strict" mode is added to this crate, this list of errors -/// will expand. +/// A description of a CSV parse error. #[derive(Clone, Copy, Debug)] -pub enum ParseErrorKind { - /// This error occurs when a record has a different number of fields - /// than the first record parsed. - UnequalLengths(u64, u64), - - /// This error occurs when parsing CSV data as Unicode. - InvalidUTF8, +pub enum ParseError { + /// A record was found that has a different size than other records. + /// + /// This is only reported when `flexible` is set to `false` on the + /// corresponding CSV reader/writer. + UnequalLengths { + /// Expected a record with this many fields. + expected: u64, + /// Got a record with this many fields. + got: u64, + }, + /// An error occurred when trying to convert a field to a Unicode string. + /// + /// TODO: Include the real Utf8Error, but it is not stabilized yet. 
+ InvalidUtf8, } impl fmt::Display for Error { @@ -268,20 +281,20 @@ impl fmt::Display for Error { } } -impl fmt::Display for ParseError { +impl fmt::Display for LocatableError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "CSV parse error:{}:{}: {}", - self.line, self.column, self.kind) + write!(f, "CSV error (at record {}, field {}): {}", + self.record, self.field, self.err) } } -impl fmt::Display for ParseErrorKind { +impl fmt::Display for ParseError { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { - ParseErrorKind::UnequalLengths(first, cur) => + ParseError::UnequalLengths { expected, got } => write!(f, "First record has length {}, but found record \ - with length {}.", first, cur), - ParseErrorKind::InvalidUTF8 => + with length {}.", expected, got), + ParseError::InvalidUtf8 => write!(f, "Invalid UTF8 encoding."), } } diff --git a/src/reader.rs b/src/reader.rs index c9a2bae..12eee17 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,20 +1,18 @@ use std::fs; -use std::io::{self, BufRead}; +use std::io; use std::path::Path; +use std::str; use rustc_serialize::Decodable; -use buffered::BufReader; use { ByteString, Result, Decoded, - Error, ParseError, ParseErrorKind, + Error, LocatableError, ParseError, }; -use self::ParseState::{ - StartRecord, EndRecord, StartField, - RecordTermCR, RecordTermLF, RecordTermAny, - InField, InQuotedField, InQuotedFieldEscape, InQuotedFieldQuote, -}; +use self::State::*; + +const BUF_SIZE: usize = 1024 * 128; /// A record terminator. /// @@ -31,11 +29,22 @@ pub enum RecordTerminator { Any(u8), } +impl RecordTerminator { + #[inline] + fn is_crlf(&self) -> bool { + match *self { + RecordTerminator::CRLF => true, + RecordTerminator::Any(_) => false, + } + } +} + impl PartialEq for RecordTerminator { - fn eq(&self, other: &u8) -> bool { + #[inline] + fn eq(&self, &other: &u8) -> bool { match *self { - RecordTerminator::CRLF => *other == b'\r' || *other == b'\n', - RecordTerminator::Any(b) => *other == b + RecordTerminator::CRLF => other == b'\r' || other == b'\n', + RecordTerminator::Any(b) => other == b } } } @@ -85,23 +94,23 @@ impl PartialEq for RecordTerminator { /// } /// ``` pub struct Reader { - pmachine: ParseMachine, // various parsing settings - flexible: bool, // true => records of varying length are allowed - buffer: BufReader, - fieldbuf: Vec, // reusable buffer used to store fields - state: ParseState, // current state in parsing machine - errored: bool, // true when an error has occurred, parsing is done - eof: bool, // true when EOF is reached - - // Keep a copy of the first record parsed. - first_record: Vec, - parsing_first_record: bool, // true only before first EndRecord state - - // Is set if `seek` is ever called. - // This subtlely modifies the behavior of iterators so that there is - // no special handling of headers. (After you seek, iterators should - // just give whatever records are being parsed.) - has_seeked: bool, + rdr: R, + buf: Vec, + bufi: usize, + fieldbuf: Vec, + state: State, + eof: bool, + first_row: Vec, + first_row_done: bool, + irecord: u64, + ifield: u64, + byte_offset: u64, + delimiter: u8, + quote: u8, + escape: Option, + double_quote: bool, + record_term: RecordTerminator, + flexible: bool, // When this is true, the first record is interpreted as a "header" row. // This is opaque to the raw iterator, but is used in any iterator that @@ -110,13 +119,7 @@ pub struct Reader { // TODO: This is exposed for use in the `index` sub-module. Is that OK? 
#[doc(hidden)] pub has_headers: bool, - - // Various book-keeping counts. - field_count: u64, // number of fields in current record - column: u64, // current column (by byte, *shrug*) - line_record: u64, // line at which current record started - line_current: u64, // current line - byte_offset: u64, // current byte offset + has_seeked: bool, } impl Reader { @@ -124,41 +127,26 @@ impl Reader { /// /// The reader is buffered for you automatically. pub fn from_reader(rdr: R) -> Reader { - Reader::from_buffer(BufReader::new(rdr)) - } - - /// Creates a new CSV reader from a buffer. - /// - /// This allows you to create your own buffer with a capacity of your - /// choosing. In all other constructors, a buffer with default capacity - /// is created for you. - /// - /// ... but this isn't public right now because we're using our own - /// implemented of `BufReader`. - fn from_buffer(buf: BufReader) -> Reader { Reader { - pmachine: ParseMachine { - delimiter: b',', - record_terminator: RecordTerminator::CRLF, - quote: Some(b'"'), - escape: b'\\', - double_quote: true, - }, - flexible: false, - buffer: buf, + rdr: rdr, + buf: vec![0; BUF_SIZE], + bufi: BUF_SIZE, fieldbuf: Vec::with_capacity(1024), state: StartRecord, - errored: false, eof: false, - first_record: vec![], - parsing_first_record: true, - has_seeked: false, - has_headers: true, - field_count: 0, - column: 1, - line_record: 1, - line_current: 1, + first_row: vec![], + first_row_done: false, + irecord: 1, + ifield: 1, byte_offset: 0, + delimiter: b',', + quote: b'"', + escape: None, + double_quote: true, + record_term: RecordTerminator::CRLF, + flexible: false, + has_headers: true, + has_seeked: false, } } } @@ -375,7 +363,7 @@ impl Reader { /// /// The default value is `b','`. pub fn delimiter(mut self, delimiter: u8) -> Reader { - self.pmachine.delimiter = delimiter; + self.delimiter = delimiter; self } @@ -415,7 +403,7 @@ impl Reader { /// use `RecordTerminator::Any(b'\n')` to only accept line feeds as /// record terminators, or `b'\x1e'` for the ASCII record separator. pub fn record_terminator(mut self, term: RecordTerminator) -> Reader { - self.pmachine.record_terminator = term; + self.record_term = term; self } @@ -428,33 +416,31 @@ impl Reader { /// The default value is `b'"'`. /// /// If `quote` is `None`, then no quoting will be used. - pub fn quote(mut self, quote: Option) -> Reader { - self.pmachine.quote = quote; + pub fn quote(mut self, quote: u8) -> Reader { + self.quote = quote; self } /// Set the escape character to use when reading CSV data. /// - /// This is only used when `double_quote` is set to false. - /// /// Since the CSV reader is meant to be mostly encoding agnostic, you must /// specify the escape as a single ASCII byte. /// - /// The default value is `b'\\'`. - pub fn escape(mut self, escape: u8) -> Reader { - self.pmachine.escape = escape; + /// When set to `None` (which is the default), the "doubling" escape + /// is used for quote character. + /// + /// When set to something other than `None`, it is used as the escape + /// character for quotes. (e.g., `b'\\'`.) + pub fn escape(mut self, escape: Option) -> Reader { + self.escape = escape; self } - /// Set the quoting escape mechanism. - /// - /// When enabled (which is the default), quotes are escaped by doubling - /// them. e.g., `""` resolves to a single `"`. + /// Enable double quote escapes. /// - /// When disabled, double quotes have no significance. Instead, they can - /// be escaped with the escape character (which is `\\` by default). 
+ /// When disabled, doubled quotes are not interpreted as escapes. pub fn double_quote(mut self, yes: bool) -> Reader { - self.pmachine.double_quote = yes; + self.double_quote = yes; self } @@ -466,23 +452,21 @@ impl Reader { /// Since ASCII delimited text is meant to be unquoted, this also sets /// `quote` to `None`. pub fn ascii(self) -> Reader { - self.quote(None) - .double_quote(false) - .delimiter(b'\x1f') + self.delimiter(b'\x1f') .record_terminator(RecordTerminator::Any(b'\x1e')) } } /// NextField is the result of parsing a single CSV field. /// -/// This is only useful if you're using the low level `next_field` method. -pub enum NextField<'a> { - /// A single CSV field as a borrow slice of bytes from the - /// parser's internal buffer. - Data(&'a [u8]), +/// This is only useful if you're using the low level `next_bytes` method. +#[derive(Debug)] +pub enum NextField<'a, T: ?Sized + 'a> { + /// A single CSV field as a borrowed slice of the parser's internal buffer. + Data(&'a T), /// A CSV error found during parsing. When an error is found, it is - /// first returned. All subsequent calls of `next_field` will return + /// first returned. All subsequent calls of `next_bytes` will return /// `EndOfCsv`. (EOF is exempt from this. Depending on the state of the /// parser, an EOF could trigger `Data`, `EndOfRecord` and `EndOfCsv`, /// all in succession.) @@ -499,9 +483,9 @@ pub enum NextField<'a> { EndOfCsv, } -impl<'a> NextField<'a> { +impl<'a, T: ?Sized + ::std::fmt::Debug> NextField<'a, T> { /// Transform NextField into an iterator result. - pub fn into_iter_result(self) -> Option> { + pub fn into_iter_result(self) -> Option> { match self { NextField::EndOfRecord | NextField::EndOfCsv => None, NextField::Error(err) => Some(Err(err)), @@ -513,6 +497,17 @@ impl<'a> NextField<'a> { pub fn is_end(&self) -> bool { if let NextField::EndOfCsv = *self { true } else { false } } + + /// Returns the underlying field data. + /// + /// If `NextField` is an error or an end of record/CSV marker, this will + /// panic. + pub fn unwrap(self) -> &'a T { + match self { + NextField::Data(field) => field, + v => panic!("Cannot unwrap '{:?}'", v), + } + } } /// These are low level methods for dealing with the raw bytes of CSV records. @@ -522,17 +517,17 @@ impl Reader { /// This is just like `headers`, except fields are `ByteString`s instead /// of `String`s. pub fn byte_headers(&mut self) -> Result> { - if !self.first_record.is_empty() { - Ok(self.first_record.clone()) + if !self.first_row.is_empty() { + Ok(self.first_row.clone()) } else { let mut headers = vec![]; loop { - let field = match self.next_field() { + let field = match self.next_bytes() { NextField::EndOfRecord | NextField::EndOfCsv => break, NextField::Error(err) => return Err(err), NextField::Data(field) => field, }; - headers.push(ByteString::from_bytes(field)); + headers.push(field.to_vec()); } assert!(headers.len() > 0 || self.done()); Ok(headers) @@ -543,7 +538,7 @@ impl Reader { /// of `String`s. pub fn byte_records<'a>(&'a mut self) -> ByteRecords<'a, R> { let first = self.has_seeked; - ByteRecords { p: self, first: first } + ByteRecords { p: self, first: first, errored: false } } /// Returns `true` if the CSV parser has reached its final state. When @@ -574,7 +569,7 @@ impl Reader { /// // This case analysis is necessary because we only want to /// // increment the count when `EndOfRecord` is seen. (If the /// // CSV data is empty, then it will never be emitted.) 
- /// match rdr.next_field() { + /// match rdr.next_bytes() { /// csv::NextField::EndOfCsv => break, /// csv::NextField::EndOfRecord => { count += 1; break; }, /// csv::NextField::Error(err) => panic!(err), @@ -586,7 +581,7 @@ impl Reader { /// assert_eq!(count, 5); /// ``` pub fn done(&self) -> bool { - self.eof || self.errored + self.eof } /// An iterator over fields in the current record. @@ -618,146 +613,142 @@ impl Reader { /// /// let mut rdr = csv::Reader::from_string(data); /// while !rdr.done() { - /// while let Some(r) = rdr.next_field().into_iter_result() { + /// while let Some(r) = rdr.next_bytes().into_iter_result() { /// print!("{:?} ", r.unwrap()); /// } /// println!(""); /// } /// ``` - pub fn next_field<'a>(&'a mut self) -> NextField<'a> { + pub fn next_bytes(&mut self) -> NextField<[u8]> { unsafe { self.fieldbuf.set_len(0); } - - // The EndRecord state indicates what you'd expect: stop the current - // iteration, check for same-length records and reset a little - // record-based book keeping. - if self.state == EndRecord { - let first_len = self.first_record.len() as u64; - if !self.flexible && first_len != self.field_count { - let err = self.parse_err(ParseErrorKind::UnequalLengths( - self.first_record.len() as u64, self.field_count)); - self.errored = true; - return NextField::Error(err); + loop { + if let Err(err) = self.fill_buf() { + return NextField::Error(Error::Io(err)); } - // After processing an EndRecord (and determined there are no - // errors), we should always start parsing the next record. - self.state = StartRecord; - self.parsing_first_record = false; - self.line_record = self.line_current; - self.field_count = 0; - return NextField::EndOfRecord; - } - - // Check to see if we've recorded an error and quit parsing if we have. - // This serves two purposes: - // 1) When CSV parsing reaches an error, it is unrecoverable. So the - // parse function will initially return that error (unless it is - // EOF) and then return `None` indefinitely. - // 2) EOF errors are handled specially and can be returned "lazily". - // e.g., EOF in the middle of parsing a field. First we have to - // return the field and then return "end of record" and then - // EOF on the next call. - if self.eof || self.errored { - // We don't return the error here because it is always returned - // immediately when it is first found (unless it's EOF, but if it's - // EOF, we just want to stop the iteration anyway). - return NextField::EndOfCsv; - } - - let mut consumed = 0; // tells the buffer how much we consumed - let mut err: Option = None; - 'TOPLOOP: loop { - // The following code is basically, "fill a buffer with data from - // the underlying reader if it's empty, and then run the parser - // over each byte in the slice returned." - // - // This technique is critical for performance, because it lifts - // a lot of case analysis off of each byte. (i.e., This loop could - // be more simply written with `buf.read_byte()`, but it is much - // slower.) - match self.buffer.fill_buf() { - Err(ioerr) => { - // The error is processed below. - // We don't handle it here because we need to do some - // book keeping first. 
- err = Some(Error::Io(ioerr)); - break 'TOPLOOP; + if self.buf.len() == 0 { + self.eof = true; + if let StartRecord = self.state { + return self.next_eoc(); + } else if let EndRecord = self.state { + self.state = StartRecord; + return self.next_eor(); + } else { + self.state = EndRecord; + return self.next_data(); } - Ok(bs) if bs.len() == 0 => { self.eof = true; break 'TOPLOOP } - Ok(bs) => { - // This "batch" processing of bytes is critical for - // performance. - for &b in bs { - let (accept, next) = - self.pmachine.parse_byte(self.state, b); - self.state = next; - if accept { self.fieldbuf.push(b); } - if self.state == EndRecord { - // Don't consume the byte we just read, because - // it is the first byte of the next record. - break 'TOPLOOP; + } + while self.bufi < self.buf.len() { + let c = self.buf[self.bufi]; + match self.state { + StartRecord => { + if self.is_record_term(c) { + self.bump(); } else { - consumed += 1; - self.column += 1; - self.byte_offset += 1; - if b == b'\n' { - self.line_current += 1; - self.column = 1; - } - if self.state == StartField { - break 'TOPLOOP - } + self.state = StartField; + } + } + EndRecord => { + if self.record_term.is_crlf() && c == b'\n' { + self.bump(); + } + self.state = StartRecord; + return self.next_eor(); + } + StartField => { + self.bump(); + if c == self.quote { + self.state = InQuotedField; + } else if c == self.delimiter { + return self.next_data(); + } else if self.is_record_term(c) { + self.state = EndRecord; + return self.next_data(); + } else { + self.add(c); + self.state = InField; + } + } + InField => { + self.bump(); + if c == self.delimiter { + self.state = StartField; + return self.next_data(); + } else if self.is_record_term(c) { + self.state = EndRecord; + return self.next_data(); + } else { + self.add(c); + } + } + InQuotedField => { + self.bump(); + if c == self.quote { + self.state = InDoubleEscapedQuote; + } else if self.escape == Some(c) { + self.state = InEscapedQuote; + } else { + self.add(c); + } + } + InEscapedQuote => { + self.bump(); + self.add(c); + self.state = InQuotedField; + } + InDoubleEscapedQuote => { + self.bump(); + if self.double_quote && c == self.quote { + self.add(c); + self.state = InQuotedField; + } else if c == self.delimiter { + self.state = StartField; + return self.next_data(); + } else if self.is_record_term(c) { + self.state = EndRecord; + return self.next_data(); + } else { + self.add(c); + self.state = InField; // degrade gracefully? } } } } - self.buffer.consume(consumed); - consumed = 0; } - // We get here when we break out of the loop, so make sure the buffer - // knows how much we read. - self.buffer.consume(consumed); - - // Handle the error. EOF is a bit tricky, but otherwise, we just stop - // the parser cold. - match (self.eof, err) { - (false, None) => {} - (true, None) => { - // If we get EOF while we're trying to parse a new record - // but haven't actually seen any fields yet (i.e., trailing - // new lines in a file), then we should immediately stop the - // parser. - if self.state == StartRecord { - return NextField::EndOfCsv; + } + + /// This is just like `next_bytes` except it converts each field to + /// a Unicode string in place. + pub fn next_str(&mut self) -> NextField { + // This really grates me. Once we call `next_bytes`, we initiate a + // *mutable* borrow of `self` that doesn't stop until the return value + // goes out of scope. Since we have to return that value, it will never + // go out of scope in this function. 
+ // + // Therefore, we can't get access to any state information after + // calling `next_bytes`. But we might need it to report an error. + // + // One possible way around this is to use interior mutability... + let (record, field) = (self.irecord, self.ifield); + match self.next_bytes() { + NextField::EndOfRecord => NextField::EndOfRecord, + NextField::EndOfCsv => NextField::EndOfCsv, + NextField::Error(err) => NextField::Error(err), + NextField::Data(bytes) => { + match str::from_utf8(bytes) { + Ok(s) => NextField::Data(s), + Err(_) => NextField::Error(Error::Parse(LocatableError { + record: record, + field: field, + err: ParseError::InvalidUtf8, + })), } - // Otherwise, indicate that we've reached the end of the - // record. This will allow the last field to get returned. - // Then "EndOfRecord" will get returned. Finally, "EndOfCsv". - self.state = EndRecord; - // fallthrough to return current field. - // On the next call, `None` will be returned. } - (false, Some(err)) => { - // Reset the state to the beginning so that bad errors - // are always reported. (i.e., Don't let an EndRecord state - // slip in here.) - self.state = StartRecord; - self.errored = true; - return NextField::Error(err); - } - _ => unreachable!(), // can never have EOF and an error - } - if self.parsing_first_record { - // This is only copying bytes for the first record. - let bytes = ByteString::from_bytes(&*self.fieldbuf); - self.first_record.push(bytes); } - self.field_count += 1; - NextField::Data(&self.fieldbuf) } /// An unsafe iterator over byte fields. /// - /// This iterator calls `next_field` at each step. + /// This iterator calls `next_bytes` at each step. /// /// It is (wildly) unsafe because the lifetime yielded for each element /// is incorrect. It refers to the lifetime of the CSV reader instead of @@ -771,216 +762,112 @@ impl Reader { UnsafeByteFields { rdr: self } } - /// Returns the line at which the current record started. - pub fn line(&self) -> u64 { - self.line_record - } - /// Returns the byte offset at which the current record started. pub fn byte_offset(&self) -> u64 { self.byte_offset } - fn parse_err(&self, kind: ParseErrorKind) -> Error { - Error::Parse(ParseError { - line: self.line_record, - column: self.column, - kind: kind, - }) - } -} - -impl Reader { - /// Seeks the underlying reader to the file cursor specified. - /// - /// This comes with several caveats: - /// - /// * The existing buffer is dropped and a new one is created. - /// * If you seek to a position other than the start of a record, you'll - /// probably get an incorrect parse. (This is *not* unsafe.) - /// - /// Mostly, this is intended for use with the `index` sub module. - /// - /// Note that if `pos` is equivalent to the current *parsed* byte offset, - /// then no seeking is performed. (In this case, `seek` is a no-op.) 
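Putting `next_str` and the new error structure together: a sketch of handling invalid UTF-8 using the record/field coordinates carried by `LocatableError` (with `from_bytes`, as shown in the crate docs earlier in this patch):

```rust
extern crate csv;

use csv::{Error, NextField};

fn main() {
    // The second field of the first record is not valid UTF-8.
    let data = &b"hello,\xff\xffworld\nnext,row\n"[..];
    let mut rdr = csv::Reader::from_bytes(data).has_headers(false);
    loop {
        match rdr.next_str() {
            NextField::EndOfCsv => break,
            NextField::EndOfRecord => {}
            NextField::Data(s) => println!("field: {}", s),
            NextField::Error(Error::Parse(loc)) => {
                // Record and field numbers both start at 1.
                println!("bad UTF-8 at record {}, field {}: {}",
                         loc.record, loc.field, loc.err);
                break;
            }
            NextField::Error(err) => panic!("{:?}", err),
        }
    }
}
```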
- pub fn seek(&mut self, pos: u64) -> Result<()> { - self.has_seeked = true; - if pos == self.byte_offset() { - return Ok(()) - } - self.buffer.clear(); - self.errored = false; - self.eof = false; - self.byte_offset = pos; - try!(self.buffer.get_mut().seek(io::SeekFrom::Start(pos))); - Ok(()) - } -} - -#[derive(Clone, Copy)] -struct ParseMachine { - delimiter: u8, - record_terminator: RecordTerminator, - quote: Option, - escape: u8, - double_quote: bool, -} - -#[derive(Clone, Copy, Eq, PartialEq, Debug)] -enum ParseState { - StartRecord, - EndRecord, - StartField, - RecordTermCR, - RecordTermLF, - RecordTermAny, - InField, - InQuotedField, - InQuotedFieldEscape, - InQuotedFieldQuote, -} - -type NextState = (bool, ParseState); - -impl ParseMachine { #[inline] - fn parse_byte(&self, state: ParseState, b: u8) -> NextState { - match state { - StartRecord => self.parse_start_record(b), - EndRecord => unreachable!(), - StartField => self.parse_start_field(b), - RecordTermCR => self.parse_record_term_cr(b), - RecordTermLF => self.parse_record_term_lf(b), - RecordTermAny => self.parse_record_term_any(b), - InField => self.parse_in_field(b), - InQuotedField => self.parse_in_quoted_field(b), - InQuotedFieldEscape => self.parse_in_quoted_field_escape(b), - InQuotedFieldQuote => self.parse_in_quoted_field_quote(b), - } - } - - #[inline] - fn parse_start_record(&self, b: u8) -> NextState { - if self.is_record_term(b) { - // Skip empty new lines. - (false, StartRecord) - } else { - self.parse_start_field(b) - } - } - - #[inline] - fn parse_start_field(&self, b: u8) -> NextState { - if self.is_record_term(b) { - (false, self.record_term_next_state(b)) - } else if Some(b) == self.quote { - (false, InQuotedField) - } else if b == self.delimiter { - // empty field, so return in StartField state, - // which causes a new empty field to be returned - (false, StartField) - } else { - (true, InField) + fn next_data(&mut self) -> NextField<[u8]> { + if !self.first_row_done { + self.first_row.push(self.fieldbuf.to_vec()); } + self.ifield += 1; + NextField::Data(&self.fieldbuf) } #[inline] - fn parse_record_term_cr(&self, b: u8) -> NextState { - if b == b'\n' { - (false, RecordTermLF) - } else if b == b'\r' { - (false, RecordTermCR) - } else { - (false, EndRecord) + fn next_eor(&mut self) -> NextField<[u8]> { + if !self.flexible + && self.first_row_done + && self.ifield != self.first_row.len() as u64 { + return self.parse_error(ParseError::UnequalLengths { + expected: self.first_row.len() as u64, + got: self.ifield as u64, + }); } + self.irecord += 1; + self.ifield = 0; + self.first_row_done = true; + NextField::EndOfRecord } #[inline] - fn parse_record_term_lf(&self, b: u8) -> NextState { - if b == b'\r' { - (false, RecordTermCR) - } else if b == b'\n' { - (false, RecordTermLF) - } else { - (false, EndRecord) - } + fn next_eoc(&self) -> NextField<[u8]> { + NextField::EndOfCsv } #[inline] - fn parse_record_term_any(&self, b: u8) -> NextState { - match self.record_terminator { - RecordTerminator::CRLF => unreachable!(), - RecordTerminator::Any(bb) => { - if b == bb { - (false, RecordTermAny) - } else { - (false, EndRecord) - } - } + fn fill_buf(&mut self) -> io::Result<()> { + if self.bufi == self.buf.len() { + unsafe { let cap = self.buf.capacity(); self.buf.set_len(cap); } + let n = try!(self.rdr.read(&mut self.buf)); + unsafe { self.buf.set_len(n); } + self.bufi = 0; } + Ok(()) } #[inline] - fn parse_in_field(&self, b: u8) -> NextState { - if self.is_record_term(b) { - (false, self.record_term_next_state(b)) - } 
else if b == self.delimiter { - (false, StartField) - } else { - (true, InField) - } + fn bump(&mut self) { + self.bufi += 1; + self.byte_offset += 1; } #[inline] - fn parse_in_quoted_field(&self, b: u8) -> NextState { - if Some(b) == self.quote { - (false, InQuotedFieldQuote) - } else if !self.double_quote && b == self.escape { - (false, InQuotedFieldEscape) - } else { - (true, InQuotedField) - } + fn add(&mut self, c: u8) { + self.fieldbuf.push(c); } #[inline] - fn parse_in_quoted_field_escape(&self, _: u8) -> NextState { - (true, InQuotedField) + fn is_record_term(&self, c: u8) -> bool { + self.record_term == c } - #[inline] - fn parse_in_quoted_field_quote(self, b: u8) -> NextState { - if self.double_quote && Some(b) == self.quote { - (true, InQuotedField) - } else if b == self.delimiter { - (false, StartField) - } else if self.is_record_term(b) { - (false, self.record_term_next_state(b)) - } else { - // Should we provide a strict variant that disallows - // random chars after a quote? - (true, InField) - } + fn parse_error(&self, err: ParseError) -> NextField<[u8]> { + NextField::Error(Error::Parse(LocatableError { + record: self.irecord, + field: self.ifield, + err: err, + })) } +} - #[inline] - fn is_record_term(self, b: u8) -> bool { - self.record_terminator == b - } +#[derive(Debug)] +enum State { + StartRecord, + EndRecord, + StartField, + InField, + InQuotedField, + InEscapedQuote, + InDoubleEscapedQuote, +} - #[inline] - fn record_term_next_state(self, b: u8) -> ParseState { - match self.record_terminator { - RecordTerminator::CRLF => { - if b == b'\r' { - RecordTermCR - } else if b == b'\n' { - RecordTermLF - } else { - unreachable!() - } - } - RecordTerminator::Any(_) => RecordTermAny, +impl Reader { + /// Seeks the underlying reader to the file cursor specified. + /// + /// This comes with several caveats: + /// + /// * The existing buffer is dropped and a new one is created. + /// * If you seek to a position other than the start of a record, you'll + /// probably get an incorrect parse. (This is *not* unsafe.) + /// + /// Mostly, this is intended for use with the `index` sub module. + /// + /// Note that if `pos` is equivalent to the current *parsed* byte offset, + /// then no seeking is performed. (In this case, `seek` is a no-op.) + pub fn seek(&mut self, pos: u64) -> Result<()> { + self.has_seeked = true; + self.state = StartRecord; + if pos == self.byte_offset() { + return Ok(()) } + self.bufi = self.buf.len(); // will force a buffer refresh + self.eof = false; + self.byte_offset = pos; + try!(self.rdr.seek(io::SeekFrom::Start(pos))); + Ok(()) } } @@ -995,7 +882,7 @@ impl<'a, R> Iterator for UnsafeByteFields<'a, R> where R: io::Read { fn next(&mut self) -> Option> { unsafe { - ::std::mem::transmute(self.rdr.next_field().into_iter_result()) + ::std::mem::transmute(self.rdr.next_bytes().into_iter_result()) } } } @@ -1057,6 +944,7 @@ impl<'a, R> Iterator for StringRecords<'a, R> where R: io::Read { pub struct ByteRecords<'a, R: 'a> { p: &'a mut Reader, first: bool, + errored: bool, } impl<'a, R> Iterator for ByteRecords<'a, R> where R: io::Read { @@ -1097,16 +985,23 @@ impl<'a, R> Iterator for ByteRecords<'a, R> where R: io::Read { } } // OK, we're done checking the weird first-record-corner-case. 
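The `byte_offset`/`seek` pair above is enough for by-hand random access without the `index` module. A sketch that records offsets the same way `create_index` does (note that once `seek` has been called, the record iterators deliberately skip the usual header handling):

```rust
extern crate csv;

use csv::NextField;

fn main() {
    let data = "a,b\nc,d\ne,f\n";
    let mut rdr = csv::Reader::from_string(data).has_headers(false);

    // Capture the byte offset at which each record starts.
    let mut offsets = vec![];
    while !rdr.done() {
        offsets.push(rdr.byte_offset());
        loop {
            match rdr.next_bytes() {
                NextField::EndOfCsv | NextField::EndOfRecord => break,
                NextField::Error(err) => panic!("{:?}", err),
                NextField::Data(_) => {}
            }
        }
    }

    // Jump straight to the third record and read it.
    rdr.seek(offsets[2]).unwrap();
    let row = rdr.records().next().unwrap().unwrap();
    assert_eq!(row, vec!["e", "f"]);
}
```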
- if self.p.done() { + if self.p.done() || self.errored { return None; } - let mut record = Vec::with_capacity(self.p.first_record.len()); + let mut record = Vec::with_capacity(self.p.first_row.len()); loop { - match self.p.next_field() { - NextField::EndOfRecord | NextField::EndOfCsv => break, - NextField::Error(err) => return Some(Err(err)), - NextField::Data(field) => - record.push(ByteString::from_bytes(field)), + match self.p.next_bytes() { + NextField::EndOfRecord | NextField::EndOfCsv => { + if record.len() == 0 { + return None + } + break + } + NextField::Error(err) => { + self.errored = true; + return Some(Err(err)); + } + NextField::Data(field) => record.push(field.to_vec()), } } Some(Ok(record)) diff --git a/src/tests.rs b/src/tests.rs index 139c171..572b382 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -204,20 +204,20 @@ parses_to!(quote_inner_space, "\" a \"", vec![vec![" a "]]); parses_to!(quote_outer_space, " \"a\" ", vec![vec![" \"a\" "]]); parses_to!(quote_change, "zaz", vec![vec!["a"]], - |rdr: Reader<_>| rdr.quote(Some(b'z'))); + |rdr: Reader<_>| rdr.quote(b'z')); // This one is pretty hokey. I don't really know what the "right" behavior is. parses_to!(quote_delimiter, ",a,,b", vec![vec!["a,b"]], - |rdr: Reader<_>| rdr.quote(Some(b','))); + |rdr: Reader<_>| rdr.quote(b',')); // Another hokey one... parses_to!(quote_no_escapes, r#""a\"b""#, vec![vec![r#"a\b""#]]); parses_to!(quote_escapes_no_double, r#""a""b""#, vec![vec![r#"a"b""#]], |rdr: Reader<_>| rdr.double_quote(false)); parses_to!(quote_escapes, r#""a\"b""#, vec![vec![r#"a"b"#]], - |rdr: Reader<_>| rdr.double_quote(false)); + |rdr: Reader<_>| rdr.escape(Some(b'\\'))); parses_to!(quote_escapes_change, r#""az"b""#, vec![vec![r#"a"b"#]], - |rdr: Reader<_>| rdr.double_quote(false).escape(b'z')); + |rdr: Reader<_>| rdr.escape(Some(b'z'))); parses_to!(delimiter_tabs, "a\tb", vec![vec!["a", "b"]], |rdr: Reader<_>| rdr.delimiter(b'\t')); @@ -354,7 +354,7 @@ fn headers_trailing_lf() { let mut d = Reader::from_string("a,b,c\n\n\n\n"); assert_eq!(d.headers().unwrap(), vec!("a".to_string(), "b".to_string(), "c".to_string())); - assert!(d.next_field().is_end()); + assert!(d.next_bytes().is_end()); } #[test] @@ -364,9 +364,7 @@ fn headers_eof() { assert!(d.done()); } -fn bytes<'a, S>(bs: S) -> ByteString where S: Into> { - ByteString::from_bytes(bs) -} +fn bytes<'a, S>(bs: S) -> ByteString where S: Into> { bs.into() } #[test] fn byte_strings() { @@ -416,7 +414,7 @@ fn seeking() { fn raw_access() { let mut rdr = Reader::from_string("1,2"); let mut fields = vec![]; - while let Some(field) = rdr.next_field().into_iter_result() { + while let Some(field) = rdr.next_bytes().into_iter_result() { fields.push(field.unwrap().to_vec()); } assert_eq!(fields[0], b"1".to_vec()); diff --git a/src/writer.rs b/src/writer.rs index 9bd02c4..e3e8e25 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -444,6 +444,6 @@ impl Writer { } } buf.push(self.quote); - ByteString::from_bytes(buf) + buf } }
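For downstream users, the breaking changes enumerated in the commit message mostly reduce to mechanical rewrites. A sketch of the before/after, with the "before" lines kept as comments:

```rust
extern crate csv;

fn main() {
    // (1) `ByteString` is now a plain alias for `Vec<u8>`:
    // let field = csv::ByteString::from_bytes(b"abc".to_vec()); // before
    let field: csv::ByteString = b"abc".to_vec();                // after

    // (4) `quote` now takes a bare `u8`, and `escape` an `Option<u8>`:
    // let rdr = csv::Reader::from_string(data).quote(Some(b'\'')); // before
    let mut rdr = csv::Reader::from_string("'a','b'")
                              .has_headers(false)
                              .quote(b'\'')
                              .escape(Some(b'\\'))
                              .double_quote(false);

    // (3) `Indexed::csv()` is gone; `Indexed` now derefs to the reader,
    // so `index.records()` replaces `index.csv().records()`.

    for row in rdr.records().map(|r| r.unwrap()) {
        println!("{:?} {:?}", row, field);
    }
}
```

Point (2), the switch from line/column numbers to record/field numbers in parse errors, surfaces as the `LocatableError` type earlier in this patch.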