Skip to content

Commit

Permalink
split: refactored to move from threadpool to rayon
Browse files Browse the repository at this point in the history
refactored parallel_split:
* to use rayon instead of threadpool, which is getting a bit long in the tooth
* added extensive safety comments
* removed unnecessary clone
* go to sequential split if there's only one chunk
  • Loading branch information
jqnatividad committed Jan 21, 2024
1 parent e068f91 commit 0825fe0
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions src/cmd/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ Common options:

use std::{fs, io, path::Path};

use rayon::iter::{IntoParallelIterator, ParallelIterator};
use serde::Deserialize;
use threadpool::ThreadPool;

use crate::{
config::{Config, Delimiter},
Expand Down Expand Up @@ -95,31 +95,43 @@ impl Args {
Ok(())
}

/// Splits the indexed input into `nchunks` chunks of up to `flag_size` records,
/// writing each chunk through its own writer obtained from `new_writer`.
///
/// Each chunk re-opens the indexed reader, seeks to record `i * flag_size`,
/// and copies up to `flag_size` byte records to the chunk's writer.
/// Every fallible call inside the worker closure is unwrapped, so a failure
/// there panics instead of being returned through the `CliResult`.
///
/// NOTE(review): this listing is a rendered diff without +/- markers, so the
/// removed `ThreadPool` loop and the added rayon `into_par_iter` version
/// appear interleaved below.
#[allow(clippy::unnecessary_wraps)]
fn parallel_split(&self, idx: &Indexed<fs::File, fs::File>) -> CliResult<()> {
let nchunks = util::num_of_chunks(idx.count() as usize, self.flag_size);
let pool = ThreadPool::new(util::njobs(self.flag_jobs));
for i in 0..nchunks {
let args = self.clone();
pool.execute(move || {
let conf = args.rconfig();
let mut idx = conf.indexed().unwrap().unwrap();
let headers = idx.byte_headers().unwrap().clone();
let mut wtr = args
.new_writer(&headers, i * args.flag_size, args.flag_pad)
.unwrap();

idx.seek((i * args.flag_size) as u64).unwrap();
for row in idx.byte_records().take(args.flag_size) {
let row = row.unwrap();
wtr.write_byte_record(&row).unwrap();
}
// safety: safe to unwrap because we know the writer is a file;
// we cannot use ? here because we're in a closure
wtr.flush().unwrap();
});
if nchunks == 1 {
// there's only one chunk, so we can just do a sequential split,
// which has less overhead and better error handling
return self.sequential_split();
}
pool.join();
let args = self.clone();
// NOTE(review): the return value is discarded here, so this is presumably
// called for a side effect (e.g. configuring the number of worker threads)
// - confirm against util::njobs
util::njobs(args.flag_jobs);

// safety: we cannot use ? here because we're in a closure,
// so every fallible call below is unwrapped and panics on failure
(0..nchunks).into_par_iter().for_each(|i| {
let conf = args.rconfig();
// safety: safe to unwrap because we know the file is indexed
let mut idx = conf.indexed().unwrap().unwrap();
// safety: the only way this can fail is if the header row cannot be
// read as a valid CSV record, which should be impossible because
// we're reading from a file with a valid index
let headers = idx.byte_headers().unwrap();

let mut wtr = args
// safety: the only way this can fail is if we cannot create a file
.new_writer(headers, i * args.flag_size, args.flag_pad)
.unwrap();

// safety: we know that there is more than one chunk, so we can safely
// seek to the start of chunk `i` (record index i * flag_size)
idx.seek((i * args.flag_size) as u64).unwrap();
for row in idx.byte_records().take(args.flag_size) {
let row = row.unwrap();
wtr.write_byte_record(&row).unwrap();
}
// safety: safe to unwrap because we know the writer is a file;
// the only way this can fail is if we cannot write to the file
wtr.flush().unwrap();
});

Ok(())
}

Expand Down

0 comments on commit 0825fe0

Please sign in to comment.