Skip to content

Commit

Permalink
Refactor the subquery_alias and expr-alias code. (apache#4451)
Browse files Browse the repository at this point in the history
* remove project_with_alias.

refactor: clean up the code for `SubqueryAlias` and `expr-alias`.

* separate project and subquery_alias

* expr alias

* replace with `.alias()`

* fix comment
  • Loading branch information
jackwener committed Dec 5, 2022
1 parent 394c5ee commit 44c20ef
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 140 deletions.
24 changes: 10 additions & 14 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,21 +1137,17 @@ mod ci {
Box::new(trim(col(Field::name(field)))),
DataType::Float64,
)));
Expr::Alias(
Box::new(Expr::Cast(Cast::new(
inner_cast,
Field::data_type(field).to_owned(),
))),
Field::name(field).to_string(),
)
}
_ => Expr::Alias(
Box::new(Expr::Cast(Cast::new(
Box::new(trim(col(Field::name(field)))),
Expr::Cast(Cast::new(
inner_cast,
Field::data_type(field).to_owned(),
))),
Field::name(field).to_string(),
),
))
.alias(Field::name(field))
}
_ => Expr::Cast(Cast::new(
Box::new(trim(col(Field::name(field)))),
Field::data_type(field).to_owned(),
))
.alias(Field::name(field)),
}
})
.collect::<Vec<Expr>>(),
Expand Down
12 changes: 3 additions & 9 deletions benchmarks/src/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,20 +498,14 @@ pub async fn transform_actual_result(
fun: datafusion::logical_expr::BuiltinScalarFunction::Round,
args: vec![col(Field::name(field)).mul(lit(100))],
}.div(lit(100)));
Expr::Alias(
Box::new(Expr::Cast(Cast::new(
Expr::Cast(Cast::new(
round,
DataType::Decimal128(15, 2),
))),
field.name().to_string(),
)
)).alias(field.name())
}
DataType::Utf8 => {
// if string, then trim it like the answers got trimmed
Expr::Alias(
Box::new(trim(col(Field::name(field)))),
field.name().to_string(),
)
trim(col(Field::name(field))).alias(field.name())
}
_ => {
col(field.name())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ impl DataFrame {
LogicalPlanBuilder::window_plan(self.plan.clone(), window_func_exprs)?
};

let new_column = Expr::Alias(Box::new(expr), name.to_string());
let new_column = expr.alias(name);
let mut col_exists = false;
let mut fields: Vec<Expr> = plan
.schema()
Expand Down
5 changes: 1 addition & 4 deletions datafusion/expr/src/expr_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,10 +537,7 @@ pub fn coerce_plan_expr_for_schema(
(
LogicalPlan::Projection(Projection { input, .. }),
Expr::Alias(e, alias),
) => Ok(Expr::Alias(
Box::new(e.clone().cast_to(new_type, input.schema())?),
alias.clone(),
)),
) => Ok(e.clone().cast_to(new_type, input.schema())?.alias(alias)),
_ => expr.cast_to(new_type, plan.schema()),
}
} else {
Expand Down
57 changes: 18 additions & 39 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,20 +267,7 @@ impl LogicalPlanBuilder {
&self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<Self> {
self.project_with_alias(expr, None)
}

/// Apply a projection with alias
pub fn project_with_alias(
&self,
expr: impl IntoIterator<Item = impl Into<Expr>>,
alias: Option<String>,
) -> Result<Self> {
Ok(Self::from(project_with_alias(
self.plan.clone(),
expr,
alias,
)?))
Ok(Self::from(project(self.plan.clone(), expr)?))
}

/// Apply a filter
Expand Down Expand Up @@ -308,14 +295,7 @@ impl LogicalPlanBuilder {

/// Apply an alias
pub fn alias(&self, alias: &str) -> Result<Self> {
let schema: Schema = self.schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(Self::from(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(self.plan.clone()),
alias: alias.to_string(),
schema,
})))
Ok(Self::from(subquery_alias(&self.plan, alias)?))
}

/// Add missing sort columns to all downstream projection
Expand All @@ -342,7 +322,7 @@ impl LogicalPlanBuilder {
// projected alias.
missing_exprs.retain(|e| !expr.contains(e));
expr.extend(missing_exprs);
Ok(project_with_alias((*input).clone(), expr, None)?)
Ok(project((*input).clone(), expr)?)
}
_ => {
let new_inputs = curr_plan
Expand Down Expand Up @@ -948,15 +928,14 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
}))
}

/// Project with optional alias
/// Create Projection
/// # Errors
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
pub fn project_with_alias(
pub fn project(
plan: LogicalPlan,
expr: impl IntoIterator<Item = impl Into<Expr>>,
alias: Option<String>,
) -> Result<LogicalPlan> {
let input_schema = plan.schema();
let mut projected_expr = vec![];
Expand All @@ -978,26 +957,26 @@ pub fn project_with_alias(
plan.schema().metadata().clone(),
)?;

let projection = LogicalPlan::Projection(Projection::try_new_with_schema(
Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
projected_expr,
Arc::new(plan.clone()),
DFSchemaRef::new(input_schema),
)?);
match alias {
Some(alias) => Ok(with_alias(projection, alias)),
None => Ok(projection),
}
)?))
}

