
feat: support simple "_row_key = xxx" query
jychen7 committed Mar 7, 2022
1 parent 27436aa commit 364a986
Showing 1 changed file with 91 additions and 51 deletions: src/datasource.rs
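In short, this commit makes the table provider honor a simple "_row_key" = 'xxx' equality predicate (composed into a Bigtable row range), project only the requested columns, and decode cell values as UTF-8 strings rather than big-endian i64. The sketch below shows the intended usage; it is not part of the commit, and it assumes the crate exposes BigtableDataSource at the path shown, a Bigtable emulator configured as in the test at the bottom of this diff, and the ExecutionContext API of the DataFusion version in use at the time.

// A minimal usage sketch, not part of the commit. Assumed: the import path
// for BigtableDataSource, a running Bigtable emulator with the project
// ("emulator") and instance ("dev") used by the test below, and the
// DataFusion-7-era ExecutionContext API.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::prelude::ExecutionContext;

use crate::datasource::BigtableDataSource; // illustrative path

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let datasource = BigtableDataSource::new(
        "emulator".to_owned(),         // project
        "dev".to_owned(),              // instance
        "weather_balloons".to_owned(), // table
        "measurements".to_owned(),     // column family
        vec!["_row_key".to_owned()],   // table partition columns
        vec![Field::new("pressure", DataType::Utf8, false)],
        true, // only_read_latest
    )
    .await?;

    let mut ctx = ExecutionContext::new();
    ctx.register_table("weather_balloons", Arc::new(datasource))?;

    // The newly supported predicate shape: a simple equality on "_row_key".
    let batches = ctx
        .sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons WHERE \"_row_key\" = 'us-west2#3698#2021-03-05-1200'")
        .await?
        .collect()
        .await?;
    println!("fetched {} record batch(es)", batches.len());
    Ok(())
}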
@@ -13,17 +13,18 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::file_format::DEFAULT_PARTITION_COLUMN_DATATYPE;
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::metrics::MetricsSet;
use datafusion::physical_plan::project_schema;
use datafusion::physical_plan::DisplayFormatType;
use datafusion::physical_plan::Distribution;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::physical_plan::Statistics;

use arrow::array::Int64Array;
// use arrow::array::Int64Array;
use arrow::array::ArrayRef;
use arrow::array::StringArray;
use arrow::array::TimestampMicrosecondArray;
use arrow::record_batch::RecordBatch;
@@ -36,7 +36,7 @@ use bigtable_rs::google::bigtable::v2::RowFilter;
use bigtable_rs::google::bigtable::v2::RowRange;
use bigtable_rs::google::bigtable::v2::RowSet;

use byteorder::{BigEndian, ByteOrder};
// use byteorder::{BigEndian, ByteOrder};

mod composer;

@@ -87,19 +88,18 @@ impl BigtableDataSource {
columns: Vec<Field>,
only_read_latest: bool,
) -> Result<Self> {
let mut table_fields = columns.clone();
let mut table_fields: Vec<Field> = vec![];
for part in &table_partition_cols {
table_fields.push(Field::new(
part,
DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
false,
));
table_fields.push(Field::new(part, DataType::Utf8, false));
}
table_fields.push(Field::new(
RESERVED_TIMESTAMP,
DataType::Timestamp(TimeUnit::Second, None),
DataType::Timestamp(TimeUnit::Microsecond, None),
false,
));
for column in &columns {
table_fields.push(column.clone());
}

match BigTableConnection::new(
&project,
@@ -116,14 +116,27 @@ impl BigtableDataSource {
table: table,
column_family: column_family,
table_partition_cols: table_partition_cols,
table_partition_separator: DEFAULT_SEPARATOR.to_string(),
table_partition_separator: DEFAULT_SEPARATOR.to_owned(),
only_read_latest: only_read_latest,
schema: Arc::new(Schema::new(table_fields)),
connection: connection,
}),
Err(err) => Err(DataFusionError::External(Box::new(err))),
}
}

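/// A projected field names a Bigtable qualifier unless it is the reserved
/// row key or timestamp column, or one of the table partition columns.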
fn is_qualifier(&self, field_name: &String) -> bool {
if field_name == RESERVED_ROWKEY {
return false;
}
if field_name == RESERVED_TIMESTAMP {
return false;
}
if self.table_partition_cols.contains(field_name) {
return false;
}
return true;
}
}

#[async_trait]
@@ -156,6 +169,7 @@ impl TableProvider for BigtableDataSource {
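// composer::compose (the composer module declared above) translates the
// pushed-down filters into Bigtable row ranges and row filters; this commit
// handles the simple "_row_key = xxx" equality case.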
let (row_ranges, row_filters) = composer::compose(self.clone(), projection, filters);
Ok(Arc::new(BigtableExec::new(
self.clone(),
projection,
row_ranges,
row_filters,
)))
@@ -171,18 +185,22 @@
#[derive(Debug, Clone)]
struct BigtableExec {
datasource: BigtableDataSource,
projected_schema: SchemaRef,
row_ranges: Vec<RowRange>,
row_filters: Vec<RowFilter>,
}

impl BigtableExec {
fn new(
datasource: BigtableDataSource,
projection: &Option<Vec<usize>>,
row_ranges: Vec<RowRange>,
row_filters: Vec<RowFilter>,
) -> Self {
let projected_schema = project_schema(&datasource.schema, projection.as_ref()).unwrap();
Self {
datasource,
projected_schema,
row_ranges,
row_filters,
}
@@ -199,7 +217,7 @@ impl ExecutionPlan for BigtableExec {

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
self.datasource.schema.clone()
self.projected_schema.clone()
}

/// Specifies the output partitioning scheme of this plan
@@ -240,8 +258,7 @@
) -> Result<SendableRecordBatchStream> {
let mut bigtable = self.datasource.connection.client();
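// Build the ReadRows request; get_full_table_name (from bigtable_rs) expands
// the short table name into the fully qualified
// projects/<project>/instances/<instance>/tables/<table> form.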
let request = ReadRowsRequest {
app_profile_id: "default".to_owned(),
table_name: self.datasource.table.to_string(),
table_name: bigtable.get_full_table_name(&self.datasource.table),
rows: Some(RowSet {
row_keys: vec![],
row_ranges: self.row_ranges.clone(),
@@ -259,7 +276,7 @@
Ok(resp) => {
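// Group the returned cells into nested maps: row key -> cell timestamp ->
// qualifier -> value, decoding both qualifiers and values as UTF-8 strings.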
let mut rowkey_timestamp_qualifier_value: HashMap<
String,
HashMap<i64, HashMap<String, i64>>,
HashMap<i64, HashMap<String, String>>,
> = HashMap::new();
resp.into_iter().for_each(|(key, data)| {
let row_key = String::from_utf8(key.clone()).unwrap();
@@ -275,7 +292,8 @@
.or_insert(HashMap::new());

let qualifier = String::from_utf8(row_cell.qualifier).unwrap();
let cell_value = BigEndian::read_i64(&row_cell.value);
// let cell_value = BigEndian::read_i64(&row_cell.value);
let cell_value = String::from_utf8(row_cell.value).unwrap();
rowkey_timestamp_qualifier_value
.get_mut(&row_key)
.unwrap()
@@ -286,40 +304,59 @@
})
});

let mut field_array = vec![
Field::new(RESERVED_ROWKEY, DataType::Utf8, false),
Field::new(RESERVED_TIMESTAMP, DataType::Utf8, false),
];
field_array.push(Field::new("pressure", DataType::Int64, false));

let mut row_key_array = vec![];
let mut timestamp_array = vec![];
let mut pressure_array = vec![];
let mut row_keys = vec![];
let mut timestamps = vec![];
let mut qualifier_values: HashMap<String, Vec<String>> = HashMap::new();
for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
qualifier_values.insert(field.name().clone(), vec![]);
}
}

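// Flatten the grouped cells into parallel column vectors; a qualifier with
// no cell for a given (row key, timestamp) is filled with an empty string.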
for (row_key, timestamp_qualifier_value) in rowkey_timestamp_qualifier_value.iter()
{
for (timestamp, qualifier_value) in timestamp_qualifier_value.iter() {
row_key_array.push(row_key.to_owned());
timestamp_array.push(timestamp.to_owned());
pressure_array.push(
qualifier_value
.get(&"pressure".to_owned())
.unwrap()
.to_owned(),
);
row_keys.push(row_key.to_owned());
timestamps.push(timestamp.to_owned());

for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
let qualifier = field.name();
match qualifier_value.get(qualifier) {
Some(cell_value) => {
qualifier_values
.get_mut(qualifier)
.unwrap()
.push(cell_value.to_owned());
}
_ => {
qualifier_values
.get_mut(qualifier)
.unwrap()
.push("".to_owned());
}
}
}
}
}
}

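// Assemble the Arrow arrays in projected-schema order: row keys and
// timestamps first, then one Utf8 array per projected qualifier.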
let mut data: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(row_keys)),
Arc::new(TimestampMicrosecondArray::from(timestamps)),
];
for field in self.projected_schema.fields() {
if self.datasource.is_qualifier(field.name()) {
let qualifier = field.name();
data.push(Arc::new(StringArray::from(
qualifier_values.get(qualifier).unwrap().clone(),
)));
}
}

Ok(Box::pin(MemoryStream::try_new(
vec![RecordBatch::try_new(
Arc::new(Schema::new(field_array)),
vec![
Arc::new(StringArray::from(row_key_array)),
Arc::new(TimestampMicrosecondArray::from(timestamp_array)),
Arc::new(Int64Array::from(pressure_array)),
],
)?],
self.datasource.schema(),
vec![RecordBatch::try_new(self.schema(), data)?],
self.schema(),
None,
)?))
}
@@ -372,12 +409,13 @@ mod tests {
#[tokio::test]
async fn test_sql_query() -> Result<()> {
let bigtable_datasource = BigtableDataSource::new(
"emulator".to_string(),
"dev".to_string(),
"weather_balloons".to_string(),
"measurements".to_string(),
vec![RESERVED_ROWKEY.to_string()],
vec![Field::new("pressure", DataType::UInt64, false)],
"emulator".to_owned(),
"dev".to_owned(),
"weather_balloons".to_owned(),
"measurements".to_owned(),
vec![RESERVED_ROWKEY.to_owned()],
// vec![Field::new("pressure", DataType::Int64, false)],
vec![Field::new("pressure", DataType::Utf8, false)],
true,
)
.await
Expand All @@ -387,9 +425,11 @@ mod tests {
.unwrap();
let batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?;
let expected = vec![
"+----------+----------+------------+",
"| _row_key | pressure | _timestamp |",
"+----------+----------+------------+",
"+-------------------------------+----------+-------------------------+",
"| _row_key | pressure | _timestamp |",
"+-------------------------------+----------+-------------------------+",
"| us-west2#3698#2021-03-05-1200 | 94558 | 2022-03-07 07:15:17.700 |",
"+-------------------------------+----------+-------------------------+",
];
assert_batches_eq!(expected, &batches);
Ok(())
