Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: respect time range when building parquet reader #3947

Merged
merged 5 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/common/recordbatch/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Utility record batch stream wrapper that can perform precise filtering.

use datafusion::logical_expr::{Expr, Operator};
use datafusion::logical_expr::{Expr, Literal, Operator};
use datafusion_common::arrow::array::{ArrayRef, Datum, Scalar};
use datafusion_common::arrow::buffer::BooleanBuffer;
use datafusion_common::arrow::compute::kernels::cmp;
Expand Down Expand Up @@ -43,6 +43,28 @@ pub struct SimpleFilterEvaluator {
}

impl SimpleFilterEvaluator {
/// Builds an evaluator that compares the column `column_name` against the literal `lit`
/// using `op`.
///
/// Returns `None` when:
/// - `op` is not one of `Eq`, `NotEq`, `Lt`, `LtEq`, `Gt` or `GtEq`;
/// - `lit` does not convert into an [`Expr::Literal`];
/// - the literal value cannot be converted to an arrow scalar.
pub fn new<T: Literal>(column_name: String, lit: T, op: Operator) -> Option<Self> {
    // Only plain comparison operators are accepted; reject everything else up front.
    let is_comparison = matches!(
        op,
        Operator::Eq
            | Operator::NotEq
            | Operator::Lt
            | Operator::LtEq
            | Operator::Gt
            | Operator::GtEq
    );
    if !is_comparison {
        return None;
    }

    match lit.lit() {
        Expr::Literal(scalar) => {
            let literal = scalar.to_scalar().ok()?;
            Some(Self {
                column_name,
                literal,
                op,
            })
        }
        _ => None,
    }
}

pub fn try_new(predicate: &Expr) -> Option<Self> {
match predicate {
Expr::BinaryExpr(binary) => {
Expand Down
19 changes: 10 additions & 9 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};

async fn check_prune_row_groups(expr: Expr, expected: &str) {
async fn check_prune_row_groups(exprs: Vec<Expr>, expected: &str) {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

Expand Down Expand Up @@ -55,7 +55,7 @@ async fn check_prune_row_groups(expr: Expr, expected: &str) {
.scan_to_stream(
region_id,
ScanRequest {
filters: vec![expr],
filters: exprs,
..Default::default()
},
)
Expand All @@ -70,7 +70,9 @@ async fn test_read_parquet_stats() {
common_telemetry::init_default_ut_logging();

check_prune_row_groups(
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
vec![
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None)))
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -94,7 +96,7 @@ async fn test_read_parquet_stats() {
async fn test_prune_tag() {
// prune result: only row group 1&2
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
vec![datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string()))))],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
Expand All @@ -114,18 +116,17 @@ async fn test_prune_tag_and_field() {
common_telemetry::init_default_ut_logging();
// prune result: only row group 1
check_prune_row_groups(
col("tag_0")
.gt(lit(ScalarValue::Utf8(Some("4".to_string()))))
.and(col("field_0").lt(lit(8.0))),
vec![
col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
col("field_0").lt(lit(8.0)),
],
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_runtime::JoinError;
use common_time::Timestamp;
use datatypes::arrow::error::ArrowError;
use datatypes::prelude::ConcreteDataType;
use object_store::ErrorKind;
Expand Down Expand Up @@ -693,6 +694,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
BuildTimeRangeFilter {
timestamp: Timestamp,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -802,6 +810,7 @@ impl ErrorExt for Error {
EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal,
ChecksumMismatch { .. } => StatusCode::Unexpected,
RegionStopped { .. } => StatusCode::RegionNotReady,
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
}
}

Expand Down
13 changes: 8 additions & 5 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

Expand Down Expand Up @@ -235,7 +235,7 @@ impl ScanRegion {
}

/// Creates a scan input.
fn scan_input(self, filter_deleted: bool) -> Result<ScanInput> {
fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
let time_range = self.build_time_range_predicate();

let ssts = &self.version.ssts;
Expand Down Expand Up @@ -300,16 +300,19 @@ impl ScanRegion {
}

/// Build time range predicate from filters.
fn build_time_range_predicate(&self) -> TimestampRange {
fn build_time_range_predicate(&mut self) -> TimestampRange {
let time_index = self.version.metadata.time_index_column();
let unit = time_index
.column_schema
.data_type
.as_timestamp()
.expect("Time index must have timestamp-compatible type")
.unit();
TimeRangePredicateBuilder::new(&time_index.column_schema.name, unit, &self.request.filters)
.build()
build_time_range_predicate(
&time_index.column_schema.name,
unit,
&mut self.request.filters,
)
}

/// Use the latest schema to build the index applier.
Expand Down
80 changes: 78 additions & 2 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use datafusion_expr::Expr;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::{Expr, Operator};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
Expand All @@ -38,6 +41,7 @@ use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::cache::CacheManagerRef;
use crate::error;
use crate::error::{
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadParquetSnafu, Result,
};
Expand Down Expand Up @@ -225,7 +229,7 @@ impl ParquetReaderBuilder {

metrics.build_cost = start.elapsed();

let filters = if let Some(predicate) = &self.predicate {
let mut filters = if let Some(predicate) = &self.predicate {
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
predicate
.exprs()
.iter()
Expand All @@ -240,6 +244,11 @@ impl ParquetReaderBuilder {
} else {
vec![]
};

if let Some(time_range) = &self.time_range {
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
}

let codec = McmpRowCodec::new(
read_format
.metadata()
Expand Down Expand Up @@ -449,6 +458,59 @@ impl ParquetReaderBuilder {
}
}

/// Transforms time range into [SimpleFilterEvaluator].
fn time_range_to_predicate(
time_range: TimestampRange,
metadata: &RegionMetadataRef,
) -> Result<Vec<SimpleFilterContext>> {
let ts_col = metadata.time_index_column();
let ts_col_id = ts_col.column_id;

let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
let value = match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
}
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
};
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
.context(error::BuildTimeRangeFilterSnafu {
timestamp: *timestamp,
})?;
Ok(SimpleFilterContext::new(
evaluator,
ts_col_id,
SemanticType::Timestamp,
ts_col.column_schema.data_type.clone(),
))
};

let predicates = match (time_range.start(), time_range.end()) {
(Some(start), Some(end)) => {
vec![
ts_to_filter(Operator::GtEq, start)?,
ts_to_filter(Operator::Lt, end)?,
]
}

(Some(start), None) => {
vec![ts_to_filter(Operator::GtEq, start)?]
}

(None, Some(end)) => {
vec![ts_to_filter(Operator::Lt, end)?]
}
(None, None) => {
vec![]
}
};
Ok(predicates)
}

/// Parquet reader metrics.
#[derive(Debug, Default)]
struct Metrics {
Expand Down Expand Up @@ -570,6 +632,20 @@ pub(crate) struct SimpleFilterContext {
}

impl SimpleFilterContext {
/// Creates a context directly from an already-built `filter` and the properties of
/// the column it applies to.
///
/// Each argument is stored unchanged in the field of the same name.
fn new(
filter: SimpleFilterEvaluator,
column_id: ColumnId,
semantic_type: SemanticType,
data_type: ConcreteDataType,
) -> Self {
Self {
filter,
column_id,
semantic_type,
data_type,
}
}

/// Creates a context for the `expr`.
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
Expand Down
10 changes: 5 additions & 5 deletions src/query/src/tests/time_range_filter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datatypes::vectors::{Int64Vector, TimestampMillisecondVector};
use store_api::data_source::{DataSource, DataSourceRef};
use store_api::storage::ScanRequest;
use table::metadata::FilterPushDownType;
use table::predicate::TimeRangePredicateBuilder;
use table::predicate::build_time_range_predicate;
use table::test_util::MemTable;
use table::{Table, TableRef};

Expand Down Expand Up @@ -114,14 +114,14 @@ struct TimeRangeTester {
impl TimeRangeTester {
async fn check(&self, sql: &str, expect: TimestampRange) {
let _ = exec_selection(self.engine.clone(), sql).await;
let filters = self.get_filters();
let mut filters = self.take_filters();

let range = TimeRangePredicateBuilder::new("ts", TimeUnit::Millisecond, &filters).build();
let range = build_time_range_predicate("ts", TimeUnit::Millisecond, &mut filters);
assert_eq!(expect, range);
}

fn get_filters(&self) -> Vec<Expr> {
self.filter.write().unwrap().drain(..).collect()
fn take_filters(&self) -> Vec<Expr> {
std::mem::take(&mut self.filter.write().unwrap())
}
}

Expand Down
Loading