Skip to content

Commit

Permalink
feat: projection pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
jychen7 committed Mar 10, 2022
1 parent 4c9760b commit 83fa64b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 12 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@ ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons whe

## Roadmap

### Bigtable

- ✅ UTF8 string
- [ ] 64-bit big-endian signed integer

### SQL
- ✅ select by `"_row_key" =`
- ✅ select by `"_row_key" IN`
- ✅ select by `"_row_key" BETWEEN`
- [ ] select by composite row keys (via `table_partition_cols` and `table_partition_separator`)
- [ ] Projection pushdown
- ✅ Projection pushdown
- [ ] Predicate pushdown ([Value range](https://cloud.google.com/bigtable/docs/using-filters#value-range))
- [ ] Limit pushdown

Expand Down
10 changes: 10 additions & 0 deletions src/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ mod tests {
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT * FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
expected = vec![
"+-------------------------------+-------------------------+----------+",
"| _row_key | _timestamp | pressure |",
"+-------------------------------+-------------------------+----------+",
"| us-west2#3698#2021-03-05-1200 | 2021-03-05 12:00:05.100 | 94558 |",
"+-------------------------------+-------------------------+----------+",
];
assert_batches_eq!(expected, &batches);

batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" IN ('us-west2#3698#2021-03-05-1200', 'us-west2#3698#2021-03-05-1201') ORDER BY \"_row_key\"").await?.collect().await?;
expected = vec![
"+-------------------------------+----------+-------------------------+",
Expand Down
39 changes: 28 additions & 11 deletions src/datasource/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,38 @@ use datafusion::scalar::ScalarValue;

pub fn compose(
datasource: BigtableDataSource,
_projection: &Option<Vec<usize>>,
projection: &Option<Vec<usize>>,
filters: &[Expr],
) -> Result<(Vec<RowRange>, Vec<RowFilter>)> {
let mut row_ranges = vec![];
let mut row_filters = if datasource.only_read_latest {
vec![RowFilter {
filter: Some(Filter::CellsPerColumnLimitFilter(1)),
}]
} else {
vec![]
};
row_filters.push(RowFilter {
let mut row_filters = vec![RowFilter {
filter: Some(Filter::FamilyNameRegexFilter(datasource.column_family)),
});
}];
if datasource.only_read_latest {
row_filters.push(RowFilter {
filter: Some(Filter::CellsPerColumnLimitFilter(1)),
});
}

let mut qualifiers: Vec<String> = vec![];
let fields = datasource.schema.fields();
match projection {
Some(positions) => {
for &position in positions {
let field = fields[position].clone();
if !datasource.table_partition_cols.contains(&field.name()) {
qualifiers.push(field.name().clone())
}
}
row_filters.push(RowFilter {
filter: Some(Filter::ColumnQualifierRegexFilter(
qualifiers.join("|").into_bytes(),
)),
});
}
_ => (),
}

let mut row_ranges = vec![];
for filter in filters {
match filter {
Expr::BinaryExpr { left, op, right } => match left.as_ref() {
Expand Down

0 comments on commit 83fa64b

Please sign in to comment.