From 44114c6d6698929da43b2b63559ea0718ae20bde Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sun, 29 Jan 2023 20:33:13 -0600 Subject: [PATCH 1/2] Improve get_meet_of_orderings to check for common prefixes among the given orderings --- datafusion/core/src/physical_plan/common.rs | 197 +++++++++++++++++--- 1 file changed, 172 insertions(+), 25 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index c3e243d83904..f3f237985f19 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -28,7 +28,6 @@ use arrow::error::ArrowError; use arrow::error::Result as ArrowResult; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; -use datafusion_physical_expr::utils::ordering_satisfy; use datafusion_physical_expr::PhysicalSortExpr; use futures::{Future, Stream, StreamExt, TryStreamExt}; use log::debug; @@ -324,34 +323,37 @@ pub fn transpose(original: Vec>) -> Vec> { } } -/// Calculates the "meet" of children orderings -/// The meet is the finest ordering that satisfied by all the input +/// Calculates the "meet" of given orderings. +/// The meet is the finest ordering that satisfied by all the given /// orderings, see https://en.wikipedia.org/wiki/Join_and_meet. pub fn get_meet_of_orderings( - children: &[Arc], + given: &[Arc], ) -> Option<&[PhysicalSortExpr]> { - // To find the meet, we first find the smallest input ordering. - let mut smallest: Option<&[PhysicalSortExpr]> = None; - for item in children.iter() { - if let Some(ordering) = item.output_ordering() { - smallest = match smallest { - None => Some(ordering), - Some(expr) if ordering.len() < expr.len() => Some(ordering), - _ => continue, + given + .iter() + .map(|item| item.output_ordering()) + .collect::>>() + .and_then(get_meet_of_orderings_helper) +} + +fn get_meet_of_orderings_helper( + orderings: Vec<&[PhysicalSortExpr]>, +) -> Option<&[PhysicalSortExpr]> { + let mut idx = 0; + let first = orderings[0]; + loop { + for ordering in orderings.iter() { + if idx >= ordering.len() { + return Some(ordering); + } else if ordering[idx] != first[idx] { + return if idx > 0 { + Some(&ordering[..idx]) + } else { + None + }; } - } else { - return None; } - } - // Check if the smallest ordering is a meet or not: - if children.iter().all(|child| { - ordering_satisfy(child.output_ordering(), smallest, || { - child.equivalence_properties() - }) - }) { - smallest - } else { - None + idx += 1; } } @@ -368,7 +370,152 @@ mod tests { datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }; - use datafusion_physical_expr::expressions::col; + use datafusion_physical_expr::expressions::{col, Column}; + + #[test] + fn get_meet_of_orderings_helper_common_prefix_test() -> Result<()> { + let input1: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options: SortOptions::default(), + }, + ]; + + let input2: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 2)), + options: SortOptions::default(), + }, + ]; + + let input3: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("x", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 2)), + options: SortOptions::default(), + }, + ]; + + let expected = vec![PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }]; + + let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]); + assert_eq!(result.unwrap(), expected); + Ok(()) + } + + #[test] + fn get_meet_of_orderings_helper_subset_test() -> Result<()> { + let input1: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + ]; + + let input2: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("c", 2)), + options: SortOptions::default(), + }, + ]; + + let input3: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("d", 2)), + options: SortOptions::default(), + }, + ]; + + let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]); + assert_eq!(result.unwrap(), input1); + Ok(()) + } + + #[test] + fn get_meet_of_orderings_helper_no_overlap_test() -> Result<()> { + let input1: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("b", 1)), + options: SortOptions::default(), + }, + ]; + + let input2: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("x", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 1)), + options: SortOptions::default(), + }, + ]; + + let input3: Vec = vec![ + PhysicalSortExpr { + expr: Arc::new(Column::new("a", 0)), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: Arc::new(Column::new("y", 1)), + options: SortOptions::default(), + }, + ]; + + let result = get_meet_of_orderings_helper(vec![&input1, &input2, &input3]); + assert!(result.is_none()); + Ok(()) + } #[test] fn test_meet_of_orderings() -> Result<()> { From 21d00a1c1348a5347d1935640e56e24f08bdd7bc Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sun, 29 Jan 2023 22:39:31 -0600 Subject: [PATCH 2/2] Incorporate review suggestions --- datafusion/core/src/physical_plan/union.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/datafusion/core/src/physical_plan/union.rs b/datafusion/core/src/physical_plan/union.rs index df78058082f5..2ac438a3c19f 100644 --- a/datafusion/core/src/physical_plan/union.rs +++ b/datafusion/core/src/physical_plan/union.rs @@ -48,7 +48,6 @@ use crate::{ error::Result, physical_plan::{expressions, metrics::BaselineMetrics}, }; -use datafusion_physical_expr::utils::ordering_satisfy; use tokio::macros::support::thread_rng_n; /// `UnionExec`: `UNION ALL` execution plan. @@ -240,14 +239,20 @@ impl ExecutionPlan for UnionExec { // which is the "meet" of all input orderings. In this example, this // function will return vec![false, true, true], indicating that we // preserve the orderings for the 2nd and the 3rd children. - self.inputs() - .iter() - .map(|child| { - ordering_satisfy(self.output_ordering(), child.output_ordering(), || { - child.equivalence_properties() + if let Some(output_ordering) = self.output_ordering() { + self.inputs() + .iter() + .map(|child| { + if let Some(child_ordering) = child.output_ordering() { + output_ordering.len() == child_ordering.len() + } else { + false + } }) - }) - .collect() + .collect() + } else { + vec![false; self.inputs().len()] + } } fn with_new_children(