Use defaults for ListingOptions builder (apache#4243)
* add from_options and use builder

* Don't specify defaults

* Fix doc

* cargo fmt

* Remove from_options

* Format
mvanschellebeeck committed Nov 16, 2022
1 parent b935cff commit 715ed5d
Showing 7 changed files with 23 additions and 46 deletions.
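For context before the per-file diffs: every deleted setter call below merely restated a value that `ListingOptions::new` already supplies. A sketch of those defaults (the `file_extension` and `target_partitions` values are assumptions; `collect_stat`, `table_partition_cols`, and `file_sort_order` match the struct literal removed in table.rs below):

    // Sketch, not verbatim from this commit: the constructor defaults
    // that make the deleted with_* calls redundant.
    impl ListingOptions {
        pub fn new(format: Arc<dyn FileFormat>) -> Self {
            Self {
                format,
                file_extension: String::new(), // assumed default
                table_partition_cols: vec![],  // matches the removed literal
                collect_stat: true,            // matches the removed literal
                target_partitions: 1,          // assumed default
                file_sort_order: None,         // visible in the diff context
            }
        }
    }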
benchmarks/src/bin/tpch.rs (3 changes: 1 addition & 2 deletions)
@@ -394,8 +394,7 @@ async fn get_table(
     let options = ListingOptions::new(format)
         .with_file_extension(extension)
         .with_target_partitions(target_partitions)
-        .with_collect_stat(ctx.config.collect_statistics)
-        .with_table_partition_cols(vec![]);
+        .with_collect_stat(ctx.config.collect_statistics);
 
     let table_path = ListingTableUrl::parse(path)?;
     let config = ListingTableConfig::new(table_path).with_listing_options(options);
datafusion-examples/examples/parquet_sql_multiple_files.rs (5 changes: 1 addition & 4 deletions)
@@ -34,10 +34,7 @@ async fn main() -> Result<()> {
     // Configure listing options
     let file_format = ParquetFormat::default().with_enable_pruning(true);
     let listing_options = ListingOptions::new(Arc::new(file_format))
-        .with_file_extension(FileType::PARQUET.get_ext())
-        .with_table_partition_cols(vec![])
-        .with_collect_stat(true)
-        .with_target_partitions(1);
+        .with_file_extension(FileType::PARQUET.get_ext());
 
     // Register a listing table - this will use all files in the directory as data sources
     // for the query
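The trimmed example registers the directory-backed table the same way as before; a minimal sketch of the follow-up call (the table name and path are placeholders, not from this diff):

    // Hypothetical usage mirroring the example's intent.
    ctx.register_listing_table(
        "my_table",                     // placeholder name
        "file:///path/to/parquet_dir/", // placeholder path
        listing_options,
        None, // provided_schema: infer from the files
        None, // sql_definition
    )
    .await?;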
datafusion/core/src/datasource/listing/table.rs (31 changes: 10 additions & 21 deletions)
@@ -160,14 +160,9 @@ impl ListingTableConfig {
         let (format, file_extension) =
             ListingTableConfig::infer_format(file.location.as_ref())?;
 
-        let listing_options = ListingOptions {
-            format,
-            collect_stat: true,
-            file_extension,
-            target_partitions: ctx.config.target_partitions,
-            table_partition_cols: vec![],
-            file_sort_order: None,
-        };
+        let listing_options = ListingOptions::new(format)
+            .with_file_extension(file_extension)
+            .with_target_partitions(ctx.config.target_partitions);
 
         Ok(Self {
             table_paths: self.table_paths,
@@ -254,6 +249,7 @@ impl ListingOptions {
             file_sort_order: None,
         }
     }
+
     /// Set file extension on [`ListingOptions`] and returns self.
     ///
     /// ```
@@ -322,7 +318,7 @@ impl ListingOptions {
         self
     }
 
-    /// Set number of target partitions on [`ListingOptions`] and returns self.
+    /// Set file sort order on [`ListingOptions`] and returns self.
     ///
     /// ```
     /// use std::sync::Arc;
@@ -773,10 +769,8 @@ mod tests {
         ];
 
         for (file_sort_order, expected_result) in cases {
-            let options = ListingOptions {
-                file_sort_order,
-                ..options.clone()
-            };
+            let options = options.clone().with_file_sort_order(file_sort_order);
+
             let config = ListingTableConfig::new(table_path.clone())
                 .with_listing_options(options)
                 .with_schema(schema.clone());
@@ -814,8 +808,7 @@ mod tests {
         let opt = ListingOptions::new(Arc::new(AvroFormat {}))
             .with_file_extension(FileType::AVRO.get_ext())
             .with_table_partition_cols(vec![String::from("p1")])
-            .with_target_partitions(4)
-            .with_collect_stat(true);
+            .with_target_partitions(4);
 
         let table_path = ListingTableUrl::parse("test:///table/").unwrap();
         let file_schema =
@@ -1013,9 +1006,7 @@ mod tests {
 
         let opt = ListingOptions::new(Arc::new(format))
             .with_file_extension("")
-            .with_table_partition_cols(vec![])
-            .with_target_partitions(target_partitions)
-            .with_collect_stat(true);
+            .with_target_partitions(target_partitions);
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
 
@@ -1048,9 +1039,7 @@ mod tests {
 
         let opt = ListingOptions::new(Arc::new(format))
             .with_file_extension("")
-            .with_table_partition_cols(vec![])
-            .with_target_partitions(target_partitions)
-            .with_collect_stat(true);
+            .with_target_partitions(target_partitions);
 
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
 
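The test hunks above assume `with_file_sort_order` is a by-value setter like the other `with_*` methods in this file; a sketch of that shape, with the `Option<Vec<Expr>>` parameter type inferred from the call sites in this diff:

    /// Set file sort order on [`ListingOptions`] and returns self.
    pub fn with_file_sort_order(mut self, file_sort_order: Option<Vec<Expr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }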
datafusion/core/src/datasource/listing_table_factory.rs (7 changes: 2 additions & 5 deletions)
@@ -61,11 +61,8 @@ impl TableProviderFactory for ListingTableFactory {
             FileType::JSON => Arc::new(JsonFormat::default()),
         };
 
-        let options = ListingOptions::new(file_format)
-            .with_collect_stat(true)
-            .with_file_extension(file_extension)
-            .with_target_partitions(1)
-            .with_table_partition_cols(vec![]);
+        let options =
+            ListingOptions::new(file_format).with_file_extension(file_extension);
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = options.infer_schema(state, &table_path).await?;
datafusion/core/src/execution/options.rs (4 changes: 0 additions & 4 deletions)
@@ -146,7 +146,6 @@ impl<'a> CsvReadOptions<'a> {
             .with_file_compression_type(self.file_compression_type.to_owned());
 
         ListingOptions::new(Arc::new(file_format))
-            .with_collect_stat(false)
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
@@ -218,7 +217,6 @@ impl<'a> ParquetReadOptions<'a> {
             .with_skip_metadata(self.skip_metadata);
 
         ListingOptions::new(Arc::new(file_format))
-            .with_collect_stat(true)
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
@@ -265,7 +263,6 @@ impl<'a> AvroReadOptions<'a> {
         let file_format = AvroFormat::default();
 
         ListingOptions::new(Arc::new(file_format))
-            .with_collect_stat(false)
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
@@ -336,7 +333,6 @@ impl<'a> NdJsonReadOptions<'a> {
             .with_file_compression_type(self.file_compression_type.to_owned());
 
         ListingOptions::new(Arc::new(file_format))
-            .with_collect_stat(false)
             .with_file_extension(self.file_extension)
             .with_target_partitions(target_partitions)
             .with_table_partition_cols(self.table_partition_cols.clone())
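Note: the CSV, Avro, and NdJson conversions previously passed `false` here, so after this change they inherit the builder default instead (`true`, assuming the struct literal removed in table.rs reflects `new`'s defaults). An illustrative sketch of one conversion in use (the flag and partition count are arbitrary):

    // Illustrative only: CSV read options converted to listing options,
    // now relying on the builder default for collect_stat.
    let listing_opts = CsvReadOptions::new()
        .has_header(true)
        .to_listing_options(8); // 8 target partitions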
datafusion/core/tests/path_partition.rs (5 changes: 3 additions & 2 deletions)
@@ -465,8 +465,9 @@ async fn register_partitioned_alltypes_parquet(
     );
 
     let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
-        .with_table_partition_cols(partition_cols.iter().map(|&s| s.to_owned()).collect())
-        .with_collect_stat(true);
+        .with_table_partition_cols(
+            partition_cols.iter().map(|&s| s.to_owned()).collect(),
+        );
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
     let store_path =
datafusion/core/tests/sql/parquet.rs (14 changes: 6 additions & 8 deletions)
@@ -57,10 +57,9 @@ async fn parquet_with_sort_order_specified() {
     let target_partitions = 2;
 
     // The sort order is not specified
-    let options_no_sort = ListingOptions {
-        file_sort_order: None,
-        ..parquet_read_options.to_listing_options(target_partitions)
-    };
+    let options_no_sort = parquet_read_options
+        .to_listing_options(target_partitions)
+        .with_file_sort_order(None);
 
     // The sort order is specified (not actually correct in this case)
     let file_sort_order = [col("string_col"), col("int_col")]
@@ -72,10 +71,9 @@
         })
         .collect::<Vec<_>>();
 
-    let options_sort = ListingOptions {
-        file_sort_order: Some(file_sort_order),
-        ..parquet_read_options.to_listing_options(target_partitions)
-    };
+    let options_sort = parquet_read_options
+        .to_listing_options(target_partitions)
+        .with_file_sort_order(Some(file_sort_order));
 
     // This string appears in ParquetExec if the output ordering is
     // specified
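For reference, the `file_sort_order` the test builds above maps column references into sort expressions; a hedged sketch of the region collapsed between the two hunks (the ascending/nulls flags are assumptions):

    // Sketch of the collapsed construction between the hunks above.
    let file_sort_order = [col("string_col"), col("int_col")]
        .map(|e| e.sort(true, false)) // ascending, nulls last (assumed)
        .to_vec();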
