Skip to content

Commit

Permalink
Upgrade to polars 0.40 (#13069)
Browse files Browse the repository at this point in the history
Upgrades the polars family of crates (polars, polars-io, polars-arrow, polars-ops, polars-plan, polars-utils) from 0.39 to 0.40, and sqlparser from 0.45 to 0.47, adapting the CSV reader, SQL expression parsing, and series summary code to the new APIs.
  • Loading branch information
ayax79 committed Jun 5, 2024
1 parent 96493b2 commit a6b1d1f
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 268 deletions.
155 changes: 75 additions & 80 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions crates/nu_plugin_polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ indexmap = { version = "2.2" }
mimalloc = { version = "0.1.42" }
num = {version = "0.4"}
serde = { version = "1.0", features = ["derive"] }
sqlparser = { version = "0.45"}
polars-io = { version = "0.39", features = ["avro"]}
polars-arrow = { version = "0.39"}
polars-ops = { version = "0.39"}
polars-plan = { version = "0.39", features = ["regex"]}
polars-utils = { version = "0.39"}
sqlparser = { version = "0.47"}
polars-io = { version = "0.40", features = ["avro"]}
polars-arrow = { version = "0.40"}
polars-ops = { version = "0.40"}
polars-plan = { version = "0.40", features = ["regex"]}
polars-utils = { version = "0.40"}
typetag = "0.2"
uuid = { version = "1.7", features = ["v4", "serde"] }

Expand Down Expand Up @@ -70,7 +70,7 @@ features = [
"to_dummies",
]
optional = false
version = "0.39"
version = "0.40"

[dev-dependencies]
nu-cmd-lang = { path = "../nu-cmd-lang", version = "0.94.3" }
Expand Down
81 changes: 27 additions & 54 deletions crates/nu_plugin_polars/src/dataframe/eager/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@ use std::{
fs::File,
io::BufReader,
path::{Path, PathBuf},
sync::Arc,
};

use polars::prelude::{
CsvEncoding, CsvReader, IpcReader, JsonFormat, JsonReader, LazyCsvReader, LazyFileListReader,
LazyFrame, ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
CsvEncoding, IpcReader, JsonFormat, JsonReader, LazyCsvReader, LazyFileListReader, LazyFrame,
ParquetReader, ScanArgsIpc, ScanArgsParquet, SerReader,
};

use polars_io::{avro::AvroReader, prelude::ParallelStrategy, HiveOptions};
use polars_io::{
avro::AvroReader, csv::read::CsvReadOptions, prelude::ParallelStrategy, HiveOptions,
};

#[derive(Clone)]
pub struct OpenDataFrame;
Expand Down Expand Up @@ -175,6 +178,7 @@ fn from_parquet(
cloud_options: None,
use_statistics: false,
hive_options: HiveOptions::default(),
glob: true,
};

let df: NuLazyFrame = LazyFrame::scan_parquet(file, args)
Expand Down Expand Up @@ -445,7 +449,7 @@ fn from_csv(
}
};

let csv_reader = csv_reader.has_header(!no_header);
let csv_reader = csv_reader.with_has_header(!no_header);

