
feat: query engine impl on datafusion #10

Merged 12 commits on Apr 26, 2022
1,464 changes: 1,374 additions & 90 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -1,6 +1,8 @@
[workspace]
members = [
"src/common/base",
"src/common/query",
"src/common/recordbatch",
"src/datanode",
"src/datatypes",
"src/log-store",
7 changes: 7 additions & 0 deletions src/common/query/Cargo.toml
@@ -0,0 +1,7 @@
[package]
name = "common-query"
version = "0.1.0"
edition = "2021"

[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
1 change: 1 addition & 0 deletions src/common/query/src/lib.rs
@@ -0,0 +1 @@
pub mod logical_plan;
18 changes: 18 additions & 0 deletions src/common/query/src/logical_plan/expr.rs
@@ -0,0 +1,18 @@
use datafusion::logical_plan::Expr as DfExpr;

/// The central struct of the query API.
/// Represents logical expressions such as `A + 1` or `CAST(c1 AS int)`.
#[derive(Clone, PartialEq, Hash)]
pub struct Expr {
df_expr: DfExpr,
}

impl Expr {
pub fn new(df_expr: DfExpr) -> Self {
Self { df_expr }
}

pub fn df_expr(&self) -> &DfExpr {
&self.df_expr
}
}
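
For orientation, a hedged usage sketch (not part of this diff): a DataFusion expression built with the `col` and `lit` helpers is hidden behind the new wrapper. The helpers and the `+` operator on DataFusion's `Expr` are assumed to be available on the arrow2 branch pinned above.

use common_query::logical_plan::Expr;
use datafusion::logical_plan::{col, lit};

/// Builds `c1 + 1` with DataFusion's expression builders, then hides the
/// engine-specific type behind the common-query wrapper.
fn wrapped_expr() -> Expr {
    Expr::new(col("c1") + lit(1))
}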
3 changes: 3 additions & 0 deletions src/common/query/src/logical_plan/mod.rs
@@ -0,0 +1,3 @@
mod expr;

pub use self::expr::Expr;
16 changes: 16 additions & 0 deletions src/common/recordbatch/Cargo.toml
@@ -0,0 +1,16 @@
[package]
name = "common-recordbatch"
version = "0.1.0"
edition = "2021"

[dependencies.arrow]
package = "arrow2"
version="0.10"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]

[dependencies]
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2" }
datatypes = { path = "../../datatypes" }
futures = "0.3"
snafu = "0.7.0"
10 changes: 10 additions & 0 deletions src/common/recordbatch/src/error.rs
@@ -0,0 +1,10 @@
use arrow::error::ArrowError;
use snafu::Snafu;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Arrow error: {}", source))]
Arrow { source: ArrowError },
}
pub type Result<T> = std::result::Result<T, Error>;
15 changes: 15 additions & 0 deletions src/common/recordbatch/src/lib.rs
@@ -0,0 +1,15 @@
pub mod error;
mod recordbatch;

use std::pin::Pin;

use datatypes::schema::SchemaRef;
use error::Result;
use futures::Stream;
pub use recordbatch::RecordBatch;

pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}

pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
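
A minimal sketch (not in the diff) of a stream type satisfying `RecordBatchStream`, backed by an in-memory buffer; it only uses items defined in this crate plus `futures`.

use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};

use common_recordbatch::error::Result;
use common_recordbatch::{RecordBatch, RecordBatchStream};
use datatypes::schema::SchemaRef;
use futures::Stream;

/// Illustrative only: yields buffered record batches in order.
struct MemoryStream {
    schema: SchemaRef,
    batches: VecDeque<RecordBatch>,
}

impl Stream for MemoryStream {
    type Item = Result<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The struct holds no self-references, so it is Unpin and can be
        // mutated through `get_mut`; return None once the buffer is drained.
        Poll::Ready(self.get_mut().batches.pop_front().map(Ok))
    }
}

impl RecordBatchStream for MemoryStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}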
10 changes: 10 additions & 0 deletions src/common/recordbatch/src/recordbatch.rs
@@ -0,0 +1,10 @@
use std::sync::Arc;

use datafusion_common::record_batch::RecordBatch as DfRecordBatch;
use datatypes::schema::Schema;

#[derive(Clone, Debug)]
pub struct RecordBatch {
pub schema: Arc<Schema>,
pub df_recordbatch: DfRecordBatch,
}
3 changes: 1 addition & 2 deletions src/datatypes/Cargo.toml
@@ -3,9 +3,8 @@ name = "datatypes"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow2 = "0.10"
common-base = { path = "../common/base" }
paste = "1.0"
serde = { version = "1.0.136", features = ["derive"] }
2 changes: 1 addition & 1 deletion src/datatypes/src/lib.rs
@@ -3,7 +3,6 @@
mod data_type;
pub mod prelude;
mod scalars;
mod schema;
pub mod type_id;
mod types;
pub mod value;
@@ -13,3 +12,4 @@ use arrow2::array::{BinaryArray, MutableBinaryArray};

pub type LargeBinaryArray = BinaryArray<i64>;
pub type MutableLargeBinaryArray = MutableBinaryArray<i64>;
pub mod schema;
24 changes: 24 additions & 0 deletions src/datatypes/src/schema.rs
@@ -1 +1,25 @@
use std::sync::Arc;

use arrow2::datatypes::Schema as ArrowSchema;

#[derive(Debug, Clone)]
pub struct Schema {
arrow_schema: Arc<ArrowSchema>,
}

impl Schema {
pub fn new(arrow_schema: Arc<ArrowSchema>) -> Self {
Self { arrow_schema }
}
pub fn arrow_schema(&self) -> &Arc<ArrowSchema> {
&self.arrow_schema
}
}

pub type SchemaRef = Arc<Schema>;

impl From<Arc<ArrowSchema>> for Schema {
fn from(s: Arc<ArrowSchema>) -> Schema {
Schema::new(s)
}
}
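
For context, a small sketch (not in the diff) of the intended conversion path: an Arc'd arrow2 schema flows into the wrapper via the `From` impl above, and `SchemaRef` is simply an Arc over the wrapper.

use std::sync::Arc;

use arrow2::datatypes::Schema as ArrowSchema;
use datatypes::schema::{Schema, SchemaRef};

/// Turns a shared arrow2 schema into the SchemaRef alias used across crates.
fn to_schema_ref(arrow_schema: Arc<ArrowSchema>) -> SchemaRef {
    Arc::new(Schema::from(arrow_schema))
}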
8 changes: 8 additions & 0 deletions src/query/Cargo.toml
@@ -6,3 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1"
common-recordbatch = { path = "../common/recordbatch" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", branch = "arrow2", features = ["simd"] }
datatypes = { path = "../datatypes" }
futures = "0.3"
snafu = "0.7.0"
table = { path = "../table" }
tokio = "1.0"
39 changes: 39 additions & 0 deletions src/query/src/catalog.rs
@@ -0,0 +1,39 @@
pub mod schema;
use std::any::Any;
use std::sync::Arc;

use crate::catalog::schema::SchemaProvider;

/// Represents a list of named catalogs.
pub trait CatalogList: Sync + Send {
/// Returns the catalog list as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Adds a new catalog to this catalog list
/// If a catalog of the same name existed before, it is replaced in the list and returned.
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>>;

/// Retrieves the list of available catalog names
fn catalog_names(&self) -> Vec<String>;

/// Retrieves a specific catalog by name, provided it exists.
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}

/// Represents a catalog, comprising a number of named schemas.
pub trait CatalogProvider: Sync + Send {
/// Returns the catalog provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available schema names in this catalog.
fn schema_names(&self) -> Vec<String>;

/// Retrieves a specific schema from the catalog by name, provided it exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
}
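
A hypothetical in-memory `CatalogList`, sketched only to show how the trait is meant to be implemented; it is not part of this PR, and the `query::catalog` path is assumed from the file layout.

use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use query::catalog::{CatalogList, CatalogProvider};

#[derive(Default)]
struct MemoryCatalogList {
    catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
}

impl CatalogList for MemoryCatalogList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        // Replace any existing catalog of the same name and hand it back.
        self.catalogs.write().unwrap().insert(name, catalog)
    }

    fn catalog_names(&self) -> Vec<String> {
        self.catalogs.read().unwrap().keys().cloned().collect()
    }

    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.read().unwrap().get(name).cloned()
    }
}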
35 changes: 35 additions & 0 deletions src/query/src/catalog/schema.rs
@@ -0,0 +1,35 @@
use std::any::Any;

use table::TableRef;

use crate::error::Result;

/// Represents a schema, comprising a number of named tables.
pub trait SchemaProvider: Sync + Send {
/// Returns the schema provider as [`Any`](std::any::Any)
/// so that it can be downcast to a specific implementation.
fn as_any(&self) -> &dyn Any;

/// Retrieves the list of available table names in this schema.
fn table_names(&self) -> Vec<String>;

/// Retrieves a specific table from the schema by name, provided it exists.
fn table(&self, name: &str) -> Option<TableRef>;

/// If supported by the implementation, adds a new table to this schema.
/// If a table of the same name already exists, a "Table already exists" error is returned.
fn register_table(&self, _name: String, _table: TableRef) -> Result<Option<TableRef>> {
todo!();
}

/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, _name: &str) -> Result<Option<TableRef>> {
todo!();
}

/// If supported by the implementation, checks whether a table with the given name
/// exists in this schema provider.
/// Returns true if the table exists, false otherwise.
fn table_exist(&self, name: &str) -> bool;
}
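
A hedged sketch of the expected lookup path through the provider traits, catalog to schema to table, all by name; the crate path is assumed and the function is illustrative only.

use std::sync::Arc;

use query::catalog::CatalogProvider;
use table::TableRef;

/// Resolves a schema by name inside a catalog, then a table by name inside
/// that schema; returns None if either level is missing.
fn find_table(
    catalog: &Arc<dyn CatalogProvider>,
    schema_name: &str,
    table_name: &str,
) -> Option<TableRef> {
    catalog.schema(schema_name)?.table(table_name)
}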
1 change: 1 addition & 0 deletions src/query/src/database.rs
@@ -0,0 +1 @@

20 changes: 20 additions & 0 deletions src/query/src/error.rs
@@ -0,0 +1,20 @@
use datafusion::error::DataFusionError;
use snafu::Snafu;

/// Business errors of the query engine.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Datafusion query engine error: {}", source))]
Datafusion { source: DataFusionError },
#[snafu(display("PhysicalPlan downcast_ref failed"))]
PhysicalPlanDowncast,
}

pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for DataFusionError {
fn from(e: Error) -> DataFusionError {
DataFusionError::External(Box::new(e))
}
}
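
A tiny sketch (assumption, not from the diff) of how the error bridge is meant to be used: a query-side error is handed back to DataFusion through the `From` impl above, which wraps it as `DataFusionError::External`.

use datafusion::error::DataFusionError;

use query::error::Error; // crate path assumed from the file layout

/// Converts a query-engine error into a DataFusion error at the boundary.
fn bubble_up() -> Result<(), DataFusionError> {
    Err(Error::PhysicalPlanDowncast.into())
}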
8 changes: 8 additions & 0 deletions src/query/src/executor.rs
@@ -1 +1,9 @@
use std::sync::Arc;

use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};

/// Executor that runs a [`PhysicalPlan`].
#[async_trait::async_trait]
pub trait QueryExecutor {
async fn execute_stream(&self, ctx: &QueryContext, plan: &Arc<dyn PhysicalPlan>) -> Result<()>;
}
5 changes: 5 additions & 0 deletions src/query/src/lib.rs
@@ -1,4 +1,9 @@
pub mod catalog;
pub mod database;
pub mod error;
pub mod executor;
pub mod logical_optimizer;
pub mod physical_optimizer;
pub mod physical_planner;
mod plan;
pub mod query_engine;
10 changes: 10 additions & 0 deletions src/query/src/logical_optimizer.rs
@@ -1 +1,11 @@
use crate::error::Result;
use crate::plan::LogicalPlan;
use crate::query_engine::QueryContext;

pub trait LogicalOptimizer {
fn optimize_logical_plan(
&self,
ctx: &mut QueryContext,
plan: &LogicalPlan,
) -> Result<LogicalPlan>;
}
10 changes: 10 additions & 0 deletions src/query/src/physical_optimizer.rs
@@ -1 +1,11 @@
use std::sync::Arc;

use crate::{error::Result, plan::PhysicalPlan, query_engine::QueryContext};

pub trait PhysicalOptimizer {
fn optimize_physical_plan(
&self,
ctx: &mut QueryContext,
plan: Arc<dyn PhysicalPlan>,
) -> Result<Arc<dyn PhysicalPlan>>;
}
16 changes: 16 additions & 0 deletions src/query/src/physical_planner.rs
@@ -1 +1,17 @@
use std::sync::Arc;

use crate::error::Result;
use crate::plan::{LogicalPlan, PhysicalPlan};
use crate::query_engine::QueryContext;

/// Physical query planner that converts a `LogicalPlan` to a
/// `PhysicalPlan` suitable for execution.
#[async_trait::async_trait]
pub trait PhysicalPlanner {
/// Create a physical plan from a logical plan
async fn create_physical_plan(
&self,
ctx: &mut QueryContext,
logical_plan: &LogicalPlan,
) -> Result<Arc<dyn PhysicalPlan>>;
}
44 changes: 44 additions & 0 deletions src/query/src/plan.rs
@@ -0,0 +1,44 @@
use std::any::Any;
use std::sync::Arc;

use common_recordbatch::SendableRecordBatchStream;
use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;
use datatypes::schema::SchemaRef;

use crate::error::Result;

/// A LogicalPlan represents the different types of relational
/// operators (such as Projection, Filter, etc) and can be created by
/// the SQL query planner.
///
/// A LogicalPlan represents transforming an input relation (table) to
/// an output relation (table) with a (potentially) different
/// schema. A plan represents a dataflow tree where data flows
/// from leaves up to the root to produce the query result.
#[derive(Clone)]
pub enum LogicalPlan {
DfPlan(DfLogicalPlan),
}

#[async_trait::async_trait]
pub trait PhysicalPlan: Send + Sync + Any {
/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef;

/// Get a list of child execution plans that provide the input for this plan. The returned list
/// will be empty for leaf nodes, will contain a single value for unary nodes, or two
/// values for binary nodes (such as joins).
fn children(&self) -> Vec<Arc<dyn PhysicalPlan>>;

/// Returns a new plan where all children were replaced by new plans.
/// The size of `children` must be equal to the size of `PhysicalPlan::children()`.
fn with_new_children(
&self,
children: Vec<Arc<dyn PhysicalPlan>>,
) -> Result<Arc<dyn PhysicalPlan>>;

/// Creates a stream of record batches for the given partition.
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;

fn as_any(&self) -> &dyn Any;
}
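
A minimal sketch, written as if it lived inside the query crate (the `plan` module is private in this PR), of how a DataFusion plan is expected to flow into the wrapper enum:

use datafusion::logical_plan::LogicalPlan as DfLogicalPlan;

use crate::plan::LogicalPlan;

/// Wraps a DataFusion logical plan so the rest of the crate can stay
/// agnostic of the underlying engine.
fn wrap(df_plan: DfLogicalPlan) -> LogicalPlan {
    LogicalPlan::DfPlan(df_plan)
}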