
Commit

feat: support composite row keys with "BETWEEN" (only last table_partition_cols)
jychen7 committed Mar 12, 2022
1 parent ee1ed77 commit 249cfcd
Showing 3 changed files with 74 additions and 41 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -38,15 +38,15 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe
- ✅ select by `"_row_key" BETWEEN`
- ✅ select by composite row keys `=`
- ✅ select by composite row keys `IN`
- [ ] select by composite row keys `BETWEEN`
- ✅ select by composite row keys `BETWEEN` (only supported on the last of `table_partition_cols`)

### General

- ✅ Projection pushdown
- [ ] Predicate push down
+ [Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range)
+ [Value Regex](https://cloud.google.com/bigtable/docs/using-filters#value-regex)
+ [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range))
+ [Timestamp range](https://cloud.google.com/bigtable/docs/using-filters#timestamp-range)
- [ ] Multi Thread or Partition aware execution
- [ ] Production ready Bigtable SDK in Rust

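As an illustration of the newly supported pattern (a sketch, not part of the diff: the table and column names come from the test added in this commit, the surrounding setup is assumed): equality or `IN` filters on the leading `table_partition_cols`, and `BETWEEN` only on the last one.

```rust
// Sketch: equality/IN on the leading partition columns, BETWEEN on the last one.
let batches = ctx
    .sql(
        "SELECT pressure, \"_timestamp\" \
         FROM weather_balloons \
         WHERE region = 'us-west2' \
           AND balloon_id IN ('3698') \
           AND event_minute BETWEEN '2021-03-05-1200' AND '2021-03-05-1201'",
    )
    .await?
    .collect()
    .await?;
```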
11 changes: 11 additions & 0 deletions src/datasource.rs
@@ -560,6 +560,17 @@ mod tests {
"+----------+------------+-----------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT region, balloon_id, event_minute, pressure, \"_timestamp\" FROM weather_balloons where region = 'us-west2' and balloon_id IN ('3698') and event_minute BETWEEN '2021-03-05-1200' AND '2021-03-05-1201' ORDER BY \"_timestamp\"").await?.collect().await?;
expected = vec![
"+----------+------------+-----------------+----------+-------------------------+",
"| region | balloon_id | event_minute | pressure | _timestamp |",
"+----------+------------+-----------------+----------+-------------------------+",
"| us-west2 | 3698 | 2021-03-05-1200 | 94558 | 2021-03-05 12:00:05.100 |",
"| us-west2 | 3698 | 2021-03-05-1201 | 94122 | 2021-03-05 12:01:05.200 |",
"+----------+------------+-----------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);
Ok(())
}
}
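For the `BETWEEN` query in this test, `compose()` in `src/datasource/composer.rs` (next file) joins the `Eq`/`IN` values of the leading partition columns into a key prefix and appends the `BETWEEN` bounds of the last column, so the scan collapses to a single closed row range, roughly the following, assuming `#` is the configured `table_partition_separator` (the separator value is not shown in this excerpt):

```rust
// Assumed separator "#"; the real value comes from the datasource configuration.
RowRange {
    start_key: Some(StartKey::StartKeyClosed(b"us-west2#3698#2021-03-05-1200".to_vec())),
    end_key: Some(EndKey::EndKeyClosed(b"us-west2#3698#2021-03-05-1201".to_vec())),
}
```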
100 changes: 61 additions & 39 deletions src/datasource/composer.rs
@@ -43,8 +43,8 @@ pub fn compose(
_ => (),
}

let mut row_ranges = vec![];
let mut table_partition_col_mapping: HashMap<String, Vec<String>> = HashMap::new();
let mut partition_col_values: HashMap<String, Vec<String>> = HashMap::new();
let mut tail_partition_col_range: Option<(String, String)> = None;

for filter in filters {
match filter {
@@ -54,10 +54,10 @@
match op {
Operator::Eq => match right.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
table_partition_col_mapping
partition_col_values
.entry(col.name.to_owned())
.or_insert(vec![]);
table_partition_col_mapping
partition_col_values
.get_mut(&col.name)
.unwrap()
.push(key.clone())
@@ -76,19 +76,14 @@
negated,
} => match expr.as_ref() {
Expr::Column(col) => {
if datasource.table_partition_cols.contains(&col.name) {
if negated.to_owned() {
return Err(DataFusionError::Execution(
"_row_key: filter NOT IN is not supported".to_owned(),
));
}
if datasource.table_partition_cols.contains(&col.name) && !negated.to_owned() {
for right in list {
match right {
Expr::Literal(ScalarValue::Utf8(Some(key))) => {
table_partition_col_mapping
partition_col_values
.entry(col.name.to_owned())
.or_insert(vec![]);
table_partition_col_mapping
partition_col_values
.get_mut(&col.name)
.unwrap()
.push(key.clone())
@@ -107,24 +102,16 @@
high,
} => match expr.as_ref() {
Expr::Column(col) => {
if datasource.table_partition_cols.contains(&col.name) {
if negated.to_owned() {
return Err(DataFusionError::Execution(
"_row_key: filter NOT IN is not supported".to_owned(),
));
}
if datasource.table_partition_cols.last().unwrap() == &col.name
&& !negated.to_owned()
{
match low.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(low_key))) => {
match high.as_ref() {
Expr::Literal(ScalarValue::Utf8(Some(high_key))) => row_ranges
.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
low_key.clone().into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
high_key.clone().into_bytes(),
)),
}),
Expr::Literal(ScalarValue::Utf8(Some(high_key))) => {
tail_partition_col_range =
Some((low_key.clone(), high_key.clone()));
}
_ => (),
}
}
@@ -138,10 +125,23 @@
}
}

if !table_partition_col_mapping.is_empty() {
let mut row_ranges = vec![];
if partition_col_values.is_empty() {
match tail_partition_col_range {
Some((low, high)) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(low.into_bytes())),
end_key: Some(EndKey::EndKeyClosed(high.into_bytes())),
});
}
_ => (),
}
} else {
let mut batch_parts: Vec<Vec<String>> = vec![];
for table_partition_col in datasource.table_partition_cols {
match table_partition_col_mapping.get(&table_partition_col) {
let tail_partition_col = datasource.table_partition_cols.last().unwrap();

for table_partition_col in datasource.table_partition_cols.clone() {
match partition_col_values.get(&table_partition_col) {
Some(list) => {
if batch_parts.is_empty() {
// initialize
@@ -158,32 +158,54 @@
}
}
_ => {
return Err(DataFusionError::Execution(format!(
"{}: filter is required",
table_partition_col
)));
if &table_partition_col == tail_partition_col
&& tail_partition_col_range.is_none()
{
return Err(DataFusionError::Execution(format!(
"{}: filter is required",
table_partition_col
)));
}
// tail_partition_col_range will be used when joining batch_parts later
}
}
}

for parts in batch_parts {
let key = parts.join(&datasource.table_partition_separator);
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(key.clone().into_bytes())),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
});
match tail_partition_col_range {
Some((ref low, ref high)) => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(
(key.clone() + &datasource.table_partition_separator + &low)
.into_bytes(),
)),
end_key: Some(EndKey::EndKeyClosed(
(key.clone() + &datasource.table_partition_separator + &high)
.into_bytes(),
)),
});
}
_ => {
row_ranges.push(RowRange {
start_key: Some(StartKey::StartKeyClosed(key.clone().into_bytes())),
end_key: Some(EndKey::EndKeyClosed(key.clone().into_bytes())),
});
}
}
}
}

if row_ranges.is_empty() {
return Err(DataFusionError::Execution(
"_row_key: filter is not provided or not supported".to_owned(),
"table_partition_cols: filter is not provided or not supported".to_owned(),
));
}

Ok((row_ranges, row_filters))
}

// https://gist.github.com/kylewlacy/115965b40e02a3325558
pub fn partial_cartesian<T: Clone>(a: Vec<Vec<T>>, b: &[T]) -> Vec<Vec<T>> {
a.into_iter()
.flat_map(|xs| {
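`partial_cartesian` (from the gist linked above) extends every partial key in `a` with each candidate value from `b`; that is how `batch_parts` accumulates one value per partition column before the row keys are joined. A small usage sketch under that assumption:

```rust
// Values collected for `region` initialize batch_parts ...
let batch_parts = vec![vec!["us-west2".to_string()]];
// ... and the values collected for `balloon_id` are joined in:
let balloon_ids = vec!["3698".to_string(), "3700".to_string()];
let batch_parts = partial_cartesian(batch_parts, &balloon_ids);
// batch_parts is now [["us-west2", "3698"], ["us-west2", "3700"]];
// each entry is later joined with table_partition_separator into a row key prefix.
```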
