Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: add --cache-threshold autoindex creation/deletion logic #1809

Merged
merged 4 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 39 additions & 8 deletions src/cmd/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,16 @@ stats options:
by using this option BEFORE running the `schema` and `tojsonl`
commands and they will automatically load the binary encoded
stats file if it exists.
--cache-threshold <arg> The threshold in milliseconds to cache the stats results.
If a stats run takes longer than this threshold, the stats
results will be cached. Set to 0 to suppress caching. Set
to 1 to force caching.
-c, --cache-threshold <arg> When greater than 1, the threshold in milliseconds before caching
stats results. If a stats run takes longer than this threshold,
the stats results will be cached.
Set to 0 to suppress caching.
Set to 1 to force caching.
Set to a negative number to automatically create an index
when the input file size is greater than abs(arg) in bytes.
If the negative number ends with 5, it will delete the index
file and the stats cache file after the stats run. Otherwise,
the index file and the cache files are kept.
[default: 5000]

Common options:
Expand Down Expand Up @@ -174,7 +180,7 @@ It's type inferences are also used by the `tojsonl` command to generate properly
JSONL files.

To safeguard against undefined behavior, `stats` is the most extensively tested command,
with >480 tests.
with ~500 tests.
*/

use std::{
Expand Down Expand Up @@ -224,7 +230,7 @@ pub struct Args {
pub flag_force: bool,
pub flag_jobs: Option<usize>,
pub flag_stats_binout: bool,
pub flag_cache_threshold: u64,
pub flag_cache_threshold: isize,
pub flag_output: Option<String>,
pub flag_no_headers: bool,
pub flag_delimiter: Option<Delimiter>,
Expand Down Expand Up @@ -375,6 +381,7 @@ pub fn run(argv: &[&str]) -> CliResult<()> {

let mut compute_stats = true;
let mut create_cache = args.flag_cache_threshold > 0 || args.flag_stats_binout;
let mut autoindex_set = false;

let write_stats_binout = args.flag_stats_binout;

Expand Down Expand Up @@ -478,6 +485,13 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
util::mem_file_check(&path, false, args.flag_memcheck)?;
}

// check if flag_cache_threshold is a negative number,
// if so, set the autoindex_size to absolute of the number
if args.flag_cache_threshold.is_negative() {
fconfig.autoindex_size = args.flag_cache_threshold.unsigned_abs() as u64;
autoindex_set = true;
}

// we need to count the number of records in the file to calculate sparsity
let record_count = RECORD_COUNT.get_or_init(|| util::count_rows(&fconfig).unwrap());
log::info!("scanning {record_count} records...");
Expand Down Expand Up @@ -520,7 +534,9 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
// update the stats args json metadata
current_stats_args.compute_duration_ms = start_time.elapsed().as_millis() as u64;

if create_cache && current_stats_args.compute_duration_ms > args.flag_cache_threshold {
if create_cache
&& current_stats_args.compute_duration_ms > args.flag_cache_threshold as u64
{
// if the stats run took longer than the cache threshold and the threshold > 0,
// cache the stats so we don't have to recompute it next time
current_stats_args.canonical_input_path =
Expand All @@ -532,7 +548,8 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
}

// ensure create_cache is also true if the user specified --cache-threshold 1
create_cache = create_cache || args.flag_cache_threshold == 1;
create_cache =
create_cache || args.flag_cache_threshold == 1 || args.flag_cache_threshold.is_negative();

wtr.flush()?;

Expand Down Expand Up @@ -572,6 +589,20 @@ pub fn run(argv: &[&str]) -> CliResult<()> {
fs::copy(currstats_filename.clone(), stats_pathbuf.clone())?;
}

if args.flag_cache_threshold.is_negative() && args.flag_cache_threshold % 10 == -5 {
// if the cache threshold is a negative number ending in 5,
// delete both the index file and the stats cache file
if autoindex_set {
let index_file = path.with_extension("csv.idx");
log::debug!("deleting index file: {}", index_file.display());
if std::fs::remove_file(index_file.clone()).is_err() {
// fails silently if it can't remove the index file
log::warn!("Could not remove index file: {}", index_file.display());
}
}
create_cache = false;
}

if !create_cache {
// remove the stats cache file
if fs::remove_file(stats_pathbuf.clone()).is_err() {
Expand Down
40 changes: 20 additions & 20 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,26 @@ impl<'de> Deserialize<'de> for Delimiter {

#[derive(Clone, Debug)]
pub struct Config {
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
pub path: Option<PathBuf>, // None implies <stdin>
idx_path: Option<PathBuf>,
select_columns: Option<SelectColumns>,
delimiter: u8,
pub no_headers: bool,
pub flexible: bool,
terminator: csv::Terminator,
pub quote: u8,
quote_style: csv::QuoteStyle,
double_quote: bool,
escape: Option<u8>,
quoting: bool,
pub preamble_rows: u64,
trim: csv::Trim,
pub autoindex_size: u64,
prefer_dmy: bool,
pub comment: Option<u8>,
snappy: bool, // flag to enable snappy compression/decompression
pub read_buffer: u32,
pub write_buffer: u32,
}

// Empty trait as an alias for Seek and Read that avoids auto trait errors
Expand Down
115 changes: 114 additions & 1 deletion tests/test_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ fn stats_cache() {
cmd.arg("--infer-dates")
.arg("--dates-whitelist")
.arg("all")
// set cache threshold to 1 byte to force cache creation
// set cache threshold to 1 to force cache creation
.args(["--cache-threshold", "1"])
.arg(test_file);

Expand All @@ -918,6 +918,119 @@ fn stats_cache() {
assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold");
    let test_file = wrk.load_test_file("boston311-100.csv");

    // A cache threshold of -10240 sets autoindex_size to 10 KB
    // and forces creation of the stats cache files.
    let mut stats_cmd = wrk.command("stats");
    stats_cmd
        .args(["--infer-dates", "--dates-whitelist", "all"])
        .args(["-c", "-10240"])
        .arg(test_file.clone());

    let stats_rows: Vec<Vec<String>> = wrk.read_stdout(&mut stats_cmd);

    // the input file is larger than 10 KB, so the index file SHOULD exist
    assert!(Path::new(&format!("{test_file}.idx")).exists());

    wrk.create("in2.csv", stats_rows);

    // drop the variance & stddev columns, as it's causing flaky CI tests
    // for float values
    let mut select_cmd = wrk.command("select");
    select_cmd.arg("1-9,12-").arg("in2.csv");

    let selected: String = wrk.stdout(&mut select_cmd);
    let expected = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&selected), dos2unix(&expected).trim_end());

    // both stats cache files should have been written
    assert!(Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold_unmet() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold_unmet");
    let test_file = wrk.load_test_file("boston311-100.csv");

    // A cache threshold of -51200 sets autoindex_size to 50 KB
    // and forces creation of the stats cache files.
    let mut stats_cmd = wrk.command("stats");
    stats_cmd
        .args(["--infer-dates", "--dates-whitelist", "all"])
        .args(["--cache-threshold", "-51200"])
        .arg(test_file.clone());

    let stats_rows: Vec<Vec<String>> = wrk.read_stdout(&mut stats_cmd);

    // the input file is smaller than 50 KB, so no index file should exist
    assert!(!Path::new(&format!("{test_file}.idx")).exists());

    wrk.create("in2.csv", stats_rows);

    // drop the variance & stddev columns, as it's causing flaky CI tests
    // for float values
    let mut select_cmd = wrk.command("select");
    select_cmd.arg("1-9,12-").arg("in2.csv");

    let selected: String = wrk.stdout(&mut select_cmd);
    let expected = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&selected), dos2unix(&expected).trim_end());

    // both stats cache files should have been written
    assert!(Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_cache_negative_threshold_five() {
    use std::path::Path;

    let wrk = Workdir::new("stats_cache_negative_threshold_five");
    let test_file = wrk.load_test_file("boston311-100.csv");

    let mut cmd = wrk.command("stats");
    cmd.arg("--infer-dates")
        .arg("--dates-whitelist")
        .arg("all")
        // set cache threshold to -10245 to set autoindex_size to 10 KB;
        // because the negative threshold ends in 5, stats creates the index,
        // then autodeletes it AND the stats cache files after the run
        .args(["-c", "-10245"])
        .arg(test_file.clone());

    let got: Vec<Vec<String>> = wrk.read_stdout(&mut cmd);

    // the index file WAS CREATED as the input file is > 10 KB,
    // but it WAS DELETED after stats exited, as the threshold was negative
    // and ends with a 5
    assert!(!Path::new(&format!("{test_file}.idx")).exists());

    wrk.create("in2.csv", got);

    // removed variance & stddev columns as it's causing flaky CI test for float values
    let mut cmd = wrk.command("select");
    cmd.arg("1-9,12-").arg("in2.csv");

    let got2: String = wrk.stdout(&mut cmd);
    let expected2 = wrk.load_test_resource("boston311-100-stats.csv");

    assert_eq!(dos2unix(&got2), dos2unix(&expected2).trim_end());

    // check that the stats cache files were DELETED after the run
    // (threshold is negative and ends in 5, so caching is suppressed)
    assert!(!Path::new(&wrk.path("boston311-100.stats.csv")).exists());
    assert!(!Path::new(&wrk.path("boston311-100.stats.csv.json")).exists());
}

#[test]
fn stats_infer_boolean_1_0() {
let wrk = Workdir::new("stats_infer_boolean_1_0");
Expand Down
Loading