#![allow(clippy::cast_precision_loss)]
// docopt-style usage text; `run` hands it to `util::get_args` to populate `Args`.
// The `<...>` placeholders are significant: `<input>` feeds `Args::arg_input`, and an
// option only takes a value when a placeholder follows its name.
static USAGE: &str = r#"
Does streaming compression/decompression of the input using the Snappy framing format.
https://github.com/google/snappy/blob/main/framing_format.txt

It has four subcommands:
    compress:   Compress the input (multithreaded).
    decompress: Decompress the input (single-threaded).
    check:      Quickly check if the input is a Snappy file by inspecting the
                first 50 bytes of the input is valid Snappy data.
                Returns exitcode 0 if the first 50 bytes is valid Snappy data,
                exitcode 1 otherwise.
    validate:   Validate if the ENTIRE input is a valid Snappy file.
                Returns exitcode 0 if valid, exitcode 1 otherwise.

Note that most qsv commands already automatically decompresses Snappy files if the
input file has an ".sz" extension. It will also automatically compress the output
file (though only single-threaded) if the --output file has an ".sz" extension.

This command's multithreaded compression is 5-6x faster than qsv's automatic
single-threaded compression.

Also, this command is not specific to CSV data, it can compress/decompress ANY file.

For examples, see https://github.com/jqnatividad/qsv/blob/master/tests/test_snappy.rs.

Usage:
    qsv snappy compress [options] [<input>]
    qsv snappy decompress [options] [<input>]
    qsv snappy check [options] [<input>]
    qsv snappy validate [options] [<input>]
    qsv snappy --help

snappy arguments:
    <input>  The input file to compress/decompress. This can be a local file, stdin,
             or a URL (http and https schemes supported).

snappy options:
    --user-agent <agent>   Specify custom user agent to use when the input is a URL.
                           It supports the following variables -
                           $QSV_VERSION, $QSV_TARGET, $QSV_BIN_NAME, $QSV_KIND and $QSV_COMMAND.
                           Try to follow the syntax here -
                           https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/User-Agent
    --timeout <secs>       Timeout for downloading URLs in seconds.
                           [default: 60]

Common options:
    -h, --help             Display this message
    -o, --output <file>    Write output to <file> instead of stdout.
    -j, --jobs <arg>       The number of jobs to run in parallel when compressing.
                           When not set, its set to the number of CPUs - 1
    -Q, --quiet            Suppress status messages to stderr.
    -p, --progressbar      Show download progress bars. Only valid for URL input.
