Skip to content

Commit

Permalink
Merge pull request #1809 from jqnatividad/stats-cache-threshold-negative
Browse files Browse the repository at this point in the history
`stats`: add `--cache-threshold` autoindex creation/deletion logic

@mhkeller @chadbaldwin you may want to check this out for your use cases as it allows you to automatically create and delete index files to support faster `stats` calculations.
  • Loading branch information
jqnatividad authored May 10, 2024
2 parents ea0cb2f + 6d51bd5 commit fe9c052
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 29 deletions.
47 changes: 39 additions & 8 deletions src/cmd/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,16 @@ stats options:
by using this option BEFORE running the `schema` and `tojsonl`
commands and they will automatically load the binary encoded
stats file if it exists.
--cache-threshold <arg> The threshold in milliseconds to cache the stats results.
If a stats run takes longer than this threshold, the stats
results will be cached. Set to 0 to suppress caching. Set
to 1 to force caching.
-c, --cache-threshold <arg> When greater than 1, the threshold in milliseconds before caching
stats results. If a stats run takes longer than this threshold,
the stats results will be cached.
Set to 0 to suppress caching.
Set to 1 to force caching.
Set to a negative number to automatically create an index
when the input file size is greater than abs(arg) in bytes.
If the negative number ends with 5, it will delete the index
file and the stats cache file after the stats run. Otherwise,
the index file and the cache files are kept.
[default: 5000]
Common options:
Expand Down Expand Up @@ -174,7 +180,7 @@ Its type inferences are also used by the `tojsonl` command to generate properly
JSONL files.
To safeguard against undefined behavior, `stats` is the most extensively tested command,
with >480 tests.
with ~500 tests.
*/

