Deprecate SessionContext physical plan methods (apache#4617) (apache#…
tustvold committed Dec 28, 2022
1 parent 40bf559 commit c9350d1
Showing 12 changed files with 62 additions and 56 deletions.
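This commit deprecates SessionContext::optimize and SessionContext::create_physical_plan in favour of doing all planning through a single SessionState snapshot, and the changed call sites below show that pattern repeatedly. For orientation, here is a minimal end-to-end sketch of the migration, not taken from the diff: it assumes a tokio runtime and a hypothetical CSV file data.csv registered as table t, and uses only APIs that appear in this commit (DataFrame::into_parts, SessionState::optimize, SessionState::create_physical_plan, SessionState::task_ctx).

use datafusion::error::Result;
use datafusion::physical_plan::collect;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical data source; any registered table works the same way.
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    // Previously: ctx.optimize(..) and ctx.create_physical_plan(..), both now
    // deprecated so that one consistent state is used for planning and execution.
    let df = ctx.sql("SELECT * FROM t").await?;
    let (state, plan) = df.into_parts();
    let plan = state.optimize(&plan)?;
    let physical_plan = state.create_physical_plan(&plan).await?;
    let batches = collect(physical_plan, state.task_ctx()).await?;
    println!("read {} record batches", batches.len());
    Ok(())
}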
13 changes: 7 additions & 6 deletions benchmarks/src/bin/tpch.rs
@@ -325,30 +325,31 @@ async fn execute_query(
enable_scheduler: bool,
) -> Result<Vec<RecordBatch>> {
let plan = ctx.sql(sql).await?;
-let plan = plan.into_unoptimized_plan();
+let (state, plan) = plan.into_parts();

if debug {
println!("=== Logical plan ===\n{:?}\n", plan);
}

-let plan = ctx.optimize(&plan)?;
+let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{:?}\n", plan);
}
-let physical_plan = ctx.create_physical_plan(&plan).await?;
+let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent()
);
}
-let task_ctx = ctx.task_ctx();
let result = if enable_scheduler {
let scheduler = Scheduler::new(num_cpus::get());
-let results = scheduler.schedule(physical_plan.clone(), task_ctx).unwrap();
+let results = scheduler
+    .schedule(physical_plan.clone(), state.task_ctx())
+    .unwrap();
results.stream().try_collect().await?
} else {
-collect(physical_plan.clone(), task_ctx).await?
+collect(physical_plan.clone(), state.task_ctx()).await?
};
if debug {
println!(
5 changes: 5 additions & 0 deletions datafusion/core/src/dataframe.rs
@@ -504,6 +504,11 @@ impl DataFrame {
&self.plan
}

+/// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`]
+pub fn into_parts(self) -> (SessionState, LogicalPlan) {
+    (self.session_state, self.plan)
+}
+
/// Return the logical plan represented by this DataFrame without running the optimizers
///
/// Note: This method should not be used outside testing, as it loses the snapshot
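The new into_parts accessor is what allows the updated tests to hand the plan to SessionState and later rebuild a DataFrame via DataFrame::new (see the sql/mod.rs change below). As an illustration only, a helper along these lines could round-trip a DataFrame through its parts; the function name is invented here, everything else mirrors the calls shown in this diff.

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
use datafusion::error::Result;

/// Hypothetical helper: split a DataFrame into its SessionState and
/// LogicalPlan, optimize the plan with that same state, then reassemble
/// a DataFrame and execute it.
async fn optimize_and_collect(df: DataFrame) -> Result<Vec<RecordBatch>> {
    let (state, plan) = df.into_parts();
    let optimized = state.optimize(&plan)?;
    DataFrame::new(state, optimized).collect().await
}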
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -1019,11 +1019,17 @@ impl SessionContext {
}

/// Optimizes the logical plan by applying optimizer rules.
+#[deprecated(
+    note = "Use SessionState::optimize to ensure a consistent state for planning and execution"
+)]
pub fn optimize(&self, plan: &LogicalPlan) -> Result<LogicalPlan> {
self.state.read().optimize(plan)
}

/// Creates a physical plan from a logical plan.
+#[deprecated(
+    note = "Use SessionState::create_physical_plan or DataFrame::create_physical_plan to ensure a consistent state for planning and execution"
+)]
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
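As the deprecation notes suggest, callers that only need an ExecutionPlan can avoid the manual optimize step entirely by going through DataFrame::create_physical_plan. A minimal sketch under that assumption; the helper name is hypothetical, and the SessionContext is expected to have the referenced tables registered already.

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;

/// Hypothetical helper: plan a SQL query without the deprecated
/// SessionContext::optimize / SessionContext::create_physical_plan calls.
async fn physical_plan_for(
    ctx: &SessionContext,
    sql: &str,
) -> Result<Arc<dyn ExecutionPlan>> {
    let df = ctx.sql(sql).await?;
    // The DataFrame plans against the state it already carries, keeping
    // planning and execution consistent.
    df.create_physical_plan().await
}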
15 changes: 6 additions & 9 deletions datafusion/core/tests/custom_sources.rs
@@ -208,11 +208,12 @@ async fn custom_source_dataframe() -> Result<()> {
let ctx = SessionContext::new();

let table = ctx.read_table(Arc::new(CustomTableProvider))?;
-let logical_plan = LogicalPlanBuilder::from(table.into_optimized_plan()?)
+let (state, plan) = table.into_parts();
+let logical_plan = LogicalPlanBuilder::from(plan)
.project(vec![col("c2")])?
.build()?;

-let optimized_plan = ctx.optimize(&logical_plan)?;
+let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
@@ -235,13 +236,12 @@
);
assert_eq!(format!("{:?}", optimized_plan), expected);

-let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());

-let task_ctx = ctx.task_ctx();
-let batches = collect(physical_plan, task_ctx).await?;
+let batches = collect(physical_plan, state.task_ctx()).await?;
let origin_rec_batch = TEST_CUSTOM_RECORD_BATCH!()?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
@@ -261,10 +261,7 @@ async fn optimizers_catch_all_statistics() {
.await
.unwrap();

-let physical_plan = ctx
-    .create_physical_plan(&df.into_optimized_plan().unwrap())
-    .await
-    .unwrap();
+let physical_plan = df.create_physical_plan().await.unwrap();

// when the optimization kicks in, the source is replaced by an EmptyExec
assert!(
8 changes: 4 additions & 4 deletions datafusion/core/tests/parquet/mod.rs
@@ -210,15 +210,15 @@ impl ContextWithParquet {
.expect("getting input");
let pretty_input = pretty_format_batches(&input).unwrap().to_string();

-let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan");
+let state = self.ctx.state();
+let logical_plan = state.optimize(&logical_plan).expect("optimizing plan");

-let physical_plan = self
-    .ctx
+let physical_plan = state
.create_physical_plan(&logical_plan)
.await
.expect("creating physical plan");

-let task_ctx = self.ctx.task_ctx();
+let task_ctx = state.task_ctx();
let results = datafusion::physical_plan::collect(physical_plan.clone(), task_ctx)
.await
.expect("Running");
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/aggregates.rs
@@ -1109,6 +1109,7 @@ async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()>
#[tokio::test]
async fn aggregate_with_alias() -> Result<()> {
let ctx = SessionContext::new();
+let state = ctx.state();

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
@@ -1120,9 +1121,8 @@ async fn aggregate_with_alias() -> Result<()> {
.project(vec![col("c1"), sum(col("c2")).alias("total_salary")])?
.build()?;

-let plan = ctx.optimize(&plan)?;
-
-let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
+let plan = state.optimize(&plan)?;
+let physical_plan = state.create_physical_plan(&Arc::new(plan)).await?;
assert_eq!("c1", physical_plan.schema().field(0).name().as_str());
assert_eq!(
"total_salary",
5 changes: 4 additions & 1 deletion datafusion/core/tests/sql/explain.rs
@@ -37,8 +37,11 @@ fn optimize_explain() {
panic!("plan was not an explain: {:?}", plan);
}

+let ctx = SessionContext::new();
+let state = ctx.state();
+
// now optimize the plan and expect to see more plans
-let optimized_plan = SessionContext::new().optimize(&plan).unwrap();
+let optimized_plan = state.optimize(&plan).unwrap();
if let LogicalPlan::Explain(e) = &optimized_plan {
// should have more than one plan
assert!(
15 changes: 7 additions & 8 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -257,9 +257,9 @@ async fn csv_explain_plans() {
);

// Optimized logical plan
-//
+let state = ctx.state();
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
-let plan = ctx.optimize(plan).expect(&msg);
+let plan = state.optimize(plan).expect(&msg);
let optimized_logical_schema = plan.schema();
// Both schema has to be the same
assert_eq!(logical_schema, optimized_logical_schema.as_ref());
@@ -334,12 +334,11 @@
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
+let plan = state.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
-let task_ctx = ctx.task_ctx();
-let results = collect(plan, task_ctx).await.expect(&msg);
+let results = collect(plan, state.task_ctx()).await.expect(&msg);
let actual = result_vec(&results);
// flatten to a single string
let actual = actual.into_iter().map(|r| r.join("\t")).collect::<String>();
@@ -481,9 +480,9 @@ async fn csv_explain_verbose_plans() {
);

// Optimized logical plan
-//
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, dataframe);
-let plan = dataframe.into_optimized_plan().expect(&msg);
+let (state, plan) = dataframe.into_parts();
+let plan = state.optimize(&plan).expect(&msg);
let optimized_logical_schema = plan.schema();
// Both schema has to be the same
assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
@@ -558,7 +557,7 @@
// Physical plan
// Create plan
let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
+let plan = state.create_physical_plan(&plan).await.expect(&msg);
//
// Execute plan
let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
10 changes: 4 additions & 6 deletions datafusion/core/tests/sql/mod.rs
@@ -1022,13 +1022,11 @@ async fn try_execute_to_batches(
) -> Result<Vec<RecordBatch>> {
let dataframe = ctx.sql(sql).await?;
let logical_schema = dataframe.schema().clone();
+let (state, plan) = dataframe.into_parts();

-let optimized = ctx.optimize(dataframe.logical_plan())?;
-let optimized_logical_schema = optimized.schema();
-let results = dataframe.collect().await?;
-
-assert_eq!(&logical_schema, optimized_logical_schema.as_ref());
-Ok(results)
+let optimized = state.optimize(&plan)?;
+assert_eq!(&logical_schema, optimized.schema().as_ref());
+DataFrame::new(state, optimized).collect().await
}

/// Execute query and return results as a Vec of RecordBatches
25 changes: 12 additions & 13 deletions datafusion/core/tests/sql/projection.rs
@@ -172,7 +172,8 @@ async fn projection_on_table_scan() -> Result<()> {
.project(vec![col("c2")])?
.build()?;

-let optimized_plan = ctx.optimize(&logical_plan)?;
+let state = ctx.state();
+let optimized_plan = state.optimize(&logical_plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
@@ -192,12 +193,11 @@
\n TableScan: test projection=[c2]";
assert_eq!(format!("{:?}", optimized_plan), expected);

-let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
-let task_ctx = ctx.task_ctx();
-let batches = collect(physical_plan, task_ctx).await?;
+let batches = collect(physical_plan, state.task_ctx()).await?;
assert_eq!(40, batches.iter().map(|x| x.num_rows()).sum::<usize>());

Ok(())
@@ -215,8 +215,8 @@
.project(vec![col("c1")])?
.build()?;

-let plan = ctx.optimize(&plan)?;
-let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?;
+let dataframe = DataFrame::new(ctx.state(), plan);
+let physical_plan = dataframe.create_physical_plan().await?;
assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable());
Ok(())
}
@@ -247,9 +247,8 @@ async fn project_cast_dictionary() {
.unwrap();

let logical_plan = builder.project(vec![expr]).unwrap().build().unwrap();
-
-let physical_plan = ctx.create_physical_plan(&logical_plan).await.unwrap();
-let actual = collect(physical_plan, ctx.task_ctx()).await.unwrap();
+let df = DataFrame::new(ctx.state(), logical_plan);
+let actual = df.collect().await.unwrap();

let expected = vec![
"+----------------------------------------------------------------------------------+",
@@ -289,7 +288,8 @@ async fn projection_on_memory_scan() -> Result<()> {
assert_fields_eq(&plan, vec!["b"]);

let ctx = SessionContext::new();
-let optimized_plan = ctx.optimize(&plan)?;
+let state = ctx.state();
+let optimized_plan = state.optimize(&plan)?;
match &optimized_plan {
LogicalPlan::Projection(Projection { input, .. }) => match &**input {
LogicalPlan::TableScan(TableScan {
@@ -312,13 +312,12 @@
);
assert_eq!(format!("{:?}", optimized_plan), expected);

-let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
+let physical_plan = state.create_physical_plan(&optimized_plan).await?;

assert_eq!(1, physical_plan.schema().fields().len());
assert_eq!("b", physical_plan.schema().field(0).name().as_str());

-let task_ctx = ctx.task_ctx();
-let batches = collect(physical_plan, task_ctx).await?;
+let batches = collect(physical_plan, state.task_ctx()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
assert_eq!(4, batches[0].num_rows());
5 changes: 1 addition & 4 deletions datafusion/core/tests/sql/udf.rs
@@ -90,10 +90,7 @@ async fn scalar_udf() -> Result<()> {
"Projection: t.a, t.b, my_add(t.a, t.b)\n TableScan: t projection=[a, b]"
);

-let plan = ctx.optimize(&plan)?;
-let plan = ctx.create_physical_plan(&plan).await?;
-let task_ctx = ctx.task_ctx();
-let result = collect(plan, task_ctx).await?;
+let result = DataFrame::new(ctx.state(), plan).collect().await?;

let expected = vec![
"+-----+-----+-----------------+",
5 changes: 3 additions & 2 deletions datafusion/core/tests/tpcds_planning.rs
@@ -1067,9 +1067,10 @@ async fn regression_test(query_no: u8, create_physical: bool) -> Result<()> {

for sql in &sql {
let df = ctx.sql(sql).await?;
-let plan = df.into_optimized_plan()?;
+let (state, plan) = df.into_parts();
+let plan = state.optimize(&plan)?;
if create_physical {
-let _ = ctx.create_physical_plan(&plan).await?;
+let _ = state.create_physical_plan(&plan).await?;
}
}
