Skip to content

Commit

Permalink
wip sql: clean up execution module
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 17, 2024
1 parent fb3a6e7 commit 3bfa005
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 278 deletions.
114 changes: 70 additions & 44 deletions src/sql/execution/execute.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,30 @@
use super::aggregation::Aggregation;
use super::join::{HashJoin, NestedLoopJoin};
use super::mutation::{Delete, Insert, Update};
use super::query::{Filter, Limit, Offset, Order, Projection};
use super::schema::{CreateTable, DropTable};
use super::source::{IndexLookup, KeyLookup, Nothing, Scan};
use super::source::{self, IndexLookup, KeyLookup};
use super::transform;
use crate::error::Result;
use crate::sql::engine::Transaction;
use crate::sql::plan::{Node, Plan};
use crate::sql::types::{Columns, Row, Rows};

/// A plan execution result.
pub enum ExecutionResult {
CreateTable { name: String },
DropTable { name: String, existed: bool },
Delete { count: u64 },
Insert { count: u64 },
Update { count: u64 },
Select { iter: QueryIterator },
}

/// A query result iterator, containins the columns and row iterator.
pub struct QueryIterator {
// TODO: use a different type here.
pub columns: Columns,
pub rows: Rows,
}

impl Iterator for QueryIterator {
type Item = Result<Row>;

fn next(&mut self) -> Option<Self::Item> {
self.rows.next()
}
}

/// Executes a plan, returning an execution result.
pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result<ExecutionResult> {
Ok(match plan {
Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? },

Plan::CreateTable { schema } => {
let name = schema.name.clone();
CreateTable::new(schema).execute(txn)?;
txn.create_table(schema)?;
ExecutionResult::CreateTable { name }
}

Plan::DropTable { table, if_exists } => {
let name = table.clone();
let existed = DropTable::new(table, if_exists).execute(txn)?;
ExecutionResult::DropTable { name, existed }
// TODO the planner should deal with this.
if if_exists && txn.get_table(&table)?.is_none() {
return Ok(ExecutionResult::DropTable { name, existed: false });
}
txn.drop_table(&table)?;
ExecutionResult::DropTable { name, existed: true }
}

Plan::Delete { table, source } => {
Expand All @@ -62,6 +38,8 @@ pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result<ExecutionR
ExecutionResult::Insert { count }
}

Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? },

Plan::Update { table, source, expressions } => {
let source = execute(source, txn)?;
let expressions = expressions.into_iter().map(|(i, _, expr)| (i, expr)).collect();
Expand All @@ -71,10 +49,7 @@ pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result<ExecutionR
})
}

/// Recursively executes a query plan node, returning a row iterator.
///
/// TODO: flatten the executor structs into functions where appropriate. Same
/// goes for all other execute functions.
/// Recursively executes a query plan node, returning a query iterator.
///
/// TODO: since iterators are lazy, make this infallible and move all catalog
/// lookups to planning.
Expand All @@ -87,7 +62,7 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result<QueryIterator>

Node::Filter { source, predicate } => {
let source = execute(*source, txn)?;
Ok(Filter::new(source, predicate).execute())
Ok(transform::filter(source, predicate))
}

Node::HashJoin { left, left_field, right, right_field, outer } => {
Expand All @@ -104,7 +79,7 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result<QueryIterator>

Node::Limit { source, limit } => {
let source = execute(*source, txn)?;
Ok(Limit::new(source, limit).execute())
Ok(transform::limit(source, limit))
}

Node::NestedLoopJoin { left, left_size: _, right, predicate, outer } => {
Expand All @@ -113,23 +88,74 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result<QueryIterator>
NestedLoopJoin::new(left, right, predicate, outer).execute()
}

Node::Nothing => Ok(Nothing.execute()),
Node::Nothing => Ok(source::nothing()),

Node::Offset { source, offset } => {
let source = execute(*source, txn)?;
Ok(Offset::new(source, offset).execute())
Ok(transform::offset(source, offset))
}

Node::Order { source, orders } => {
let source = execute(*source, txn)?;
Order::new(source, orders).execute()
Ok(transform::order(source, orders))
}

Node::Projection { source, expressions } => {
let source = execute(*source, txn)?;
Ok(Projection::new(source, expressions).execute())
Ok(transform::project(source, expressions))
}

Node::Scan { table, alias: _, filter } => Scan::new(table, filter).execute(txn),
Node::Scan { table, alias: _, filter } => source::scan(txn, &table, filter),
}
}

/// A plan execution result.
pub enum ExecutionResult {
CreateTable { name: String },
DropTable { name: String, existed: bool },
Delete { count: u64 },
Insert { count: u64 },
Update { count: u64 },
Select { iter: QueryIterator },
}

/// A query result iterator, containing the columns and row iterator.
pub struct QueryIterator {
// TODO: use a different type here.
pub columns: Columns,
pub rows: Rows,
}

impl Iterator for QueryIterator {
type Item = Result<Row>;

fn next(&mut self) -> Option<Self::Item> {
self.rows.next()
}
}

impl QueryIterator {
/// Replaces the rows iterator with the result of the closure.
pub(super) fn map_rows<F, I>(mut self, f: F) -> Self
where
I: Iterator<Item = Result<Row>> + 'static,
F: FnOnce(Rows) -> I,
{
self.rows = Box::new(f(self.rows));
self
}

/// Like map_rows, but if the closure errors the row iterator will yield a
/// single error item.
pub(super) fn try_map_rows<F, I>(mut self, f: F) -> Self
where
I: Iterator<Item = Result<Row>> + 'static,
F: FnOnce(Rows) -> Result<I>,
{
self.rows = match f(self.rows) {
Ok(rows) => Box::new(rows),
Err(e) => Box::new(std::iter::once(Err(e))),
};
self
}
}
3 changes: 1 addition & 2 deletions src/sql/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ mod aggregation;
mod execute;
mod join;
mod mutation;
mod query;
mod schema;
mod source;
mod transform;

pub use execute::{execute_plan, ExecutionResult, QueryIterator};
157 changes: 0 additions & 157 deletions src/sql/execution/query.rs

This file was deleted.

39 changes: 0 additions & 39 deletions src/sql/execution/schema.rs

This file was deleted.

0 comments on commit 3bfa005

Please sign in to comment.