use std::{
Expand Down Expand Up @@ -224,7 +230,7 @@ pub struct Args {
pub flag_force: bool,
pub flag_jobs: Option<usize>,
pub flag_stats_binout: bool,
pub flag_cache_threshold: u64,
pub flag_cache_threshold: isize,
pub flag_output: Option<String>,
pub flag_no_headers: bool,
pub flag_delimiter: Option<Delimiter>,
Expand Down Expand Up @@ -375,6 +381,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let mut compute_stats = true;
let mut create_cache = args.flag_cache_threshold > 0 || args.flag_stats_binout;
let mut autoindex_set = false;

let write_stats_binout = args.flag_stats_binout;

Expand Down Expand Up @@ -478,6 +485,13 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
util::mem_file_check(&path, false, args.flag_memcheck)?;
}

// check if flag_cache_threshold is a negative number,
// if so, set the autoindex_size to absolute of the number
if args.flag_cache_threshold.is_negative() {
fconfig.autoindex_size = args.flag_cache_threshold.unsigned_abs() as u64;
autoindex_set = true;
}

// we need to count the number of records in the file to calculate sparsity
let record_count = RECORD_COUNT.get_or_init(|| util::count_rows(&fconfig).unwrap());
log::info!("scanning {record_count} records...");
Expand Down Expand Up @@ -520,7 +534,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// update the stats args json metadata
current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;

if create_cache && current_stats_args.compute_duration_ms > args.flag_cache_threshold {
if create_cache
&& current_stats_args.compute_duration_ms > args.flag_cache_threshold as u64
{
// if the stats run took longer than the cache threshold and the threshold > 0,
// cache the stats so we don't have to recompute it next time
current_stats_args.canonical_input_path =
Expand All @@ -532,7 +548,8 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}

// ensure create_cache is also true if the user specified --cache-threshold 1
create_cache = create_cache || args.flag_cache_threshold == 1;
create_cache =
create_cache || args.flag_cache_threshold == 1 || args.flag_cache_threshold.is_negative();

wtr.flush()?;

Expand Down Expand Up @@ -572,6 +589,20 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
fs::copy(currstats_filename.clone(), stats_pathbuf.clone())?;
}

if args.flag_cache_threshold.is_negative() && args.flag_cache_threshold % 10 == -5 {
// if the cache threshold is a negative number ending in 5,
// delete both the index file and the stats cache file
if autoindex_set {
let index_file = path.with_extension("csv.idx");
log::debug!("deleting index file: {}", index_file.display());
if std::fs::remove_file(index_file.clone()).is_err() {
// fails silently if it can't remove the index file
log::warn!("Could not remove index file: {}", index_file.display());
}
}
create_cache = false;
}

if !create_cache {
// remove the stats cache file
if fs::remove_file(stats_pathbuf.clone()).is_err() {
Expand Down
40 changes: 20 additions & 20 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ impl<'de> Deserialize<'de> for Delimiter {

#[derive(Clone, Debug)]
pub struct Config {
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
pub autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
}

// Empty trait as an alias for Seek and Read that avoids auto trait errors
Expand Down
115 changes: 114 additions & 1 deletion tests/test_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ fn stats_cache() {
cmd.arg("--infer-dates")
.arg("--dates-whitelist")
.arg("all")
// set cache threshold to 1 byte to force cache creation
// set cache threshold to 1 to force cache creation
.args(["--cache-threshold", "1"])
.arg(test_file);

Expand All @@ -918,6 +918,119 @@ fn stats_cache() {
assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold");
    let test_file = wrk.load_test_file("boston311-100.csv");

    // Run `stats` with a cache threshold of -10240: the negative value sets
    // autoindex_size to 10 kb and forces cache creation.
    let mut stats_cmd = wrk.command("stats");
    stats_cmd
        .arg("--infer-dates")
        .arg("--dates-whitelist")
        .arg("all")
        .args(["-c", "-10240"])
        .arg(test_file.clone());

    let stats_output: Vec<Vec<String>> = wrk.read_stdout(&mut stats_cmd);

    // Because the input file is larger than 10 kb, the index file
    // SHOULD have been created.
    let index_path = format!("{test_file}.idx");
    assert!(Path::new(&index_path).exists());

    wrk.create("in2.csv", stats_output);

    // Drop the variance & stddev columns; their float values make the CI test flaky.
    let mut select_cmd = wrk.command("select");
    select_cmd.arg("1-9,12-").arg("in2.csv");

    let actual: String = wrk.stdout(&mut select_cmd);
    let expected = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&actual), dos2unix(&expected).trim_end());

    // Both stats cache files should have been written.
    assert!(Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold_unmet() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold_unmet");
    let test_file = wrk.load_test_file("boston311-100.csv");

    // Run `stats` with a cache threshold of -51200: the negative value sets
    // autoindex_size to 50 kb and forces cache creation.
    let mut stats_cmd = wrk.command("stats");
    stats_cmd
        .arg("--infer-dates")
        .arg("--dates-whitelist")
        .arg("all")
        .args(["--cache-threshold", "-51200"])
        .arg(test_file.clone());

    let stats_output: Vec<Vec<String>> = wrk.read_stdout(&mut stats_cmd);

    // Because the input file is smaller than 50 kb, the index file
    // SHOULD NOT have been created.
    let index_path = format!("{test_file}.idx");
    assert!(!Path::new(&index_path).exists());

    wrk.create("in2.csv", stats_output);

    // Drop the variance & stddev columns; their float values make the CI test flaky.
    let mut select_cmd = wrk.command("select");
    select_cmd.arg("1-9,12-").arg("in2.csv");

    let actual: String = wrk.stdout(&mut select_cmd);
    let expected = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&actual), dos2unix(&expected).trim_end());

    // Both stats cache files should still have been written.
    assert!(Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold_five() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold_five");
    let test_file = wrk.load_test_file("boston311-100.csv");

    let mut cmd = wrk.command("stats");
    cmd.arg("--infer-dates")
        .arg("--dates-whitelist")
        .arg("all")
        // set cache threshold to -10245 to set autoindex_size to 10 kb;
        // because the negative threshold ends in 5, stats creates an index file,
        // then autodeletes it AND the stats cache files after the run
        .args(["-c", "-10245"])
        .arg(test_file.clone());

    let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);

    // the index file WAS CREATED as the input file is > 10k
    // but the index file WAS DELETED after stats exits as the threshold was negative
    // and ends with a 5
    assert!(!Path::new(&format!("{test_file}.idx")).exists());

    wrk.create("in2.csv", got);

    // removed variance & stddev columns as it's causing flaky CI test for float values
    let mut cmd = wrk.command("select");
    cmd.arg("1-9,12-").arg("in2.csv");

    let got2: String = wrk.stdout(&mut cmd);
    let expected2 = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&got2), dos2unix(&expected2).trim_end());

    // check that the stats cache files were DELETED after the run
    // (the negative threshold ending in 5 removes them along with the index)
    assert!(!Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(!Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_infer_boolean_1_0() {
let wrk = Workdir::new("stats_infer_boolean_1_0");
Expand Down

0 comments on commit fe9c052

Please sign in to comment.