Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic partition pruning #1102

Merged
merged 35 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
7ca1b39
q3 functionality
sarahyurick Mar 28, 2023
04845bc
style and minor functionality changes
sarahyurick Mar 29, 2023
2ac5693
some cleanup
sarahyurick Apr 4, 2023
1fbcc76
Merge branch 'main' into dpp
sarahyurick Apr 11, 2023
8ca673d
save progress
sarahyurick Apr 11, 2023
535302e
Merge branch 'main' into dpp
sarahyurick Apr 13, 2023
848a357
use inlist instead of binaryexpr
sarahyurick Apr 13, 2023
903533d
fix cargo test
sarahyurick Apr 13, 2023
d1c2b0b
fix some queries
sarahyurick Apr 14, 2023
b7ecfb5
use with_max_passes=1 and remove todos
sarahyurick Apr 20, 2023
5af3c3f
add warning
sarahyurick Apr 20, 2023
11e3d55
only run dpp once
sarahyurick Apr 21, 2023
f7ac414
null handling and double dtype
sarahyurick May 2, 2023
b3f82e5
minor style fixes
sarahyurick May 2, 2023
3fd16f6
clippy
sarahyurick May 2, 2023
da73605
Merge branch 'main' into dpp
sarahyurick May 4, 2023
4ad27c3
use adp imports
sarahyurick May 4, 2023
bef8517
add jeremy suggestions and better type logic
sarahyurick May 11, 2023
6d79107
style fix
sarahyurick May 11, 2023
dc9c5ca
MORE int/float logic
sarahyurick May 11, 2023
e3c364d
style fix
sarahyurick May 11, 2023
e65b1e3
Merge branch 'main' into dpp
sarahyurick May 15, 2023
b6ef201
fix some bugs
sarahyurick May 15, 2023
56ef3c4
add dask_config
sarahyurick May 16, 2023
cffc055
check for duplicate tablescans
sarahyurick May 30, 2023
26a7c62
fix row iterator
sarahyurick May 30, 2023
15dadf0
clippy
sarahyurick May 30, 2023
46e6c69
clippy again
sarahyurick May 30, 2023
cfacae0
Merge branch 'main' into dpp
jdye64 May 31, 2023
39872a9
Merge branch 'main' into dpp
sarahyurick Jun 7, 2023
4684acf
Merge branch 'main' into dpp
jdye64 Jun 13, 2023
567f44e
Merge branch 'main' into dpp
ayushdg Jun 20, 2023
182fe8e
add per-query config
sarahyurick Jun 21, 2023
80110fa
style
sarahyurick Jun 21, 2023
6c32d17
Merge branch 'main' into dpp
ayushdg Jun 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions dask_planner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,13 +546,22 @@ impl DaskSQLContext {
warn!("This LogicalPlan does not support Optimization. Returning original");
Ok(existing_plan)
}
_ => optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp),
_ => {
let optimized_plan = optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
.optimize_once(optimized_plan.unwrap().original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
}
}
}
Err(e) => Err(py_optimization_exp(e)),
Expand Down
22 changes: 22 additions & 0 deletions dask_planner/src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use datafusion_python::{
};
use log::{debug, trace};

mod dynamic_partition_pruning;
use dynamic_partition_pruning::DynamicPartitionPruning;

mod join_reorder;
use join_reorder::JoinReorder;

Expand Down Expand Up @@ -86,13 +89,32 @@ impl DaskSqlOptimizer {
}
}

// Create a separate instance of this optimization rule, since we want to ensure that it only
// runs one time
pub fn dynamic_partition_pruner() -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> =
vec![Arc::new(DynamicPartitionPruning::new())];

Self {
optimizer: Optimizer::with_rules(rule),
}
}

/// Iterates through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn optimize(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
let config = OptimizerContext::new();
self.optimizer.optimize(&plan, &config, Self::observe)
}

/// Iterates once through the configured `OptimizerRule`(s) to transform the input `LogicalPlan`
/// to its final optimized form
pub(crate) fn optimize_once(&self, plan: LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
let mut config = OptimizerContext::new();
config = OptimizerContext::with_max_passes(config, 1);
self.optimizer.optimize(&plan, &config, Self::observe)
}

fn observe(optimized_plan: &LogicalPlan, optimization: &dyn OptimizerRule) {
trace!(
"== AFTER APPLYING RULE {} ==\n{}\n",
Expand Down
Loading