Skip to content

Commit

Permalink
Merge polars sink and polars to-* to polars save (nushell#13568)
Browse files Browse the repository at this point in the history
# Description
This pull request merges `polars sink` and `polars to-*` into one
command `polars save`.


# User-Facing Changes
- `polars to-*` commands have all been replaced with `polars save`. When
saving a lazy frame to a type that supports a polars sink operation, a
sink operation will be performed. Sink operations are much more
performant, performing a collect while streaming to the file system.
  • Loading branch information
ayax79 authored Aug 8, 2024
1 parent 035308b commit 4ff3393
Show file tree
Hide file tree
Showing 17 changed files with 795 additions and 1,008 deletions.
17 changes: 2 additions & 15 deletions crates/nu_plugin_polars/src/dataframe/eager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,16 @@ mod pivot;
mod query_df;
mod rename;
mod sample;
mod save;
mod schema;
mod shape;
mod slice;
mod sql_context;
mod sql_expr;
mod summary;
mod take;
mod to_arrow;
mod to_avro;
mod to_csv;
mod to_df;
mod to_json_lines;
mod to_nu;
mod to_parquet;
mod unpivot;
mod with_column;

Expand Down Expand Up @@ -55,13 +51,8 @@ pub use slice::SliceDF;
pub use sql_context::SQLContext;
pub use summary::Summary;
pub use take::TakeDF;
pub use to_arrow::ToArrow;
pub use to_avro::ToAvro;
pub use to_csv::ToCSV;
pub use to_df::ToDataFrame;
pub use to_json_lines::ToJsonLines;
pub use to_nu::ToNu;
pub use to_parquet::ToParquet;
pub use unpivot::UnpivotDF;
pub use with_column::WithColumn;

Expand Down Expand Up @@ -89,13 +80,9 @@ pub(crate) fn eager_commands() -> Vec<Box<dyn PluginCommand<Plugin = PolarsPlugi
Box::new(SchemaCmd),
Box::new(TakeDF),
Box::new(ToNu),
Box::new(ToArrow),
Box::new(ToAvro),
Box::new(ToDataFrame),
Box::new(ToCSV),
Box::new(ToJsonLines),
Box::new(ToParquet),
Box::new(QueryDf),
Box::new(WithColumn),
Box::new(save::SaveDF),
]
}
76 changes: 38 additions & 38 deletions crates/nu_plugin_polars/src/dataframe/eager/open.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
dataframe::values::NuSchema,
values::{CustomValueSupport, NuLazyFrame},
values::{CustomValueSupport, NuLazyFrame, PolarsFileType},
EngineWrapper, PolarsPlugin,
};
use nu_path::expand_path_with;
Expand Down Expand Up @@ -46,7 +46,7 @@ impl PluginCommand for OpenDataFrame {
}

fn usage(&self) -> &str {
"Opens CSV, JSON, JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported."
"Opens CSV, JSON, NDJSON/JSON lines, arrow, avro, or parquet file to create dataframe. A lazy dataframe will be created by default, if supported."
}

