Skip to content

Commit

Permalink
Show optimization errors in explain (apache#4819)
Browse files Browse the repository at this point in the history
* Explain show physical optimization errors

* Revert "Explain show physical optimization errors"

This reverts commit cdd21be.

* Don't fail explain if failure during planning
  • Loading branch information
Jefffrey committed Jan 27, 2023
1 parent dd09212 commit 7673fcc
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 36 deletions.
18 changes: 14 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
optimizer::PhysicalOptimizerRule,
},
};
use datafusion_expr::DescribeTable;
use datafusion_expr::{DescribeTable, StringifiedPlan};
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
Expand Down Expand Up @@ -1766,21 +1766,31 @@ impl SessionState {
let mut stringified_plans = e.stringified_plans.clone();

// optimize the child plan, capturing the output of each optimizer
let plan = self.optimizer.optimize(
let (plan, logical_optimization_succeeded) = match self.optimizer.optimize(
e.plan.as_ref(),
self,
|optimized_plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans.push(optimized_plan.to_stringified(plan_type));
},
)?;
) {
Ok(plan) => (Arc::new(plan), true),
Err(DataFusionError::Context(optimizer_name, err)) => {
let plan_type = PlanType::OptimizedLogicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, err.to_string()));
(e.plan.clone(), false)
}
Err(e) => return Err(e),
};

Ok(LogicalPlan::Explain(Explain {
verbose: e.verbose,
plan: Arc::new(plan),
plan,
stringified_plans,
schema: e.schema.clone(),
logical_optimization_succeeded,
}))
} else {
self.optimizer.optimize(plan, self, |_, _| {})
Expand Down
79 changes: 53 additions & 26 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ use datafusion_expr::expr::{
Like, TryCast, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::expand_wildcard;
use datafusion_expr::{logical_plan, StringifiedPlan};
use datafusion_expr::{WindowFrame, WindowFrameBound};
use datafusion_optimizer::utils::unalias;
use datafusion_physical_expr::expressions::Literal;
Expand Down Expand Up @@ -1740,28 +1740,47 @@ impl DefaultPhysicalPlanner {

if !config.physical_plan_only {
stringified_plans = e.stringified_plans.clone();
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
if e.logical_optimization_succeeded {
stringified_plans.push(e.plan.to_stringified(FinalLogicalPlan));
}
}

if !config.logical_plan_only {
let input = self
if !config.logical_plan_only && e.logical_optimization_succeeded {
match self
.create_initial_plan(e.plan.as_ref(), session_state)
.await?;

stringified_plans.push(
displayable(input.as_ref()).to_stringified(InitialPhysicalPlan),
);

let input =
self.optimize_internal(input, session_state, |plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(displayable(plan).to_stringified(plan_type));
})?;
.await
{
Ok(input) => {
stringified_plans.push(
displayable(input.as_ref())
.to_stringified(InitialPhysicalPlan),
);

stringified_plans
.push(displayable(input.as_ref()).to_stringified(FinalPhysicalPlan));
match self.optimize_internal(
input,
session_state,
|plan, optimizer| {
let optimizer_name = optimizer.name().to_string();
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(displayable(plan).to_stringified(plan_type));
},
) {
Ok(input) => stringified_plans.push(
displayable(input.as_ref())
.to_stringified(FinalPhysicalPlan),
),
Err(DataFusionError::Context(optimizer_name, e)) => {
let plan_type = OptimizedPhysicalPlan { optimizer_name };
stringified_plans
.push(StringifiedPlan::new(plan_type, e.to_string()))
}
Err(e) => return Err(e),
}
}
Err(e) => stringified_plans
.push(StringifiedPlan::new(InitialPhysicalPlan, e.to_string())),
}
}

Ok(Some(Arc::new(ExplainExec::new(
Expand Down Expand Up @@ -1795,14 +1814,22 @@ impl DefaultPhysicalPlanner {
let mut new_plan = plan;
for optimizer in optimizers {
let before_schema = new_plan.schema();
new_plan = optimizer.optimize(new_plan, session_state.config_options())?;
new_plan = optimizer
.optimize(new_plan, session_state.config_options())
.map_err(|e| {
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
})?;
if optimizer.schema_check() && new_plan.schema() != before_schema {
return Err(DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
)));
let e = DataFusionError::Internal(format!(
"PhysicalOptimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
optimizer.name(),
before_schema,
new_plan.schema()
));
return Err(DataFusionError::Context(
optimizer.name().to_string(),
Box::new(e),
));
}
trace!(
"Optimized physical plan by {}:\n{}\n",
Expand Down
1 change: 1 addition & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ impl LogicalPlanBuilder {
plan: Arc::new(self.plan),
stringified_plans,
schema,
logical_optimization_succeeded: false,
})))
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,8 @@ pub struct Explain {
pub stringified_plans: Vec<StringifiedPlan>,
/// The output schema of the explain (2 columns of text)
pub schema: DFSchemaRef,
/// Used by physical planner to check if should proceed with planning
pub logical_optimization_succeeded: bool,
}

/// Runs the actual plan, and then prints the physical plan with
Expand Down
22 changes: 16 additions & 6 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,16 @@ impl Optimizer {
match result {
Ok(Some(plan)) => {
if !plan.schema().equivalent_names_and_types(new_plan.schema()) {
return Err(DataFusionError::Internal(format!(
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed, due to generate a different schema, original schema: {:?}, new schema: {:?}",
rule.name(),
new_plan.schema(),
plan.schema()
)));
));
return Err(DataFusionError::Context(
rule.name().to_string(),
Box::new(e),
));
}
new_plan = plan;
observer(&new_plan, rule.as_ref());
Expand All @@ -298,11 +302,15 @@ impl Optimizer {
e
);
} else {
return Err(DataFusionError::Internal(format!(
let e = DataFusionError::Internal(format!(
"Optimizer rule '{}' failed due to unexpected error: {}",
rule.name(),
e
)));
));
return Err(DataFusionError::Context(
rule.name().to_string(),
Box::new(e),
));
}
}
}
Expand Down Expand Up @@ -436,7 +444,8 @@ mod tests {
});
let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \
"bad rule\ncaused by\n\
Internal error: Optimizer rule 'bad rule' failed due to unexpected error: \
Error during planning: rule failed. This was likely caused by a bug in \
DataFusion's code and we would welcome that you file an bug report in our issue tracker",
err.to_string()
Expand All @@ -453,7 +462,8 @@ mod tests {
});
let err = opt.optimize(&plan, &config, &observe).unwrap_err();
assert_eq!(
"Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \
"get table_scan rule\ncaused by\n\
Internal error: Optimizer rule 'get table_scan rule' failed, due to generate a different schema, \
original schema: DFSchema { fields: [], metadata: {} }, \
new schema: DFSchema { fields: [\
DFField { qualifier: Some(\"test\"), field: Field { name: \"a\", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} } }, \
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan,
stringified_plans,
schema,
logical_optimization_succeeded: false,
}))
}
}
Expand Down

0 comments on commit 7673fcc

Please sign in to comment.