Merge pull request #931 from jqnatividad/stats_bincode
`schema` & `stats`: stats now has a `--stats-binout` option which `schema` takes advantage of
jqnatividad committed Apr 17, 2023
2 parents c764fe0 + c6d5bba commit 026e21e
Showing 5 changed files with 116 additions and 57 deletions.
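
In a nutshell: `stats` now serializes its computed statistics to a binary file with bincode, and `schema` reads that file back instead of recomputing the statistics in-process. A minimal sketch of that round trip, using a stand-in struct (the real `Stats` type lives in src/cmd/stats.rs and carries many more fields):

use serde::{Deserialize, Serialize};
use std::fs::File;

// Stand-in for qsv's real Stats struct; illustrative only.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Stats {
    nullcount: u64,
    sum: Option<f64>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let stats = vec![Stats { nullcount: 3, sum: Some(42.5) }];

    // `stats` side: write the computed stats in binary form (--stats-binout).
    let mut f = File::create("stats.bin")?;
    bincode::serialize_into(&mut f, &stats)?;
    drop(f);

    // `schema` side: load them back without recomputing anything.
    let f = File::open("stats.bin")?;
    let loaded: Vec<Stats> = bincode::deserialize_from(f)?;
    assert_eq!(stats, loaded);
    Ok(())
}

For a large CSV, deserializing cached stats is far cheaper than re-scanning the file, which is the point of the new `--stats-binout` flag.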
15 changes: 13 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -66,6 +66,7 @@ panic = "abort"

 [dependencies]
 ahash = "0.8"
+bincode = "1.3.3"
 byteorder = "1.4"
 bytes = "1"
 cached = { version = "0.43", default-features = false, features = [
@@ -134,7 +135,7 @@ polars = { version = "0.28", features = [
 pyo3 = { version = "0.18", features = ["auto-initialize"], optional = true }
 qsv-dateparser = "0.7"
 qsv_docopt = "1.2"
-qsv-stats = "0.7"
+qsv-stats = "0.8"
 qsv_currency = { version = "0.6", optional = true }
 qsv-sniffer = { version = "0.8", default-features = false, features = [
     "runtime-dispatch-simd",
3 changes: 2 additions & 1 deletion src/clitypes.rs
@@ -26,7 +26,8 @@ macro_rules! werr {
     ($($arg:tt)*) => ({
         use std::io::Write;
         use log::error;
-        error!("{}", $($arg)*);
+        let error = format!($($arg)*);
+        error!("{error}");
         (writeln!(&mut ::std::io::stderr(), $($arg)*)).unwrap();
     });
 }
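
The `werr!` tweak formats the arguments into a `String` once and hands that to the `log` macro; the old `error!("{}", $($arg)*)` form only expanded cleanly for a single pre-formatted argument. A standalone sketch of the revised pattern (assuming the `log` crate as a dependency):

// Standalone sketch of the revised pattern; assumes the `log` crate.
// The arguments are formatted once into a String, which both the logger
// and the stderr writer can then reuse.
macro_rules! werr {
    ($($arg:tt)*) => ({
        use std::io::Write;
        use log::error;
        let error = format!($($arg)*);
        error!("{error}");
        (writeln!(&mut ::std::io::stderr(), "{error}")).unwrap();
    });
}

fn main() {
    werr!("could not read {}: expected {} fields", "input.csv", 5);
}

The committed macro still passes `$($arg)*` to `writeln!` directly; reusing the formatted `String` for stderr, as above, is an equivalent variation.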
81 changes: 63 additions & 18 deletions src/cmd/schema.rs
@@ -145,7 +145,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
     match infer_schema_from_stats(&args, &input_filename) {
         Ok(map) => map,
         Err(e) => {
-            return fail_clierror!("Failed to infer schema via stats and frequency: {e}");
+            return fail_clierror!(
+                "Failed to infer schema via stats and frequency from {input_filename}: {e}"
+            );
         }
     };

@@ -427,30 +429,73 @@ fn get_stats_records(args: &Args) -> CliResult<(ByteRecord, Vec<Stats>, AHashMap
         flag_no_headers: args.flag_no_headers,
         flag_delimiter: args.flag_delimiter,
         flag_no_memcheck: args.flag_no_memcheck,
+        flag_stats_binout: None,
     };

-    let (csv_fields, csv_stats) = match stats_args.rconfig().indexed() {
-        Ok(o) => {
-            if let Some(idx) = o {
-                winfo!("index found... triggering parallel stats");
-                let idx_count = idx.count();
-                stats_args.parallel_stats(&stats_args.flag_dates_whitelist, idx_count)
-            } else {
-                winfo!(
-                    "no index, triggering sequential stats. Consider indexing your data if schema \
-                     inferencing is slow. "
-                );
-                stats_args.sequential_stats(&stats_args.flag_dates_whitelist)
-            }
-        }
-        Err(e) => {
-            wwarn!("error determining if indexed, triggering sequential stats: {e}");
-            stats_args.sequential_stats(&stats_args.flag_dates_whitelist)
-        }
-    }?;
+    let tempfile = tempfile::Builder::new()
+        .suffix("-stats.csv")
+        .tempfile()
+        .unwrap();
+    let tempfile_path = tempfile.path().to_str().unwrap().to_string();
+
+    let tempfile_statsbin = tempfile::Builder::new()
+        .suffix("-stats.bin")
+        .tempfile()
+        .unwrap();
+    let tempfile_statsbin_path = tempfile_statsbin.path().to_str().unwrap().to_string();
+
+    let mut stats_args_str = format!(
+        "stats {input} --infer-dates --dates-whitelist {dates_whitelist} --round 4 --cardinality \
+         --output {output} --stats-binout {binout} --force",
+        input = {
+            if let Some(arg_input) = stats_args.arg_input.clone() {
+                arg_input
+            } else {
+                "-".to_string()
+            }
+        },
+        dates_whitelist = stats_args.flag_dates_whitelist,
+        output = tempfile_path,
+        binout = tempfile_statsbin_path,
+    );
+    if args.flag_prefer_dmy {
+        stats_args_str = format!("{stats_args_str} --prefer-dmy");
+    }
+    if args.flag_no_headers {
+        stats_args_str = format!("{stats_args_str} --no-headers");
+    }
+    if let Some(delimiter) = args.flag_delimiter {
+        let delim = delimiter.as_byte() as char;
+        stats_args_str = format!("{stats_args_str} --delimiter {delim}");
+    }
+    if args.flag_no_memcheck {
+        stats_args_str = format!("{stats_args_str} --no-memcheck");
+    }
+    if let Some(mut jobs) = stats_args.flag_jobs {
+        if jobs > 2 {
+            jobs -= 1; // leave one core for the main thread
+        }
+        stats_args_str = format!("{stats_args_str} --jobs {jobs}");
+    }
+    debug!("stats_args_str: {stats_args_str}");
+
+    let stats_args_vec: Vec<&str> = stats_args_str.split_whitespace().collect();
+
+    let qsv_bin = std::env::current_exe().unwrap();
+    let mut stats_cmd = std::process::Command::new(qsv_bin);
+    stats_cmd.args(stats_args_vec);
+    let stats_output = stats_cmd.output().unwrap();
+    debug!("stats_output: {stats_output:?}");
+
+    let mut bin_file = File::open(tempfile_statsbin_path).unwrap();
+    let csv_stats: Vec<Stats> = bincode::deserialize_from(&mut bin_file).unwrap();
+
+    // get the headers from the input file
+    let mut rdr = csv::Reader::from_path(stats_args.arg_input.clone().unwrap()).unwrap();
+    let csv_fields = rdr.byte_headers()?.clone();
+    drop(rdr);

     let stats_columns = stats_args.stat_headers();
     debug!("stats columns: {stats_columns:?}");

     let mut stats_col_index_map = AHashMap::new();

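
The net effect of the schema.rs change: instead of calling the stats functions in-process, `schema` re-invokes the running qsv binary as a `stats` child process and reads back the bincode file it produces. Stripped to its essentials, the pattern looks like this sketch (hypothetical helper; error handling and most of the real flag set omitted):

use std::process::Command;

// Hypothetical helper: run `<current-exe> stats <input> --stats-binout <binout> --force`
// and deserialize the binary stats it leaves behind. Works for any
// serde-deserializable stats type T.
fn run_stats_subprocess<T: serde::de::DeserializeOwned>(
    input: &str,
    binout: &str,
) -> Result<Vec<T>, Box<dyn std::error::Error>> {
    let qsv_bin = std::env::current_exe()?;
    let output = Command::new(qsv_bin)
        .args(["stats", input, "--stats-binout", binout, "--force"])
        .output()?;
    if !output.status.success() {
        return Err(format!("stats subprocess failed: {output:?}").into());
    }
    let f = std::fs::File::open(binout)?;
    Ok(bincode::deserialize_from(f)?)
}

Shelling out this way lets `schema` reuse the whole `stats` pipeline, including its index detection, parallelism, and memory checks, without duplicating that logic in-process.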
71 changes: 36 additions & 35 deletions src/cmd/stats.rs
@@ -116,6 +116,7 @@ stats options:
                               Note that a file handle is opened for each job.
                               When not set, the number of jobs is set to the
                               number of CPUs detected.
+    --stats-binout <file>     Write the stats to <file> in binary format.

 Common options:
     -h, --help                Display this message
@@ -175,7 +176,7 @@ use crate::{
 };

 #[allow(clippy::unsafe_derive_deserialize)]
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug)]
 pub struct Args {
     pub arg_input: Option<String>,
     pub flag_select: SelectColumns,
@@ -197,6 +198,7 @@ pub struct Args {
     pub flag_no_headers: bool,
     pub flag_delimiter: Option<Delimiter>,
     pub flag_no_memcheck: bool,
+    pub flag_stats_binout: Option<String>,
 }

 // this struct is used to serialize/deserialize the stats to
@@ -402,6 +404,16 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
             }
         }
     }?;
+
+    // write binary version of the computed stats to file if --stats-binout is set.
+    // we do this so we can load the stats into memory for a CSV without having to
+    // recompute them if it hasn't changed. If the CSV is large, this can save a lot of time.
+    if let Some(stats_binout) = args.flag_stats_binout.clone() {
+        let mut f = fs::File::create(stats_binout)?;
+        bincode::serialize_into(&mut f, &stats).unwrap();
+        f.flush()?;
+    }
+
     let stats = args.stats_to_records(stats);

     wtr.write_record(&args.stat_headers())?;
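
A side effect of serializing `Stats` through bincode's serde integration: every type reachable from `Stats` needs `Serialize`/`Deserialize` too, which is why the derives on `WhichStats`, `Stats`, `FieldType`, `TypedSum`, and `TypedMinMax` in the hunks below are all extended. A toy illustration of that cascade:

use serde::{Deserialize, Serialize};

// Illustrative only: a serde derive requires every field's type to
// implement Serialize/Deserialize as well, so the derives cascade down
// the type graph (here Outer -> Inner, as Stats -> TypedSum and friends).
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Inner {
    n: Option<f64>,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Outer {
    inner: Inner, // would not compile if Inner lacked the derives
}

fn main() {
    let o = Outer { inner: Inner { n: Some(1.5) } };
    let bytes = bincode::serialize(&o).unwrap();
    let back: Outer = bincode::deserialize(&bytes).unwrap();
    assert_eq!(o, back);
}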
Expand Down Expand Up @@ -768,8 +780,8 @@ fn init_date_inference(
Ok(())
}

#[derive(Clone, Debug, Eq, PartialEq)]
struct WhichStats {
#[derive(Clone, Debug, Eq, PartialEq, Default, Serialize, Deserialize)]
pub struct WhichStats {
include_nulls: bool,
sum: bool,
range: bool,
@@ -789,7 +801,7 @@ impl Commute for WhichStats {
     }
 }

-#[derive(Clone)]
+#[derive(Clone, Serialize, Deserialize, Debug, PartialEq)]
 pub struct Stats {
     typ: FieldType,
     sum: Option<TypedSum>,
@@ -1264,7 +1276,7 @@ impl Commute for Stats {
 }

 #[allow(clippy::enum_variant_names)]
-#[derive(Clone, Copy, PartialEq, Default)]
+#[derive(Clone, Copy, PartialEq, Default, Serialize, Deserialize)]
 pub enum FieldType {
     // The default - TNull, is the most specific type.
     // Type inference proceeds by assuming the most specific type and then
@@ -1389,7 +1401,7 @@ impl fmt::Debug for FieldType {

 /// `TypedSum` keeps a rolling sum of the data seen.
 /// It sums integers until it sees a float, at which point it sums floats.
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Serialize, Deserialize, Debug, PartialEq)]
 struct TypedSum {
     integer: i64,
     float: Option<f64>,
@@ -1467,7 +1479,7 @@ impl Commute for TypedSum {

 /// `TypedMinMax` keeps track of minimum/maximum/range values for each possible type
 /// where min/max/range makes sense.
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Serialize, Deserialize, Debug, PartialEq)]
 struct TypedMinMax {
     strings: MinMax<Vec<u8>>,
     str_len: MinMax<usize>,
@@ -1485,35 +1497,24 @@ impl TypedMinMax {
             return;
         }
         self.strings.add(sample.to_vec());
-        // safety: we can use unwrap_unchecked below since we know the data type of the sample
-        unsafe {
-            match typ {
-                TString | TNull => {}
-                TFloat => {
-                    let n = from_utf8(sample)
-                        .unwrap_unchecked()
-                        .parse::<f64>()
-                        .unwrap_unchecked();
-
-                    self.floats.add(n);
-                    self.integers.add(n as i64);
-                }
-                TInteger => {
-                    let n = from_utf8(sample)
-                        .unwrap_unchecked()
-                        .parse::<i64>()
-                        .unwrap_unchecked();
-                    self.integers.add(n);
-                    #[allow(clippy::cast_precision_loss)]
-                    self.floats.add(n as f64);
-                }
-                TDate | TDateTime => {
-                    let n = from_utf8(sample)
-                        .unwrap_unchecked()
-                        .parse::<i64>()
-                        .unwrap_unchecked();
-                    self.dates.add(n);
-                }
-            }
-        }
+        // safety: we can use unwrap below since we know the data type of the sample
+        match typ {
+            TString | TNull => {}
+            TFloat => {
+                let n = from_utf8(sample).unwrap().parse::<f64>().unwrap();
+
+                self.floats.add(n);
+                self.integers.add(n as i64);
+            }
+            TInteger => {
+                let n = from_utf8(sample).unwrap().parse::<i64>().unwrap();
+                self.integers.add(n);
+                #[allow(clippy::cast_precision_loss)]
+                self.floats.add(n as f64);
+            }
+            TDate | TDateTime => {
+                let n = from_utf8(sample).unwrap().parse::<i64>().unwrap();
+                self.dates.add(n);
+            }
+        }
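
The final hunk trades `unwrap_unchecked` inside an `unsafe` block for plain `unwrap`. The difference, in a self-contained sketch: `unwrap` costs a check and panics with a diagnostic if the upstream type inference was ever wrong, while `unwrap_unchecked` turns a bad sample into undefined behavior:

fn main() {
    let sample: &[u8] = b"3.14";

    // Safe form (what the code uses now): panics if the sample is not
    // valid UTF-8 or not parseable, instead of invoking undefined behavior.
    let n = std::str::from_utf8(sample).unwrap().parse::<f64>().unwrap();
    assert_eq!(n, 3.14);

    // Unsafe form (what was removed): the caller promises both conversions
    // succeed; a broken promise is undefined behavior, not a panic.
    let m = unsafe {
        std::str::from_utf8(sample)
            .unwrap_unchecked()
            .parse::<f64>()
            .unwrap_unchecked()
    };
    assert_eq!(m, 3.14);
}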