/// Create a SubqueryAlias to wrap a LogicalPlan.
pub fn with_alias(plan: LogicalPlan, alias: String) -> LogicalPlan {
let plan_schema = &**plan.schema();
let schema = (plan_schema.clone()).replace_qualifier(alias.as_str());
LogicalPlan::SubqueryAlias(SubqueryAlias {
pub fn subquery_alias(plan: &LogicalPlan, alias: &str) -> Result<LogicalPlan> {
subquery_alias_owned(plan.clone(), alias)
}

pub fn subquery_alias_owned(plan: LogicalPlan, alias: &str) -> Result<LogicalPlan> {
let schema: Schema = plan.schema().as_ref().clone().into();
let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
input: Arc::new(plan),
alias,
schema: Arc::new(schema),
})
alias: alias.to_string(),
schema,
}))
}

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
match e {
Expr::Column(_) => e,
Expr::Alias(inner_expr, name) => {
Expr::Alias(Box::new(columnize_expr(*inner_expr, input_schema)), name)
columnize_expr(*inner_expr, input_schema).alias(name)
}
Expr::Cast(Cast { expr, data_type }) => Expr::Cast(Cast {
expr: Box::new(columnize_expr(*expr, input_schema)),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/decorrelate_where_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ fn optimize_where_in(
}
let projection = alias_cols(&subqry_cols);
let subqry_plan = subqry_plan
.project_with_alias(projection, Some(subqry_alias.clone()))?
.project(projection)?
.alias(&subqry_alias)?
.build()?;
debug!("subquery plan:\n{}", subqry_plan.display_indent());

Expand Down
7 changes: 3 additions & 4 deletions datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ impl OptimizerRule for InlineTableScan {
// Recursively apply optimization
let plan =
utils::optimize_children(self, sub_plan, _optimizer_config)?;
let plan = LogicalPlanBuilder::from(plan).project_with_alias(
vec![Expr::Wildcard],
Some(table_name.to_string()),
)?;
let plan = LogicalPlanBuilder::from(plan)
.project(vec![Expr::Wildcard])?
.alias(table_name)?;
plan.build()
} else {
// No plan available, return with table scan as is
Expand Down
9 changes: 6 additions & 3 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,8 @@ mod tests {
fn union_all_on_projection() -> Result<()> {
let table_scan = test_table_scan()?;
let table = LogicalPlanBuilder::from(table_scan)
.project_with_alias(vec![col("a").alias("b")], Some("test2".to_string()))?;
.project(vec![col("a").alias("b")])?
.alias("test2")?;

let plan = table
.union(table.build()?)?
Expand Down Expand Up @@ -2217,8 +2218,10 @@ mod tests {
fn test_propagation_of_optimized_inner_filters_with_projections() -> Result<()> {
// SELECT a FROM (SELECT 1 AS a) b WHERE b.a = 1
let plan = LogicalPlanBuilder::empty(true)
.project_with_alias(vec![lit(0i64).alias("a")], Some("b".to_owned()))?
.project_with_alias(vec![col("b.a")], Some("b".to_owned()))?
.project(vec![lit(0i64).alias("a")])?
.alias("b")?
.project(vec![col("b.a")])?
.alias("b")?
.filter(col("b.a").eq(lit(1i64)))?
.project(vec![col("b.a")])?
.build()?;
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ fn optimize_scalar(
.collect();
let subqry_plan = subqry_plan
.aggregate(group_by, aggr.aggr_expr.clone())?
.project_with_alias(proj, Some(subqry_alias.clone()))?
.project(proj)?
.alias(&subqry_alias)?
.build()?;

// qualify the join columns for outside the subquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,7 @@ mod tests {
fn multiple_now_expr() {
let table_scan = test_table_scan();
let time = Utc::now();
let proj = vec![
now_expr(),
Expr::Alias(Box::new(now_expr()), "t2".to_string()),
];
let proj = vec![now_expr(), now_expr().alias("t2")];
let plan = LogicalPlanBuilder::from(table_scan)
.project(proj)
.unwrap()
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/subquery_filter_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ mod tests {
let table_scan = test_table_scan()?;
let plan = LogicalPlanBuilder::from(table_scan)
.filter(in_subquery(col("c"), test_subquery_with_name("sq_inner")?))?
.project_with_alias(vec![col("b"), col("c")], Some("wrapped".to_string()))?
.project(vec![col("b"), col("c")])?
.alias("wrapped")?
.filter(or(
binary_expr(col("b"), Operator::Lt, lit(30_u32)),
in_subquery(col("c"), test_subquery_with_name("sq_outer")?),
Expand Down
23 changes: 12 additions & 11 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_common::{context, Column, DataFusionError};
use datafusion_expr::logical_plan::builder::{project, subquery_alias_owned};
use datafusion_expr::{
logical_plan::{
Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateView,
Expand Down Expand Up @@ -323,21 +324,21 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Projection(projection) => {
let input: LogicalPlan =
into_logical_plan!(projection.input, ctx, extension_codec)?;
let x: Vec<Expr> = projection
let expr: Vec<Expr> = projection
.expr
.iter()
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
LogicalPlanBuilder::from(input)
.project_with_alias(
x,
projection.optional_alias.as_ref().map(|a| match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
alias.clone()
}
}),
)?
.build()

let new_proj = project(input, expr)?;
match projection.optional_alias.as_ref() {
Some(a) => match a {
protobuf::projection_node::OptionalAlias::Alias(alias) => {
subquery_alias_owned(new_proj, alias)
}
},
_ => Ok(new_proj),
}
}
LogicalPlanType::Selection(selection) => {
let input: LogicalPlan =
Expand Down
Loading

0 comments on commit 44c20ef

Please sign in to comment.