fn signature(&self) -> Signature {
Expand Down Expand Up @@ -130,33 +130,37 @@ fn command(
let file_path = expand_path_with(&spanned_file.item, engine.get_current_dir()?, true);
let file_span = spanned_file.span;

let type_option: Option<Spanned<String>> = call.get_flag("type")?;

let type_id = match &type_option {
Some(ref t) => Some((t.item.to_owned(), "Invalid type", t.span)),
None => file_path.extension().map(|e| {
(
e.to_string_lossy().into_owned(),
"Invalid extension",
spanned_file.span,
)
}),
};

match type_id {
Some((e, msg, blamed)) => match e.as_str() {
"csv" | "tsv" => from_csv(plugin, engine, call, &file_path, file_span),
"parquet" | "parq" => from_parquet(plugin, engine, call, &file_path, file_span),
"ipc" | "arrow" => from_ipc(plugin, engine, call, &file_path, file_span),
"json" => from_json(plugin, engine, call, &file_path, file_span),
"jsonl" => from_jsonl(plugin, engine, call, &file_path, file_span),
"avro" => from_avro(plugin, engine, call, &file_path, file_span),
_ => Err(ShellError::FileNotFoundCustom {
msg: format!(
"{msg}. Supported values: csv, tsv, parquet, ipc, arrow, json, jsonl, avro"
),
span: blamed,
}),
let type_option: Option<(String, Span)> = call
.get_flag("type")?
.map(|t: Spanned<String>| (t.item, t.span))
.or_else(|| {
file_path
.extension()
.map(|e| (e.to_string_lossy().into_owned(), spanned_file.span))
});

match type_option {
Some((ext, blamed)) => match PolarsFileType::from(ext.as_str()) {
PolarsFileType::Csv | PolarsFileType::Tsv => {
from_csv(plugin, engine, call, &file_path, file_span)
}
PolarsFileType::Parquet => from_parquet(plugin, engine, call, &file_path, file_span),
PolarsFileType::Arrow => from_arrow(plugin, engine, call, &file_path, file_span),
PolarsFileType::Json => from_json(plugin, engine, call, &file_path, file_span),
PolarsFileType::NdJson => from_ndjson(plugin, engine, call, &file_path, file_span),
PolarsFileType::Avro => from_avro(plugin, engine, call, &file_path, file_span),
_ => Err(PolarsFileType::build_unsupported_error(
&ext,
&[
PolarsFileType::Csv,
PolarsFileType::Tsv,
PolarsFileType::Parquet,
PolarsFileType::Arrow,
PolarsFileType::NdJson,
PolarsFileType::Avro,
],
blamed,
)),
},
None => Err(ShellError::FileNotFoundCustom {
msg: "File without extension".into(),
Expand Down Expand Up @@ -268,7 +272,7 @@ fn from_avro(
df.cache_and_to_value(plugin, engine, call.head)
}

fn from_ipc(
fn from_arrow(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
Expand Down Expand Up @@ -370,7 +374,7 @@ fn from_json(
df.cache_and_to_value(plugin, engine, call.head)
}

fn from_jsonl(
fn from_ndjson(
plugin: &PolarsPlugin,
engine: &nu_plugin::EngineInterface,
call: &nu_plugin::EvaluatedCall,
Expand All @@ -397,18 +401,14 @@ fn from_jsonl(
.with_schema(maybe_schema.map(|s| s.into()))
.finish()
.map_err(|e| ShellError::GenericError {
error: format!("Json lines reader error: {e}"),
error: format!("NDJSON reader error: {e}"),
msg: "".into(),
span: Some(call.head),
help: None,
inner: vec![],
})?;

perf!(
"Lazy json lines dataframe open",
start_time,
engine.use_color()
);
perf!("Lazy NDJSON dataframe open", start_time, engine.use_color());

let df = NuLazyFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head)
Expand Down Expand Up @@ -444,7 +444,7 @@ fn from_jsonl(
.into();

perf!(
"Eager json lines dataframe open",
"Eager NDJSON dataframe open",
start_time,
engine.use_color()
);
Expand Down
62 changes: 62 additions & 0 deletions crates/nu_plugin_polars/src/dataframe/eager/save/arrow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use std::{fs::File, path::Path};

use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use polars::prelude::{IpcWriter, SerWriter};
use polars_io::ipc::IpcWriterOptions;

use crate::values::{NuDataFrame, NuLazyFrame};

use super::polars_file_save_error;

pub(crate) fn command_lazy(
_call: &EvaluatedCall,
lazy: &NuLazyFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
lazy.to_polars()
.sink_ipc(file_path, IpcWriterOptions::default())
.map_err(|e| polars_file_save_error(e, file_span))
}

pub(crate) fn command_eager(
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
let mut file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: format!("Error with file name: {e}"),
msg: "".into(),
span: Some(file_span),
help: None,
inner: vec![],
})?;

IpcWriter::new(&mut file)
.finish(&mut df.to_polars())
.map_err(|e| ShellError::GenericError {
error: "Error saving file".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?;
Ok(())
}

#[cfg(test)]
pub mod test {

use crate::eager::save::test::{test_eager_save, test_lazy_save};

#[test]
pub fn test_arrow_eager_save() -> Result<(), Box<dyn std::error::Error>> {
test_eager_save("arrow")
}

#[test]
pub fn test_arrow_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
test_lazy_save("arrow")
}
}
74 changes: 74 additions & 0 deletions crates/nu_plugin_polars/src/dataframe/eager/save/avro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use std::fs::File;
use std::path::Path;

use nu_plugin::EvaluatedCall;
use nu_protocol::{ShellError, Span};
use polars_io::avro::{AvroCompression, AvroWriter};
use polars_io::SerWriter;

use crate::values::NuDataFrame;

fn get_compression(call: &EvaluatedCall) -> Result<Option<AvroCompression>, ShellError> {
if let Some((compression, span)) = call
.get_flag_value("avro-compression")
.map(|e| e.as_str().map(|s| (s.to_owned(), e.span())))
.transpose()?
{
match compression.as_ref() {
"snappy" => Ok(Some(AvroCompression::Snappy)),
"deflate" => Ok(Some(AvroCompression::Deflate)),
_ => Err(ShellError::IncorrectValue {
msg: "compression must be one of deflate or snappy".to_string(),
val_span: span,
call_span: span,
}),
}
} else {
Ok(None)
}
}

pub(crate) fn command_eager(
call: &EvaluatedCall,
df: &NuDataFrame,
file_path: &Path,
file_span: Span,
) -> Result<(), ShellError> {
let compression = get_compression(call)?;

let file = File::create(file_path).map_err(|e| ShellError::GenericError {
error: format!("Error with file name: {e}"),
msg: "".into(),
span: Some(file_span),
help: None,
inner: vec![],
})?;

AvroWriter::new(file)
.with_compression(compression)
.finish(&mut df.to_polars())
.map_err(|e| ShellError::GenericError {
error: "Error saving file".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?;

Ok(())
}

#[cfg(test)]
pub mod test {
use crate::eager::save::test::{test_eager_save, test_lazy_save};

#[test]
pub fn test_avro_eager_save() -> Result<(), Box<dyn std::error::Error>> {
test_eager_save("avro")
}

#[test]
pub fn test_avro_lazy_save() -> Result<(), Box<dyn std::error::Error>> {
test_lazy_save("avro")
}
}
Loading

0 comments on commit 4ff3393

Please sign in to comment.