let csv_reader = match maybe_schema {
Some(schema) => csv_reader.with_schema(Some(schema.into())),
Expand Down Expand Up @@ -475,70 +479,39 @@ fn from_csv(

df.cache_and_to_value(plugin, engine, call.head)
} else {
let csv_reader = CsvReader::from_path(file_path)
let df = CsvReadOptions::default()
.with_has_header(!no_header)
.with_infer_schema_length(infer_schema)
.with_skip_rows(skip_rows.unwrap_or_default())
.with_schema(maybe_schema.map(|s| s.into()))
.with_columns(columns.map(Arc::new))
.map_parse_options(|options| {
options
.with_separator(
delimiter
.as_ref()
.and_then(|d| d.item.chars().next().map(|c| c as u8))
.unwrap_or(b','),
)
.with_encoding(CsvEncoding::LossyUtf8)
})
.try_into_reader_with_file_path(Some(file_path.to_path_buf()))
.map_err(|e| ShellError::GenericError {
error: "Error creating CSV reader".into(),
msg: e.to_string(),
span: Some(file_span),
help: None,
inner: vec![],
})?
.with_encoding(CsvEncoding::LossyUtf8);

let csv_reader = match delimiter {
None => csv_reader,
Some(d) => {
if d.item.len() != 1 {
return Err(ShellError::GenericError {
error: "Incorrect delimiter".into(),
msg: "Delimiter has to be one character".into(),
span: Some(d.span),
help: None,
inner: vec![],
});
} else {
let delimiter = match d.item.chars().next() {
Some(d) => d as u8,
None => unreachable!(),
};
csv_reader.with_separator(delimiter)
}
}
};

let csv_reader = csv_reader.has_header(!no_header);

let csv_reader = match maybe_schema {
Some(schema) => csv_reader.with_schema(Some(schema.into())),
None => csv_reader,
};

let csv_reader = match infer_schema {
None => csv_reader,
Some(r) => csv_reader.infer_schema(Some(r)),
};

let csv_reader = match skip_rows {
None => csv_reader,
Some(r) => csv_reader.with_skip_rows(r),
};

let csv_reader = match columns {
None => csv_reader,
Some(columns) => csv_reader.with_columns(Some(columns)),
};

let df: NuDataFrame = csv_reader
.finish()
.map_err(|e| ShellError::GenericError {
error: "CSV reader error".into(),
msg: format!("{e:?}"),
span: Some(call.head),
help: None,
inner: vec![],
})?
.into();

})?;
let df = NuDataFrame::new(false, df);
df.cache_and_to_value(plugin, engine, call.head)
}
}
30 changes: 19 additions & 11 deletions crates/nu_plugin_polars/src/dataframe/eager/sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use polars::prelude::{col, lit, DataType, Expr, LiteralValue, PolarsResult as Re

use sqlparser::ast::{
ArrayElemTypeDef, BinaryOperator as SQLBinaryOperator, DataType as SQLDataType,
Expr as SqlExpr, Function as SQLFunction, Value as SqlValue, WindowType,
DuplicateTreatment, Expr as SqlExpr, Function as SQLFunction, FunctionArguments,
Value as SqlValue, WindowType,
};

fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
Expand Down Expand Up @@ -33,7 +34,7 @@ fn map_sql_polars_datatype(data_type: &SQLDataType) -> Result<DataType> {
SQLDataType::Interval => DataType::Duration(TimeUnit::Microseconds),
SQLDataType::Array(array_type_def) => match array_type_def {
ArrayElemTypeDef::AngleBracket(inner_type)
| ArrayElemTypeDef::SquareBracket(inner_type) => {
| ArrayElemTypeDef::SquareBracket(inner_type, _) => {
DataType::List(Box::new(map_sql_polars_datatype(inner_type)?))
}
_ => {
Expand Down Expand Up @@ -120,9 +121,7 @@ pub fn parse_sql_expr(expr: &SqlExpr) -> Result<Expr> {
}
SqlExpr::Function(sql_function) => parse_sql_function(sql_function)?,
SqlExpr::Cast {
expr,
data_type,
format: _,
expr, data_type, ..
} => cast_(parse_sql_expr(expr)?, data_type)?,
SqlExpr::Nested(expr) => parse_sql_expr(expr)?,
SqlExpr::Value(value) => literal_expr(value)?,
Expand Down Expand Up @@ -162,8 +161,17 @@ fn parse_sql_function(sql_function: &SQLFunction) -> Result<Expr> {
use sqlparser::ast::{FunctionArg, FunctionArgExpr};
// Function name mostly do not have name space, so it mostly take the first args
let function_name = sql_function.name.0[0].value.to_ascii_lowercase();
let args = sql_function
.args

// One day this should support the additional argument types supported with 0.40
let (args, distinct) = match &sql_function.args {
FunctionArguments::List(list) => (
list.args.clone(),
list.duplicate_treatment == Some(DuplicateTreatment::Distinct),
),
_ => (vec![], false),
};

let args = args
.iter()
.map(|arg| match arg {
FunctionArg::Named { arg, .. } => arg,
Expand All @@ -174,15 +182,15 @@ fn parse_sql_function(sql_function: &SQLFunction) -> Result<Expr> {
match (
function_name.as_str(),
args.as_slice(),
sql_function.distinct,
distinct,
) {
("sum", [FunctionArgExpr::Expr(expr)], false) => {
("sum", [FunctionArgExpr::Expr(ref expr)], false) => {
apply_window_spec(parse_sql_expr(expr)?, sql_function.over.as_ref())?.sum()
}
("count", [FunctionArgExpr::Expr(expr)], false) => {
("count", [FunctionArgExpr::Expr(ref expr)], false) => {
apply_window_spec(parse_sql_expr(expr)?, sql_function.over.as_ref())?.count()
}
("count", [FunctionArgExpr::Expr(expr)], true) => {
("count", [FunctionArgExpr::Expr(ref expr)], true) => {
apply_window_spec(parse_sql_expr(expr)?, sql_function.over.as_ref())?.n_unique()
}
// Special case for wildcard args to count function.
Expand Down
58 changes: 8 additions & 50 deletions crates/nu_plugin_polars/src/dataframe/eager/summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,53 +189,19 @@ fn command(
.map(|col| {
let count = col.len() as f64;

let sum = col.sum_as_series().ok().and_then(|series| {
series
.cast(&DataType::Float64)
.ok()
.and_then(|ca| match ca.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
})
});

let mean = match col.mean_as_series().get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
};

let median = match col.median_as_series() {
Ok(v) => match v.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
},
_ => None,
};

let std = match col.std_as_series(0) {
Ok(v) => match v.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
},
_ => None,
};

let min = col.min_as_series().ok().and_then(|series| {
series
.cast(&DataType::Float64)
.ok()
.and_then(|ca| match ca.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
})
});
let sum = col.sum::<f64>().ok();
let mean = col.mean();
let median = col.median();
let std = col.std(0);
let min = col.min::<f64>().ok().flatten();

let mut quantiles = quantiles
.clone()
.into_iter()
.map(|q| {
col.quantile_as_series(q, QuantileInterpolOptions::default())
col.quantile_reduce(q, QuantileInterpolOptions::default())
.ok()
.map(|s| s.into_series("quantile"))
.and_then(|ca| ca.cast(&DataType::Float64).ok())
.and_then(|ca| match ca.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
Expand All @@ -244,15 +210,7 @@ fn command(
})
.collect::<Vec<Option<f64>>>();

let max = col.max_as_series().ok().and_then(|series| {
series
.cast(&DataType::Float64)
.ok()
.and_then(|ca| match ca.get(0) {
Ok(AnyValue::Float64(v)) => Some(v),
_ => None,
})
});
let max = col.max::<f64>().ok().flatten();

let mut descriptors = vec![Some(count), sum, mean, median, std, min];
descriptors.append(&mut quantiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::dataframe::values::{Column, NuDataFrame, NuExpression, NuLazyFrame};
use crate::values::CustomValueSupport;
use crate::PolarsPlugin;
use nu_plugin::{EngineInterface, EvaluatedCall, PluginCommand};
use nu_protocol::{
Category, Example, LabeledError, PipelineData, ShellError, Signature, Span, Type, Value,
};
use nu_protocol::{Category, Example, LabeledError, PipelineData, Signature, Span, Type, Value};

// The structs defined in this file are structs that form part of other commands
// since they share a similar name
Expand Down Expand Up @@ -60,6 +58,7 @@ macro_rules! expr_command {
mod $test {
use super::*;
use crate::test::test_polars_plugin_command;
use nu_protocol::ShellError;

#[test]
fn test_examples() -> Result<(), ShellError> {
Expand Down Expand Up @@ -163,19 +162,7 @@ macro_rules! lazy_expr_command {
if NuDataFrame::can_downcast(&value) || NuLazyFrame::can_downcast(&value) {
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)
.map_err(LabeledError::from)?;
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.$func()
.map_err(|e| ShellError::GenericError {
error: "Dataframe Error".into(),
msg: e.to_string(),
help: None,
span: None,
inner: vec![],
})
.map_err(LabeledError::from)?,
);
let lazy = NuLazyFrame::new(lazy.from_eager, lazy.to_polars().$func());
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
} else {
Expand All @@ -192,6 +179,7 @@ macro_rules! lazy_expr_command {
mod $test {
use super::*;
use crate::test::test_polars_plugin_command;
use nu_protocol::ShellError;

#[test]
fn test_examples() -> Result<(), ShellError> {
Expand Down Expand Up @@ -244,19 +232,7 @@ macro_rules! lazy_expr_command {
if NuDataFrame::can_downcast(&value) || NuLazyFrame::can_downcast(&value) {
let lazy = NuLazyFrame::try_from_value_coerce(plugin, &value)
.map_err(LabeledError::from)?;
let lazy = NuLazyFrame::new(
lazy.from_eager,
lazy.to_polars()
.$func($ddof)
.map_err(|e| ShellError::GenericError {
error: "Dataframe Error".into(),
msg: e.to_string(),
help: None,
span: None,
inner: vec![],
})
.map_err(LabeledError::from)?,
);
let lazy = NuLazyFrame::new(lazy.from_eager, lazy.to_polars().$func($ddof));
lazy.to_pipeline_data(plugin, engine, call.head)
.map_err(LabeledError::from)
} else {
Expand All @@ -272,6 +248,7 @@ macro_rules! lazy_expr_command {
mod $test {
use super::*;
use crate::test::test_polars_plugin_command;
use nu_protocol::ShellError;

#[test]
fn test_examples() -> Result<(), ShellError> {
Expand Down
2 changes: 1 addition & 1 deletion crates/nu_plugin_polars/src/dataframe/expressions/lit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl PluginCommand for ExprLit {
example: "polars lit 2 | polars into-nu",
result: Some(Value::test_record(record! {
"expr" => Value::test_string("literal"),
"value" => Value::test_string("2"),
"value" => Value::test_string("dyn int: 2"),
})),
}]
}
Expand Down
1 change: 1 addition & 0 deletions crates/nu_plugin_polars/src/dataframe/lazy/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ fn get_col_name(expr: &Expr) -> Option<String> {
| Expr::Len
| Expr::Nth(_)
| Expr::SubPlan(_, _)
| Expr::IndexColumn(_)
| Expr::Selector(_) => None,
}
}
Expand Down
Loading

0 comments on commit a6b1d1f

Please sign in to comment.