Skip to content

Commit

Permalink
sql: use simple functions for execution module
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 18, 2024
1 parent fb3a6e7 commit 9e6ace8
Show file tree
Hide file tree
Showing 12 changed files with 488 additions and 603 deletions.
73 changes: 32 additions & 41 deletions src/sql/execution/aggregation.rs → src/sql/execution/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,39 @@ use crate::sql::types::{Column, Value};
use std::cmp::Ordering;
use std::collections::HashMap;

/// An aggregation executor
pub struct Aggregation {
source: QueryIterator,
/// Aggregates rows (i.e. GROUP BY).
///
/// TODO: revisit this and clean it up.
pub(super) fn aggregate(
mut source: QueryIterator,
aggregates: Vec<Aggregate>,
accumulators: HashMap<Vec<Value>, Vec<Box<dyn Accumulator>>>,
}

impl Aggregation {
pub fn new(source: QueryIterator, aggregates: Vec<Aggregate>) -> Self {
Self { source, aggregates, accumulators: HashMap::new() }
}

#[allow(clippy::or_fun_call)]
pub fn execute(mut self) -> Result<QueryIterator> {
let agg_count = self.aggregates.len();
while let Some(mut row) = self.source.next().transpose()? {
self.accumulators
.entry(row.split_off(self.aggregates.len()))
.or_insert(self.aggregates.iter().map(<dyn Accumulator>::from).collect())
.iter_mut()
.zip(row)
.try_for_each(|(acc, value)| acc.accumulate(&value))?
}
// If there were no rows and no group-by columns, return a row of empty accumulators:
// SELECT COUNT(*) FROM t WHERE FALSE
if self.accumulators.is_empty() && self.aggregates.len() == self.source.columns.len() {
self.accumulators
.insert(Vec::new(), self.aggregates.iter().map(<dyn Accumulator>::from).collect());
}
Ok(QueryIterator {
columns: self
.source
.columns
.into_iter()
.enumerate()
.map(|(i, c)| if i < agg_count { Column { name: None } } else { c })
.collect(),
rows: Box::new(self.accumulators.into_iter().map(|(bucket, accs)| {
Ok(accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect())
})),
})
}
) -> Result<QueryIterator> {
let mut accumulators: HashMap<Vec<Value>, Vec<Box<dyn Accumulator>>> = HashMap::new();
let agg_count = aggregates.len();
while let Some(mut row) = source.next().transpose()? {
accumulators
.entry(row.split_off(aggregates.len()))
.or_insert(aggregates.iter().map(<dyn Accumulator>::from).collect())
.iter_mut()
.zip(row)
.try_for_each(|(acc, value)| acc.accumulate(&value))?
}
// If there were no rows and no group-by columns, return a row of empty accumulators:
// SELECT COUNT(*) FROM t WHERE FALSE
if accumulators.is_empty() && aggregates.len() == source.columns.len() {
accumulators.insert(Vec::new(), aggregates.iter().map(<dyn Accumulator>::from).collect());
}
Ok(QueryIterator {
columns: source
.columns
.into_iter()
.enumerate()
.map(|(i, c)| if i < agg_count { Column { name: None } } else { c })
.collect(),
rows: Box::new(accumulators.into_iter().map(|(bucket, accs)| {
Ok(accs.into_iter().map(|acc| acc.aggregate()).chain(bucket).collect())
})),
})
}

// An accumulator
Expand Down
140 changes: 84 additions & 56 deletions src/sql/execution/execute.rs
Original file line number Diff line number Diff line change
@@ -1,135 +1,163 @@
use super::aggregation::Aggregation;
use super::join::{HashJoin, NestedLoopJoin};
use super::mutation::{Delete, Insert, Update};
use super::query::{Filter, Limit, Offset, Order, Projection};
use super::schema::{CreateTable, DropTable};
use super::source::{IndexLookup, KeyLookup, Nothing, Scan};
use super::aggregate;
use super::join;
use super::schema;
use super::source;
use super::transform;
use super::write;
use crate::error::Result;
use crate::sql::engine::Transaction;
use crate::sql::plan::{Node, Plan};
use crate::sql::types::{Columns, Row, Rows};

/// A plan execution result.
pub enum ExecutionResult {
CreateTable { name: String },
DropTable { name: String, existed: bool },
Delete { count: u64 },
Insert { count: u64 },
Update { count: u64 },
Select { iter: QueryIterator },
}

/// A query result iterator, containins the columns and row iterator.
pub struct QueryIterator {
// TODO: use a different type here.
pub columns: Columns,
pub rows: Rows,
}

impl Iterator for QueryIterator {
type Item = Result<Row>;

fn next(&mut self) -> Option<Self::Item> {
self.rows.next()
}
}

/// Executes a plan, returning an execution result.
pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result<ExecutionResult> {
Ok(match plan {
Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? },

Plan::CreateTable { schema } => {
let name = schema.name.clone();
CreateTable::new(schema).execute(txn)?;
schema::create_table(txn, schema)?;
ExecutionResult::CreateTable { name }
}

Plan::DropTable { table, if_exists } => {
let name = table.clone();
let existed = DropTable::new(table, if_exists).execute(txn)?;
ExecutionResult::DropTable { name, existed }
let existed = schema::drop_table(txn, &table, if_exists)?;
ExecutionResult::DropTable { name: table, existed }
}

Plan::Delete { table, source } => {
let source = execute(source, txn)?;
let count = Delete::new(table, source).execute(txn)?;
let count = write::delete(txn, &table, source)?;
ExecutionResult::Delete { count }
}

Plan::Insert { table, columns, expressions } => {
let count = Insert::new(table, columns, expressions).execute(txn)?;
let count = write::insert(txn, &table, columns, expressions)?;
ExecutionResult::Insert { count }
}

Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? },

Plan::Update { table, source, expressions } => {
let source = execute(source, txn)?;
let expressions = expressions.into_iter().map(|(i, _, expr)| (i, expr)).collect();
let count = Update::new(table, source, expressions).execute(txn)?;
let count = write::update(txn, &table, source, expressions)?;
ExecutionResult::Update { count }
}
})
}