"#;
use std::{
fs,
io::{self, stdin, BufRead, Read, Write},
};
use gzp::{par::compress::ParCompressBuilder, snap::Snap, ZWriter};
use serde::Deserialize;
use tempfile::NamedTempFile;
use url::Url;
use crate::{config, util, CliError, CliResult};
#[derive(Deserialize)]
struct Args {
arg_input: Option,
flag_output: Option,
cmd_compress: bool,
cmd_decompress: bool,
cmd_check: bool,
cmd_validate: bool,
flag_user_agent: Option,
flag_timeout: u16,
flag_jobs: Option,
flag_quiet: bool,
flag_progressbar: bool,
}
impl From for CliError {
fn from(err: snap::Error) -> CliError {
CliError::Other(format!("Snap error: {err:?}"))
}
}
impl From for CliError {
fn from(err: gzp::GzpError) -> CliError {
CliError::Other(format!("Gzp error: {err:?}"))
}
}
pub fn run(argv: &[&str]) -> CliResult<()> {
let args: Args = util::get_args(USAGE, argv)?;
let input_bytes;
// create a temporary file to write the download file to
// this is automatically deleted when temp_download goes out of scope
let temp_download = NamedTempFile::new()?;
let input_reader: Box = if let Some(uri) = &args.arg_input {
let path = if Url::parse(uri).is_ok() && uri.starts_with("http") {
// its a remote file, download it first
let future = util::download_file(
uri,
temp_download.path().to_path_buf(),
args.flag_progressbar && !args.cmd_check && !args.flag_quiet,
args.flag_user_agent,
Some(args.flag_timeout),
if args.cmd_check {
Some(50) // only download 50 bytes when checking for a snappy header
} else {
None
},
);
tokio::runtime::Runtime::new()?.block_on(future)?;
// safety: temp_download is a NamedTempFile, so we know that it can be converted
let temp_download_path = temp_download.path().to_str().unwrap().to_string();
temp_download_path
} else {
// its a local file
uri.to_string()
};
let file = fs::File::open(path)?;
input_bytes = file.metadata()?.len();
Box::new(io::BufReader::with_capacity(
config::DEFAULT_RDR_BUFFER_CAPACITY,
file,
))
} else {
input_bytes = 0;
Box::new(io::BufReader::new(stdin().lock()))
};
let output_writer: Box = match &args.flag_output {
Some(output_path) => Box::new(io::BufWriter::with_capacity(
config::DEFAULT_WTR_BUFFER_CAPACITY,
fs::File::create(output_path)?,
)),
None => Box::new(io::BufWriter::with_capacity(
config::DEFAULT_WTR_BUFFER_CAPACITY,
io::stdout(),
)),
};
if args.cmd_compress {
let mut jobs = util::njobs(args.flag_jobs);
if jobs > 1 {
jobs -= 1; // save one thread for other tasks
}
compress(input_reader, output_writer, jobs, gzp::BUFSIZE * 2)?;
let compressed_bytes = if let Some(path) = &args.flag_output {
fs::metadata(path)?.len()
} else {
0
};
if !args.flag_quiet && compressed_bytes > 0 {
let compression_ratio = input_bytes as f64 / compressed_bytes as f64;
winfo!(
"Compression successful. Compressed bytes: {}, Decompressed bytes: {}, \
Compression ratio: {:.3}:1, Space savings: {} - {:.2}%",
indicatif::HumanBytes(compressed_bytes),
indicatif::HumanBytes(input_bytes),
compression_ratio,
indicatif::HumanBytes(
input_bytes
.checked_sub(compressed_bytes)
.unwrap_or_default()
),
(1.0 - (compressed_bytes as f64 / input_bytes as f64)) * 100.0
);
}
} else if args.cmd_decompress {
let decompressed_bytes = decompress(input_reader, output_writer)?;
if !args.flag_quiet {
let compression_ratio = decompressed_bytes as f64 / input_bytes as f64;
winfo!(
"Decompression successful. Compressed bytes: {}, Decompressed bytes: {}, \
Compression ratio: {:.3}:1",
indicatif::HumanBytes(input_bytes),
indicatif::HumanBytes(decompressed_bytes),
compression_ratio,
);
}
} else if args.cmd_validate {
if args.arg_input.is_none() {
return fail_incorrectusage_clierror!(
"stdin is not supported by the snappy validate subcommand."
);
}
let Ok(decompressed_bytes) = validate(input_reader) else {
return fail_clierror!("Not a valid snappy file.");
};
if !args.flag_quiet {
let compression_ratio = decompressed_bytes as f64 / input_bytes as f64;
winfo!(
"Valid snappy file. Compressed bytes: {}, Decompressed bytes: {}, Compression \
ratio: {:.3}:1, Space savings: {} - {:.2}%",
indicatif::HumanBytes(input_bytes),
indicatif::HumanBytes(decompressed_bytes),
compression_ratio,
indicatif::HumanBytes(
decompressed_bytes
.checked_sub(input_bytes)
.unwrap_or_default()
),
(1.0 - (input_bytes as f64 / decompressed_bytes as f64)) * 100.0
);
}
} else if args.cmd_check {
let check_ok = check(input_reader);
if args.flag_quiet {
if check_ok {
return Ok(());
}
return fail!("Not a snappy file.");
} else if check_ok {
winfo!("Snappy file.");
} else {
return fail!("Not a snappy file.");
}
}
Ok(())
}
// multithreaded streaming snappy compression
pub fn compress(
mut src: R,
dst: W,
jobs: usize,
buf_size: usize,
) -> CliResult<()> {
let mut writer = ParCompressBuilder::::new()
.num_threads(jobs)?
// the buffer size must be at least gzp::DICT_SIZE
.buffer_size(if buf_size < gzp::DICT_SIZE {
gzp::DICT_SIZE
} else {
buf_size
})?
.pin_threads(Some(0))
.from_writer(dst);
io::copy(&mut src, &mut writer)?;
writer.finish()?;
Ok(())
}
// single-threaded streaming snappy decompression
fn decompress(src: R, mut dst: W) -> CliResult {
let mut src = snap::read::FrameDecoder::new(src);
let decompressed_bytes = io::copy(&mut src, &mut dst)?;
Ok(decompressed_bytes)
}
// quickly check if a file is a snappy file
// note that the fn only reads the first 50 bytes of the file
// and does not check the entire file for validity
fn check(src: R) -> bool {
let src = snap::read::FrameDecoder::new(src);
// read the first 50 or less bytes of a file. The snap decoder will return an error
// if the file does not start with a valid snappy header
let mut buffer = Vec::with_capacity(50);
src.take(50).read_to_end(&mut buffer).is_ok()
}
// validate an entire snappy file by decompressing it to sink (i.e. /dev/null). This is useful for
// checking if a snappy file is corrupted.
// Note that this is more expensive than check() as it has to decompress the entire file.
fn validate(src: R) -> CliResult {
let mut src = snap::read::FrameDecoder::new(src);
let mut sink = io::sink();
match io::copy(&mut src, &mut sink) {
Ok(decompressed_bytes) => Ok(decompressed_bytes),
Err(err) => fail_clierror!("Error validating snappy file: {err:?}"),
}
}