diff --git a/src/datasource.rs b/src/datasource.rs index df26f83..4e09def 100644 --- a/src/datasource.rs +++ b/src/datasource.rs @@ -13,9 +13,9 @@ use datafusion::error::{DataFusionError, Result}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::logical_plan::Expr; use datafusion::physical_plan::expressions::PhysicalSortExpr; -use datafusion::physical_plan::file_format::DEFAULT_PARTITION_COLUMN_DATATYPE; use datafusion::physical_plan::memory::MemoryStream; use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::project_schema; use datafusion::physical_plan::DisplayFormatType; use datafusion::physical_plan::Distribution; use datafusion::physical_plan::ExecutionPlan; @@ -23,7 +23,8 @@ use datafusion::physical_plan::Partitioning; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::Statistics; -use arrow::array::Int64Array; +// use arrow::array::Int64Array; +use arrow::array::ArrayRef; use arrow::array::StringArray; use arrow::array::TimestampMicrosecondArray; use arrow::record_batch::RecordBatch; @@ -36,7 +37,7 @@ use bigtable_rs::google::bigtable::v2::RowFilter; use bigtable_rs::google::bigtable::v2::RowRange; use bigtable_rs::google::bigtable::v2::RowSet; -use byteorder::{BigEndian, ByteOrder}; +// use byteorder::{BigEndian, ByteOrder}; mod composer; @@ -87,19 +88,18 @@ impl BigtableDataSource { columns: Vec, only_read_latest: bool, ) -> Result { - let mut table_fields = columns.clone(); + let mut table_fields: Vec = vec![]; for part in &table_partition_cols { - table_fields.push(Field::new( - part, - DEFAULT_PARTITION_COLUMN_DATATYPE.clone(), - false, - )); + table_fields.push(Field::new(part, DataType::Utf8, false)); } table_fields.push(Field::new( RESERVED_TIMESTAMP, - DataType::Timestamp(TimeUnit::Second, None), + DataType::Timestamp(TimeUnit::Microsecond, None), false, )); + for column in &columns { + table_fields.push(column.clone()); + } match BigTableConnection::new( &project, @@ -116,7 +116,7 @@ impl BigtableDataSource { table: table, column_family: column_family, table_partition_cols: table_partition_cols, - table_partition_separator: DEFAULT_SEPARATOR.to_string(), + table_partition_separator: DEFAULT_SEPARATOR.to_owned(), only_read_latest: only_read_latest, schema: Arc::new(Schema::new(table_fields)), connection: connection, @@ -124,6 +124,19 @@ impl BigtableDataSource { Err(err) => Err(DataFusionError::External(Box::new(err))), } } + + fn is_qualifier(&self, field_name: &String) -> bool { + if field_name == RESERVED_ROWKEY { + return false; + } + if field_name == RESERVED_TIMESTAMP { + return false; + } + if self.table_partition_cols.contains(field_name) { + return false; + } + return true; + } } #[async_trait] @@ -156,6 +169,7 @@ impl TableProvider for BigtableDataSource { let (row_ranges, row_filters) = composer::compose(self.clone(), projection, filters); Ok(Arc::new(BigtableExec::new( self.clone(), + projection, row_ranges, row_filters, ))) @@ -171,6 +185,7 @@ impl TableProvider for BigtableDataSource { #[derive(Debug, Clone)] struct BigtableExec { datasource: BigtableDataSource, + projected_schema: SchemaRef, row_ranges: Vec, row_filters: Vec, } @@ -178,11 +193,14 @@ struct BigtableExec { impl BigtableExec { fn new( datasource: BigtableDataSource, + projection: &Option>, row_ranges: Vec, row_filters: Vec, ) -> Self { + let projected_schema = project_schema(&datasource.schema, projection.as_ref()).unwrap(); Self { datasource, + projected_schema, row_ranges, row_filters, } @@ -199,7 +217,7 @@ impl ExecutionPlan for BigtableExec { /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { - self.datasource.schema.clone() + self.projected_schema.clone() } /// Specifies the output partitioning scheme of this plan @@ -240,8 +258,7 @@ impl ExecutionPlan for BigtableExec { ) -> Result { let mut bigtable = self.datasource.connection.client(); let request = ReadRowsRequest { - app_profile_id: "default".to_owned(), - table_name: self.datasource.table.to_string(), + table_name: bigtable.get_full_table_name(&self.datasource.table), rows: Some(RowSet { row_keys: vec![], row_ranges: self.row_ranges.clone(), @@ -259,7 +276,7 @@ impl ExecutionPlan for BigtableExec { Ok(resp) => { let mut rowkey_timestamp_qualifier_value: HashMap< String, - HashMap>, + HashMap>, > = HashMap::new(); resp.into_iter().for_each(|(key, data)| { let row_key = String::from_utf8(key.clone()).unwrap(); @@ -275,7 +292,8 @@ impl ExecutionPlan for BigtableExec { .or_insert(HashMap::new()); let qualifier = String::from_utf8(row_cell.qualifier).unwrap(); - let cell_value = BigEndian::read_i64(&row_cell.value); + // let cell_value = BigEndian::read_i64(&row_cell.value); + let cell_value = String::from_utf8(row_cell.value).unwrap(); rowkey_timestamp_qualifier_value .get_mut(&row_key) .unwrap() @@ -286,40 +304,59 @@ impl ExecutionPlan for BigtableExec { }) }); - let mut field_array = vec![ - Field::new(RESERVED_ROWKEY, DataType::Utf8, false), - Field::new(RESERVED_TIMESTAMP, DataType::Utf8, false), - ]; - field_array.push(Field::new("pressure", DataType::Int64, false)); - - let mut row_key_array = vec![]; - let mut timestamp_array = vec![]; - let mut pressure_array = vec![]; + let mut row_keys = vec![]; + let mut timestamps = vec![]; + let mut qualifier_values: HashMap> = HashMap::new(); + for field in self.projected_schema.fields() { + if self.datasource.is_qualifier(field.name()) { + qualifier_values.insert(field.name().clone(), vec![]); + } + } for (row_key, timestamp_qualifier_value) in rowkey_timestamp_qualifier_value.iter() { for (timestamp, qualifier_value) in timestamp_qualifier_value.iter() { - row_key_array.push(row_key.to_owned()); - timestamp_array.push(timestamp.to_owned()); - pressure_array.push( - qualifier_value - .get(&"pressure".to_owned()) - .unwrap() - .to_owned(), - ); + row_keys.push(row_key.to_owned()); + timestamps.push(timestamp.to_owned()); + + for field in self.projected_schema.fields() { + if self.datasource.is_qualifier(field.name()) { + let qualifier = field.name(); + match qualifier_value.get(qualifier) { + Some(cell_value) => { + qualifier_values + .get_mut(qualifier) + .unwrap() + .push(cell_value.to_owned()); + } + _ => { + qualifier_values + .get_mut(qualifier) + .unwrap() + .push("".to_owned()); + } + } + } + } + } + } + + let mut data: Vec = vec![ + Arc::new(StringArray::from(row_keys)), + Arc::new(TimestampMicrosecondArray::from(timestamps)), + ]; + for field in self.projected_schema.fields() { + if self.datasource.is_qualifier(field.name()) { + let qualifier = field.name(); + data.push(Arc::new(StringArray::from( + qualifier_values.get(qualifier).unwrap().clone(), + ))); } } Ok(Box::pin(MemoryStream::try_new( - vec![RecordBatch::try_new( - Arc::new(Schema::new(field_array)), - vec![ - Arc::new(StringArray::from(row_key_array)), - Arc::new(TimestampMicrosecondArray::from(timestamp_array)), - Arc::new(Int64Array::from(pressure_array)), - ], - )?], - self.datasource.schema(), + vec![RecordBatch::try_new(self.schema(), data)?], + self.schema(), None, )?)) } @@ -372,12 +409,13 @@ mod tests { #[tokio::test] async fn test_sql_query() -> Result<()> { let bigtable_datasource = BigtableDataSource::new( - "emulator".to_string(), - "dev".to_string(), - "weather_balloons".to_string(), - "measurements".to_string(), - vec![RESERVED_ROWKEY.to_string()], - vec![Field::new("pressure", DataType::UInt64, false)], + "emulator".to_owned(), + "dev".to_owned(), + "weather_balloons".to_owned(), + "measurements".to_owned(), + vec![RESERVED_ROWKEY.to_owned()], + // vec![Field::new("pressure", DataType::Int64, false)], + vec![Field::new("pressure", DataType::Utf8, false)], true, ) .await @@ -387,9 +425,11 @@ mod tests { .unwrap(); let batches = ctx.sql("SELECT \"_row_key\", pressure, \"_timestamp\" FROM weather_balloons where \"_row_key\" = 'us-west2#3698#2021-03-05-1200'").await?.collect().await?; let expected = vec![ - "+----------+----------+------------+", - "| _row_key | pressure | _timestamp |", - "+----------+----------+------------+", + "+-------------------------------+----------+-------------------------+", + "| _row_key | pressure | _timestamp |", + "+-------------------------------+----------+-------------------------+", + "| us-west2#3698#2021-03-05-1200 | 94558 | 2022-03-07 07:15:17.700 |", + "+-------------------------------+----------+-------------------------+", ]; assert_batches_eq!(expected, &batches); Ok(())