Skip to content

Commit

Permalink
Parquet: omit min/max for interval columns when writing stats (apache…
Browse files Browse the repository at this point in the history
…#5147)

* Parquet: omit min/max for interval columns when writing stats

* Trigger
  • Loading branch information
Jefffrey authored Nov 30, 2023
1 parent 6d4b8bb commit f621d28
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 11 deletions.
7 changes: 5 additions & 2 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
use bytes::Bytes;
use half::f16;

use crate::basic::{Encoding, LogicalType, Type};
use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
Expand Down Expand Up @@ -137,7 +137,10 @@ pub struct ColumnValueEncoderImpl<T: DataType> {

impl<T: DataType> ColumnValueEncoderImpl<T> {
fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
if self.statistics_enabled == EnabledStatistics::Page {
if self.statistics_enabled == EnabledStatistics::Page
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
if let Some((min, max)) = self.min_max(slice, None) {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
Expand Down
59 changes: 50 additions & 9 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// If only computing chunk-level statistics compute them here, page-level statistics
// are computed in [`Self::write_mini_batch`] and used to update chunk statistics in
// [`Self::add_data_page`]
if self.statistics_enabled == EnabledStatistics::Chunk {
if self.statistics_enabled == EnabledStatistics::Chunk
// INTERVAL has undefined sort order, so don't write min/max stats for it
&& self.descr.converted_type() != ConvertedType::INTERVAL
{
match (min, max) {
(Some(min), Some(max)) => {
update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
Expand Down Expand Up @@ -1093,7 +1096,6 @@ fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
///
/// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
/// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`

fn update_stat<T: ParquetValueType, F>(
descr: &ColumnDescriptor,
val: &T,
Expand Down Expand Up @@ -3066,6 +3068,30 @@ mod tests {
Ok(())
}

#[test]
fn test_interval_stats_should_not_have_min_max() {
let input = [
vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
]
.into_iter()
.map(|s| ByteArray::from(s).into())
.collect::<Vec<_>>();

let page_writer = get_test_page_writer();
let mut writer = get_test_interval_column_writer(page_writer);
writer.write_batch(&input, None, None).unwrap();

let metadata = writer.close().unwrap().metadata;
let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
stats.clone()
} else {
panic!("metadata missing statistics");
};
assert!(!stats.has_min_max_set());
}

fn write_multiple_pages<T: DataType>(
column_descr: &Arc<ColumnDescriptor>,
pages: &[&[Option<T::T>]],
Expand Down Expand Up @@ -3395,8 +3421,7 @@ mod tests {
values: &[FixedLenByteArray],
) -> ValueStatistics<FixedLenByteArray> {
let page_writer = get_test_page_writer();
let props = Default::default();
let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
let mut writer = get_test_float16_column_writer(page_writer);
writer.write_batch(values, None, None).unwrap();

let metadata = writer.close().unwrap().metadata;
Expand All @@ -3409,12 +3434,9 @@ mod tests {

fn get_test_float16_column_writer(
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
let column_writer = get_column_writer(descr, props, page_writer);
let descr = Arc::new(get_test_float16_column_descr(0, 0));
let column_writer = get_column_writer(descr, Default::default(), page_writer);
get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
}

Expand All @@ -3429,6 +3451,25 @@ mod tests {
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}

fn get_test_interval_column_writer(
page_writer: Box<dyn PageWriter>,
) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
let descr = Arc::new(get_test_interval_column_descr());
let column_writer = get_column_writer(descr, Default::default(), page_writer);
get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
}

fn get_test_interval_column_descr() -> ColumnDescriptor {
let path = ColumnPath::from("col");
let tpe =
SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
.with_length(12)
.with_converted_type(ConvertedType::INTERVAL)
.build()
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
}

/// Returns column writer for UINT32 Column provided as ConvertedType only
fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
page_writer: Box<dyn PageWriter + 'a>,
Expand Down

0 comments on commit f621d28

Please sign in to comment.