/// Recursively executes a query plan node, returning a row iterator.
///
/// TODO: flatten the executor structs into functions where appropriate. Same
/// goes for all other execute functions.
/// Recursively executes a query plan node, returning a query iterator.
///
/// TODO: since iterators are lazy, make this infallible and move all catalog
/// lookups to planning.
pub fn execute(node: Node, txn: &mut impl Transaction) -> Result<QueryIterator> {
match node {
Node::Aggregation { source, aggregates } => {
let source = execute(*source, txn)?;
Aggregation::new(source, aggregates).execute()
aggregate::aggregate(source, aggregates)
}

Node::Filter { source, predicate } => {
let source = execute(*source, txn)?;
Ok(Filter::new(source, predicate).execute())
Ok(transform::filter(source, predicate))
}

Node::HashJoin { left, left_field, right, right_field, outer } => {
let left = execute(*left, txn)?;
let right = execute(*right, txn)?;
HashJoin::new(left, left_field.0, right, right_field.0, outer).execute()
join::hash(left, left_field.0, right, right_field.0, outer)
}

Node::IndexLookup { table, alias: _, column, values } => {
IndexLookup::new(table, column, values).execute(txn)
source::lookup_index(txn, &table, &column, values)
}

Node::KeyLookup { table, alias: _, keys } => KeyLookup::new(table, keys).execute(txn),
Node::KeyLookup { table, alias: _, keys } => source::lookup_key(txn, &table, keys),

Node::Limit { source, limit } => {
let source = execute(*source, txn)?;
Ok(Limit::new(source, limit).execute())
Ok(transform::limit(source, limit))
}

Node::NestedLoopJoin { left, left_size: _, right, predicate, outer } => {
let left = execute(*left, txn)?;
let right = execute(*right, txn)?;
NestedLoopJoin::new(left, right, predicate, outer).execute()
join::nested_loop(left, right, predicate, outer)
}

Node::Nothing => Ok(Nothing.execute()),
Node::Nothing => Ok(source::nothing()),

Node::Offset { source, offset } => {
let source = execute(*source, txn)?;
Ok(Offset::new(source, offset).execute())
Ok(transform::offset(source, offset))
}

Node::Order { source, orders } => {
let source = execute(*source, txn)?;
Order::new(source, orders).execute()
Ok(transform::order(source, orders))
}

Node::Projection { source, expressions } => {
let source = execute(*source, txn)?;
Ok(Projection::new(source, expressions).execute())
Ok(transform::project(source, expressions))
}

Node::Scan { table, alias: _, filter } => Scan::new(table, filter).execute(txn),
Node::Scan { table, alias: _, filter } => source::scan(txn, &table, filter),
}
}

/// A plan execution result.
pub enum ExecutionResult {
CreateTable { name: String },
DropTable { name: String, existed: bool },
Delete { count: u64 },
Insert { count: u64 },
Update { count: u64 },
Select { iter: QueryIterator },
}

/// A query result iterator, containing the columns and row iterator.
pub struct QueryIterator {
// TODO: use a different type here.
pub columns: Columns,
pub rows: Rows,
}

impl Iterator for QueryIterator {
type Item = Result<Row>;

fn next(&mut self) -> Option<Self::Item> {
self.rows.next()
}
}

impl QueryIterator {
/// Replaces the columns with the result of the closure.
pub fn map_columns(mut self, f: impl FnOnce(Columns) -> Columns) -> Self {
self.columns = f(self.columns);
self
}

/// Replaces the rows iterator with the result of the closure.
pub fn map_rows<F, I>(mut self, f: F) -> Self
where
I: Iterator<Item = Result<Row>> + 'static,
F: FnOnce(Rows) -> I,
{
self.rows = Box::new(f(self.rows));
self
}

/// Like map_rows, but if the closure errors the row iterator will yield a
/// single error item.
pub fn try_map_rows<F, I>(mut self, f: F) -> Self
where
I: Iterator<Item = Result<Row>> + 'static,
F: FnOnce(Rows) -> Result<I>,
{
self.rows = match f(self.rows) {
Ok(rows) => Box::new(rows),
Err(e) => Box::new(std::iter::once(Err(e))),
};
self
}
}
Loading

0 comments on commit 9e6ace8

Please sign in to comment.