Skip to content

Commit

Permalink
Don't share ConfigOptions (apache#3886) (apache#4712)
Browse files Browse the repository at this point in the history
* Don't share ConfigOptions (apache#3886)

* More tweaks

* Fix TestParquetFile

* Clippy

* Fix doctest

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tustvold and alamb committed Dec 23, 2022
1 parent afb1ae2 commit 07f4980
Show file tree
Hide file tree
Showing 38 changed files with 384 additions and 731 deletions.
26 changes: 10 additions & 16 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use datafusion::common::Result;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use datafusion::prelude::{col, SessionContext};
use parquet::file::properties::WriterProperties;
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
Expand Down Expand Up @@ -69,9 +69,6 @@ async fn main() -> Result<()> {
let opt: Opt = Opt::from_args();
println!("Running benchmarks with the following options: {:?}", opt);

let config = SessionConfig::new().with_target_partitions(opt.partitions);
let mut ctx = SessionContext::with_config(config);

let path = opt.path.join("logs.parquet");

let mut props_builder = WriterProperties::builder();
Expand All @@ -88,17 +85,12 @@ async fn main() -> Result<()> {

let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;

run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
run_benchmarks(opt, &test_file).await?;

Ok(())
}

async fn run_benchmarks(
ctx: &mut SessionContext,
test_file: &TestParquetFile,
iterations: usize,
debug: bool,
) -> Result<()> {
async fn run_benchmarks(opt: Opt, test_file: &TestParquetFile) -> Result<()> {
let scan_options_matrix = vec![
ParquetScanOptions {
pushdown_filters: false,
Expand Down Expand Up @@ -148,11 +140,14 @@ async fn run_benchmarks(
println!("Executing with filter '{}'", filter_expr);
for scan_options in &scan_options_matrix {
println!("Using scan options {:?}", scan_options);
for i in 0..iterations {
for i in 0..opt.iterations {
let start = Instant::now();

let config = scan_options.config().with_target_partitions(opt.partitions);
let ctx = SessionContext::with_config(config);

let rows =
exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
.await?;
exec_scan(&ctx, test_file, filter_expr.clone(), opt.debug).await?;
println!(
"Iteration {} returned {} rows in {} ms",
i,
Expand All @@ -170,10 +165,9 @@ async fn exec_scan(
ctx: &SessionContext,
test_file: &TestParquetFile,
filter: Expr,
scan_options: ParquetScanOptions,
debug: bool,
) -> Result<usize> {
let exec = test_file.create_scan(filter, scan_options).await?;
let exec = test_file.create_scan(filter).await?;

let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,7 @@ async fn get_table(
}
"parquet" => {
let path = format!("{}/{}", path, table);
let format = ParquetFormat::new(state.config_options())
.with_enable_pruning(Some(true));
let format = ParquetFormat::default().with_enable_pruning(Some(true));

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
Expand Down
39 changes: 11 additions & 28 deletions datafusion-examples/examples/flight_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::sync::Arc;

use arrow_flight::SchemaAsIpc;
use datafusion::arrow::error::ArrowError;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use futures::Stream;
use futures::stream::BoxStream;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

Expand All @@ -39,37 +38,21 @@ pub struct FlightServiceImpl {}

#[tonic::async_trait]
impl FlightService for FlightServiceImpl {
type HandshakeStream = Pin<
Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
>;
type ListFlightsStream =
Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
type DoGetStream =
Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
type DoPutStream =
Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
type DoActionStream = Pin<
Box<
dyn Stream<Item = Result<arrow_flight::Result, Status>>
+ Send
+ Sync
+ 'static,
>,
>;
type ListActionsStream =
Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
type DoExchangeStream =
Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;

async fn get_schema(
&self,
request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
let request = request.into_inner();

let config = SessionConfig::new();
let listing_options =
ListingOptions::new(Arc::new(ParquetFormat::new(config.config_options())));
let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
let table_path =
ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;

Expand All @@ -79,10 +62,10 @@ impl FlightService for FlightServiceImpl {
.await
.unwrap();

let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let options = arrow::ipc::writer::IpcWriteOptions::default();
let schema_result = SchemaAsIpc::new(&schema, &options)
.try_into()
.map_err(|e: ArrowError| tonic::Status::internal(e.to_string()))?;
.map_err(|e: ArrowError| Status::internal(e.to_string()))?;

Ok(Response::new(schema_result))
}
Expand Down
3 changes: 1 addition & 2 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ async fn main() -> Result<()> {
let testdata = datafusion::test_util::parquet_test_data();

// Configure listing options
let file_format =
ParquetFormat::new(ctx.config_options()).with_enable_pruning(Some(true));
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());

Expand Down
42 changes: 20 additions & 22 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use arrow::{
datatypes::{DataType, Field, Schema, SchemaRef},
record_batch::RecordBatch,
};
use parking_lot::RwLock;

use datafusion_common::Result;

use crate::config::ConfigOptions;
use crate::datasource::streaming::{PartitionStream, StreamingTable};
use crate::datasource::TableProvider;
use crate::execution::context::TaskContext;
use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;
Expand All @@ -55,20 +55,17 @@ const DF_SETTINGS: &str = "df_settings";
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
config_options,
inner,
}
}
Expand All @@ -89,15 +86,10 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
config: InformationSchemaConfig {
catalog_list,
config_options,
},
}) as Arc<dyn SchemaProvider>
})
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider {
config: InformationSchemaConfig { catalog_list },
}) as Arc<dyn SchemaProvider>
})
} else {
self.inner.schema(name)
Expand Down Expand Up @@ -127,7 +119,6 @@ struct InformationSchemaProvider {
#[derive(Clone)]
struct InformationSchemaConfig {
catalog_list: Arc<dyn CatalogList>,
config_options: Arc<RwLock<ConfigOptions>>,
}

impl InformationSchemaConfig {
Expand Down Expand Up @@ -220,8 +211,12 @@ impl InformationSchemaConfig {
}

/// Construct the `information_schema.df_settings` virtual table
fn make_df_settings(&self, builder: &mut InformationSchemaDfSettingsBuilder) {
for (name, setting) in self.config_options.read().options() {
fn make_df_settings(
&self,
config_options: &ConfigOptions,
builder: &mut InformationSchemaDfSettingsBuilder,
) {
for (name, setting) in config_options.options() {
builder.add_setting(name, setting.to_string());
}
}
Expand Down Expand Up @@ -298,7 +293,7 @@ impl PartitionStream for InformationSchemaTables {
&self.schema
}

fn execute(&self) -> SendableRecordBatchStream {
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
Expand Down Expand Up @@ -389,7 +384,7 @@ impl PartitionStream for InformationSchemaViews {
&self.schema
}

fn execute(&self) -> SendableRecordBatchStream {
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
Expand Down Expand Up @@ -503,7 +498,7 @@ impl PartitionStream for InformationSchemaColumns {
&self.schema
}

fn execute(&self) -> SendableRecordBatchStream {
fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
Expand Down Expand Up @@ -690,15 +685,18 @@ impl PartitionStream for InformationSchemaDfSettings {
&self.schema
}

fn execute(&self) -> SendableRecordBatchStream {
let mut builder = self.builder();
fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let config = self.config.clone();
let mut builder = self.builder();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
// create a mem table with the names of tables
config.make_df_settings(&mut builder);
config.make_df_settings(
ctx.session_config().config_options(),
&mut builder,
);
Ok(builder.finish())
}),
))
Expand Down
7 changes: 0 additions & 7 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::env;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

/*-************************************
* Catalog related
Expand Down Expand Up @@ -484,11 +482,6 @@ impl ConfigOptions {
Self { options }
}

/// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
pub fn into_shareable(self) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(self))
}

/// Create new ConfigOptions struct, taking values from
/// environment variables where possible.
///
Expand Down
1 change: 0 additions & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ pub(crate) mod test_util {
projection,
limit,
table_partition_cols: vec![],
config_options: state.config_options(),
output_ordering: None,
},
&[],
Expand Down
Loading

0 comments on commit 07f4980

Please sign in to comment.