Merge pull request #925 from jqnatividad/sniff_sz_files
`sniff`: can now sniff snappy-compressed files - on the local file system and on URLs
jqnatividad committed Apr 14, 2023
2 parents 5462a55 + fd0c67c commit 7e0ec1f
Showing 2 changed files with 183 additions and 67 deletions.
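
The heart of the change is a small helper that expands a Snappy-framed (.sz) file into a temporary directory before the normal CSV sniffing runs; local .sz paths go through it directly, while .sz URLs are downloaded in full first and then decompressed the same way. A condensed, self-contained sketch of that step (identifier names here are illustrative, not the exact ones in sniff.rs):

use std::{
    fs::File,
    io::{self, Write},
    path::{Path, PathBuf},
};

/// Expand a Snappy-framed file into `tmpdir` and return the decompressed path.
fn decompress_to_tempdir(path: &str, tmpdir: &tempfile::TempDir) -> io::Result<PathBuf> {
    let mut input = File::open(path)?;
    // snap's FrameDecoder handles the framed .sz format (not raw Snappy blocks).
    let mut reader = snap::read::FrameDecoder::new(&mut input);
    // Wrap the original file stem so it can be recovered later for display.
    let stem = Path::new(path).file_stem().unwrap().to_string_lossy();
    let out_path = tmpdir
        .path()
        .join(format!("qsv__{stem}__qsv_temp_decompressed"));
    let mut out = File::create(&out_path)?;
    io::copy(&mut reader, &mut out)?;
    out.flush()?;
    Ok(out_path)
}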
192 changes: 126 additions & 66 deletions src/cmd/sniff.rs
@@ -58,12 +58,12 @@ Common options:
-p, --progressbar Show progress bars. Only valid for URL input.
"#;

use std::{cmp::min, fmt, fs, io::Write, time::Duration};
use std::{cmp::min, fmt, fs, io::Write, path::Path, time::Duration};

use bytes::Bytes;
use futures::executor::block_on;
use futures_util::StreamExt;
use indicatif::{HumanCount, ProgressBar, ProgressDrawTarget, ProgressStyle};
use indicatif::{HumanBytes, HumanCount, ProgressBar, ProgressDrawTarget, ProgressStyle};
use qsv_sniffer::{DatePreference, SampleSize, Sniffer};
use reqwest::Client;
use serde::{Deserialize, Serialize};
@@ -75,7 +75,7 @@ use url::Url;

use crate::{
config::{Config, Delimiter},
util, CliResult,
util, CliError, CliResult,
};

#[derive(Deserialize)]
@@ -113,7 +113,24 @@ struct SniffStruct {
}
impl fmt::Display for SniffStruct {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "Path: {}", self.path)?;
writeln!(
f,
"Path: {}",
// when sniffing a snappy compressed file, it is first decompressed
// to a temporary file. The original file name is stored in the
// temporary file name, so we extract the original file name
if self.path.ends_with("__qsv_temp_decompressed") {
// use a regular expression to extract the original file name
// the original file name is between "qsv__" and "__qsv_temp_decompressed"
let re =
regex::Regex::new(r"qsv__(?P<filename>.*)__qsv_temp_decompressed").unwrap();
let caps = re.captures(&self.path).unwrap();
let filename = caps.name("filename").unwrap().as_str();
filename.to_string()
} else {
self.path.clone()
}
)?;
writeln!(f, "Sniff Timestamp: {}", self.sniff_timestamp)?;
writeln!(
f,
Expand Down Expand Up @@ -221,15 +238,14 @@ const fn rowcount(
(final_rowcount, estimated)
}

async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
async fn get_file_to_sniff(args: &Args, tmpdir: &tempfile::TempDir) -> CliResult<SniffFileStruct> {
if let Some(uri) = args.arg_input.clone() {
match uri {
// it's a URL, download sample to temp file
url if Url::parse(&url).is_ok() && url.starts_with("http") => {
if url.to_lowercase().ends_with(".sz") {
return fail_clierror!("Cannot remotely sniff .sz files.");
}
let snappy_flag = url.to_lowercase().ends_with(".sz");

// setup the reqwest client
let client = match Client::builder()
.user_agent(util::DEFAULT_USER_AGENT)
.brotli(true)
Expand Down Expand Up @@ -264,7 +280,11 @@ async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
};

#[allow(clippy::cast_precision_loss)]
let lines_sample_size = if args.flag_sample > 1.0 {
let lines_sample_size = if snappy_flag {
// if it's a snappy compressed file, we need to download the entire file
// to sniff it
usize::MAX
} else if args.flag_sample > 1.0 {
args.flag_sample.round() as usize
} else if args.flag_sample.abs() < f64::EPSILON {
// sample size is zero, so we want to download the entire file
Expand Down Expand Up @@ -295,10 +315,17 @@ async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
)
.unwrap(),
);
progress.set_message(format!(
"Downloading {} samples...",
HumanCount(lines_sample_size as u64)
));
if lines_sample_size == usize::MAX {
progress.set_message(format!(
"Downloading {}...",
HumanBytes(total_size as u64)
));
} else {
progress.set_message(format!(
"Downloading {} samples...",
HumanCount(lines_sample_size as u64)
));
}
} else {
progress.set_draw_target(ProgressDrawTarget::hidden());
}
@@ -321,73 +348,92 @@ async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
progress.inc(chunk_len as u64);
}

// scan chunk for newlines
let num_lines = chunk.into_iter().filter(|&x| x == b'\n').count();
// and keep track of the number of lines downloaded which is ~= sample_size
downloaded_lines += num_lines;
// we downloaded enough samples, stop downloading
if downloaded_lines > lines_sample_size {
break;
// check if we're downloading the entire file
if lines_sample_size != usize::MAX {
// we're not downloading the entire file, so we need to
// scan chunk for newlines
let num_lines = chunk.into_iter().filter(|&x| x == b'\n').count();
// and keep track of the number of lines downloaded which is ~= sample_size
downloaded_lines += num_lines;
// we downloaded enough samples, stop downloading
if downloaded_lines > lines_sample_size {
downloaded_lines -= 1; // subtract 1 because we don't want to count the header row
break;
}
}
}
drop(client);

// we subtract 1 because we don't want to count the header row
downloaded_lines -= 1;

if show_progress {
progress.finish_with_message(format!(
"Downloaded {} samples.",
HumanCount(downloaded_lines as u64)
));
if snappy_flag {
progress.finish_with_message(format!(
"Downloaded {}.",
HumanBytes(downloaded as u64)
));
} else {
progress.finish_with_message(format!(
"Downloaded {} samples.",
HumanCount(downloaded_lines as u64)
));
}
}

// now we downloaded the file, rewrite it so we only have the exact sample size
// and truncate potentially incomplete lines. We streamed the download
// and the downloaded file may be more than the sample size, and the final
// line may be incomplete
let retrieved_name = file.path().to_str().unwrap().to_string();
let config = Config::new(&Some(retrieved_name))
.delimiter(args.flag_delimiter)
// we say no_headers so we can just copy the downloaded file over
// including headers, to the exact sample size file
.no_headers(true)
.flexible(true);

let mut rdr = config.reader()?;
// create a temporary file to write the download file to
let wtr_file = NamedTempFile::new()?;

// keep the temporary file around so we can sniff it later
// we'll delete it when we're done
let (_file, path) = wtr_file
.keep()
.or(Err("Cannot keep temporary file".to_string()))?;
let wtr_file_path = path.to_str().unwrap().to_string();

let mut wtr = Config::new(&Some(wtr_file_path.clone()))
.no_headers(false)
.flexible(true)
.quote_style(csv::QuoteStyle::NonNumeric)
.writer()?;
let wtr_file_path;
let mut downloaded_records = 0_usize;

// amortize allocation
#[allow(unused_assignments)]
let mut record = csv::ByteRecord::with_capacity(100, 20);

let header_row = rdr.byte_headers()?;
wtr.write_byte_record(header_row)?;
rdr.byte_records().next();

for rec in rdr.byte_records() {
record = rec?;
if downloaded_records >= lines_sample_size {
break;
if snappy_flag {
// we downloaded a snappy compressed file, we need to decompress it
// before we can sniff it
wtr_file_path = decompress_snappy_file(file.path().to_str().unwrap(), tmpdir)?;
} else {
// we downloaded a non-snappy file, rewrite it so we only have the exact
// sample size and truncate potentially incomplete lines. We streamed the
// download and the downloaded file may be more than the sample size, and the
// final line may be incomplete.
wtr_file_path = path.to_str().unwrap().to_string();
let mut wtr = Config::new(&Some(wtr_file_path.clone()))
.no_headers(false)
.flexible(true)
.quote_style(csv::QuoteStyle::NonNumeric)
.writer()?;

let retrieved_name = file.path().to_str().unwrap().to_string();
let config = Config::new(&Some(retrieved_name))
.delimiter(args.flag_delimiter)
// we say no_headers so we can just copy the downloaded file over
// including headers, to the exact sample size file
.no_headers(true)
.flexible(true);

let mut rdr = config.reader()?;

// amortize allocation
#[allow(unused_assignments)]
let mut record = csv::ByteRecord::with_capacity(100, 20);

let header_row = rdr.byte_headers()?;
wtr.write_byte_record(header_row)?;
rdr.byte_records().next();

for rec in rdr.byte_records() {
record = rec?;
if downloaded_records >= lines_sample_size {
break;
}
downloaded_records += 1;
wtr.write_byte_record(&record)?;
}
downloaded_records += 1;
wtr.write_byte_record(&record)?;
wtr.flush()?;
}
wtr.flush()?;

Ok(SniffFileStruct {
display_path: url,
@@ -407,10 +453,10 @@ async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
}
// it's a file, passthrough the path along with its size
path => {
let mut path = path;

if path.to_lowercase().ends_with(".sz") {
return fail_clierror!(
"Cannot sniff .sz files. Use 'qsv snappy decompress' first."
);
path = decompress_snappy_file(&path, tmpdir)?;
}

let metadata = fs::metadata(&path)
Expand Down Expand Up @@ -462,6 +508,19 @@ async fn get_file_to_sniff(args: &Args) -> CliResult<SniffFileStruct> {
}
}

fn decompress_snappy_file(path: &str, tmpdir: &self_update::TempDir) -> Result<String, CliError> {
let mut snappy_file = std::fs::File::open(path)?;
let mut snappy_reader = snap::read::FrameDecoder::new(&mut snappy_file);
let file_stem = Path::new(path).file_stem().unwrap().to_str().unwrap();
let decompressed_filepath = tmpdir
.path()
.join(format!("qsv__{file_stem}__qsv_temp_decompressed"));
let mut decompressed_file = std::fs::File::create(decompressed_filepath.clone())?;
std::io::copy(&mut snappy_reader, &mut decompressed_file)?;
decompressed_file.flush()?;
Ok(format!("{}", decompressed_filepath.display()))
}

fn cleanup_tempfile(
tempfile_flag: bool,
tempfile: String,
Expand Down Expand Up @@ -491,8 +550,9 @@ pub async fn run(argv: &[&str]) -> CliResult<()> {
}

let sniffed_ts = chrono::Utc::now().to_rfc3339();
let tmpdir = tempfile::tempdir()?;

let future = get_file_to_sniff(&args);
let future = get_file_to_sniff(&args, &tmpdir);
let sfile_info = block_on(future)?;
let tempfile_to_delete = sfile_info.file_to_sniff.clone();

Expand Down Expand Up @@ -521,7 +581,7 @@ pub async fn run(argv: &[&str]) -> CliResult<()> {
// sfile_info.sampled_records
// usize::MAX is a sentinel value to let us
// know that we need to estimate the number of records
// since we only downloaded a sample,not the entire file
// since we only downloaded a sample, not the entire file
usize::MAX
};

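
Because the decompressed copy lives under that synthetic qsv__{stem}__qsv_temp_decompressed name, the Display impl above strips the wrapper with a regex so it can report the original file name. A small round-trip illustration (the literal path is only an example):

fn original_name(temp_path: &str) -> Option<String> {
    // Same pattern the Display impl uses to unwrap the temp-file name.
    let re = regex::Regex::new(r"qsv__(?P<filename>.*)__qsv_temp_decompressed").unwrap();
    re.captures(temp_path)
        .map(|caps| caps["filename"].to_string())
}

fn main() {
    // Sniffing boston311-100.csv.sz produces a temp file along these lines:
    let temp = "/tmp/qsv__boston311-100.csv__qsv_temp_decompressed";
    assert_eq!(original_name(temp).as_deref(), Some("boston311-100.csv"));
}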
58 changes: 57 additions & 1 deletion tests/test_sniff.rs
@@ -46,7 +46,7 @@ Fields:

#[test]
fn sniff_url() {
let wrk = Workdir::new("sniff");
let wrk = Workdir::new("sniff_url");

let mut cmd = wrk.command("sniff");
cmd.arg("https://github.com/jqnatividad/qsv/raw/master/resources/test/boston311-100.csv");
@@ -100,6 +100,62 @@ Fields:
assert!(got.ends_with(expected_end.trim_end()));
}

#[test]
fn sniff_url_snappy() {
let wrk = Workdir::new("sniff_url_snappy");

let mut cmd = wrk.command("sniff");
cmd.arg("https://github.com/jqnatividad/qsv/raw/master/resources/test/boston311-100.csv.sz");

let got: String = wrk.stdout(&mut cmd);

let expected_end = r#"Delimiter: ,
Header Row: true
Preamble Rows: 0
Quote Char: none
Flexible: false
Is UTF8: true
Retrieved Size (bytes): 15,196
File Size (bytes): 15,196
Sampled Records: 100
Estimated: false
Num Records: 100
Avg Record Len (bytes): 444
Num Fields: 29
Fields:
0: Unsigned case_enquiry_id
1: DateTime open_dt
2: DateTime target_dt
3: DateTime closed_dt
4: Text ontime
5: Text case_status
6: Text closure_reason
7: Text case_title
8: Text subject
9: Text reason
10: Text type
11: Text queue
12: Text department
13: Text submittedphoto
14: Boolean closedphoto
15: Text location
16: Unsigned fire_district
17: Text pwd_district
18: Unsigned city_council_district
19: Text police_district
20: Text neighborhood
21: Unsigned neighborhood_services_district
22: Text ward
23: Unsigned precinct
24: Text location_street_name
25: Unsigned location_zipcode
26: Float latitude
27: Float longitude
28: Text source"#;

assert!(got.ends_with(expected_end.trim_end()));
}

#[test]
fn sniff_tab() {
let wrk = Workdir::new("sniff_tab");

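One more detail worth calling out from the sniff.rs changes: for remote inputs the download loop normally stops once enough newlines have streamed in to cover the requested sample, but when the snappy flag is set the sample size becomes the usize::MAX sentinel so the whole file is fetched before decompression. A minimal sketch of that stopping rule, abstracted from get_file_to_sniff (the function name below is hypothetical):

/// Returns true once enough lines have been downloaded to stop streaming.
fn reached_sample_size(
    downloaded_lines: &mut usize,
    chunk: &[u8],
    lines_sample_size: usize,
) -> bool {
    if lines_sample_size == usize::MAX {
        // Snappy sentinel: keep downloading until the stream ends.
        return false;
    }
    // Count the newlines in this chunk; the running total approximates the sample size.
    *downloaded_lines += chunk.iter().filter(|&&b| b == b'\n').count();
    *downloaded_lines > lines_sample_size
}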