Skip to content

Commit

Permalink
Multithreading commands using threadpool call util::njobs
Browse files: browse the repository at this point in the history
  • Loading branch information
jqnatividad committed Apr 4, 2022
1 parent 26d3b56 commit 0a304bc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 45 deletions.
20 changes: 5 additions & 15 deletions src/cmd/frequency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ frequency options:
This works better when the given CSV data has
an index already created. Note that a file handle
is opened for each job.
When set to '0', the number of jobs is set to the
When not set, the number of jobs is set to the
number of CPUs detected.
[default: 0]
Common options:
-h, --help Display this message
Expand All @@ -66,7 +65,7 @@ pub struct Args {
pub flag_limit: usize,
pub flag_asc: bool,
pub flag_no_nulls: bool,
pub flag_jobs: usize,
pub flag_jobs: Option<usize>,
pub flag_output: Option<String>,
pub flag_no_headers: bool,
pub flag_delimiter: Option<Delimiter>,
Expand All @@ -78,7 +77,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let mut wtr = Config::new(&args.flag_output).writer()?;
let (headers, tables) = match args.rconfig().indexed()? {
Some(ref mut idx) if args.njobs() > 1 => args.parallel_ftables(idx),
Some(ref mut idx) if util::njobs(args.flag_jobs) > 1 => args.parallel_ftables(idx),
_ => args.sequential_ftables(),
}?;

Expand Down Expand Up @@ -150,10 +149,10 @@ impl Args {
return Ok((headers, vec![]));
}

let chunk_size = util::chunk_size(idx.count() as usize, self.njobs());
let chunk_size = util::chunk_size(idx.count() as usize, util::njobs(self.flag_jobs));
let nchunks = util::num_of_chunks(idx.count() as usize, chunk_size);

let pool = ThreadPool::new(self.njobs());
let pool = ThreadPool::new(util::njobs(self.flag_jobs));
let (send, recv) = channel::bounded(0);
for i in 0..nchunks {
let (send, args, sel) = (send.clone(), self.clone(), sel.clone());
Expand Down Expand Up @@ -197,15 +196,6 @@ impl Args {
let sel = self.rconfig().selection(headers)?;
Ok((sel.select(headers).map(|h| h.to_vec()).collect(), sel))
}

fn njobs(&self) -> usize {
let num_cpus = util::num_cpus();
if self.flag_jobs == 0 || self.flag_jobs > num_cpus {
num_cpus
} else {
self.flag_jobs
}
}
}

fn trim(bs: ByteString) -> ByteString {
Expand Down
16 changes: 3 additions & 13 deletions src/cmd/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ split options:
This only works when the given CSV data has
an index already created. Note that a file handle
is opened for each job.
When set to '0', the number of jobs is set to the
When not set, the number of jobs is set to the
number of CPUs detected.
[default: 0]
--filename <filename> A filename template to use when constructing
the names of the output files. The string '{}'
will be replaced by a value based on the value
Expand All @@ -53,7 +52,7 @@ struct Args {
arg_input: Option<String>,
arg_outdir: String,
flag_size: usize,
flag_jobs: usize,
flag_jobs: Option<usize>,
flag_filename: FilenameTemplate,
flag_pad: usize,
flag_no_headers: bool,
Expand Down Expand Up @@ -96,7 +95,7 @@ impl Args {

fn parallel_split(&self, idx: Indexed<fs::File, fs::File>) -> CliResult<()> {
let nchunks = util::num_of_chunks(idx.count() as usize, self.flag_size);
let pool = ThreadPool::new(self.njobs());
let pool = ThreadPool::new(util::njobs(self.flag_jobs));
for i in 0..nchunks {
let args = self.clone();
pool.execute(move || {
Expand Down Expand Up @@ -144,13 +143,4 @@ impl Args {
.delimiter(self.flag_delimiter)
.no_headers(self.flag_no_headers)
}

fn njobs(&self) -> usize {
let num_cpus = util::num_cpus();
if self.flag_jobs == 0 || self.flag_jobs > num_cpus {
num_cpus
} else {
self.flag_jobs
}
}
}
28 changes: 11 additions & 17 deletions src/cmd/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ stats options:
-j, --jobs <arg> The number of jobs to run in parallel.
This works only when the given CSV has an index.
Note that a file handle is opened for each job.
When set to '0', the number of jobs is set to the
When not set, the number of jobs is set to the
number of CPUs detected.
[default: 0]
Common options:
-h, --help Display this message
Expand All @@ -93,7 +92,7 @@ pub struct Args {
pub flag_nulls: bool,
pub flag_dates: bool,
pub flag_nullcount: bool,
pub flag_jobs: usize,
pub flag_jobs: Option<usize>,
pub flag_output: Option<String>,
pub flag_no_headers: bool,
pub flag_delimiter: Option<Delimiter>,
Expand All @@ -106,8 +105,12 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
let (headers, stats) = match args.rconfig().indexed()? {
None => args.sequential_stats(),
Some(idx) => {
if args.flag_jobs == 1 {
args.sequential_stats()
if let Some(num_jobs) = args.flag_jobs {
if num_jobs == 1 {
args.sequential_stats()
} else {
args.parallel_stats(idx)
}
} else {
args.parallel_stats(idx)
}
Expand Down Expand Up @@ -151,10 +154,10 @@ impl Args {
let mut rdr = self.rconfig().reader()?;
let (headers, sel) = self.sel_headers(&mut rdr)?;

let chunk_size = util::chunk_size(idx.count() as usize, self.njobs());
let chunk_size = util::chunk_size(idx.count() as usize, util::njobs(self.flag_jobs));
let nchunks = util::num_of_chunks(idx.count() as usize, chunk_size);

let pool = ThreadPool::new(self.njobs());
let pool = ThreadPool::new(util::njobs(self.flag_jobs));
let (send, recv) = channel::bounded(0);
for i in 0..nchunks {
let (send, args, sel) = (send.clone(), self.clone(), sel.clone());
Expand All @@ -171,7 +174,7 @@ impl Args {

pub fn stats_to_records(&self, stats: Vec<Stats>) -> Vec<csv::StringRecord> {
let mut records: Vec<_> = repeat(csv::StringRecord::new()).take(stats.len()).collect();
let pool = ThreadPool::new(self.njobs());
let pool = ThreadPool::new(util::njobs(self.flag_jobs));
let mut results = vec![];
for mut stat in stats {
let (send, recv) = channel::bounded(0);
Expand Down Expand Up @@ -221,15 +224,6 @@ impl Args {
.select(self.flag_select.clone())
}

fn njobs(&self) -> usize {
let num_cpus = util::num_cpus();
if self.flag_jobs == 0 || self.flag_jobs > num_cpus {
num_cpus
} else {
self.flag_jobs
}
}

#[inline]
fn new_stats(&self, record_len: usize) -> Vec<Stats> {
repeat(Stats::new(WhichStats {
Expand Down

0 comments on commit 0a304bc

Please sign in to comment.