diff --git a/src/sql/execution/execute.rs b/src/sql/execution/execute.rs index 7015820d..44f4d83f 100644 --- a/src/sql/execution/execute.rs +++ b/src/sql/execution/execute.rs @@ -1,54 +1,30 @@ use super::aggregation::Aggregation; use super::join::{HashJoin, NestedLoopJoin}; use super::mutation::{Delete, Insert, Update}; -use super::query::{Filter, Limit, Offset, Order, Projection}; -use super::schema::{CreateTable, DropTable}; -use super::source::{IndexLookup, KeyLookup, Nothing, Scan}; +use super::source; +use super::transform; use crate::error::Result; use crate::sql::engine::Transaction; use crate::sql::plan::{Node, Plan}; use crate::sql::types::{Columns, Row, Rows}; -/// A plan execution result. -pub enum ExecutionResult { - CreateTable { name: String }, - DropTable { name: String, existed: bool }, - Delete { count: u64 }, - Insert { count: u64 }, - Update { count: u64 }, - Select { iter: QueryIterator }, -} - -/// A query result iterator, containins the columns and row iterator. -pub struct QueryIterator { - // TODO: use a different type here. - pub columns: Columns, - pub rows: Rows, -} - -impl Iterator for QueryIterator { - type Item = Result; - - fn next(&mut self) -> Option { - self.rows.next() - } -} - /// Executes a plan, returning an execution result. pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result { Ok(match plan { - Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? }, - Plan::CreateTable { schema } => { let name = schema.name.clone(); - CreateTable::new(schema).execute(txn)?; + txn.create_table(schema)?; ExecutionResult::CreateTable { name } } Plan::DropTable { table, if_exists } => { let name = table.clone(); - let existed = DropTable::new(table, if_exists).execute(txn)?; - ExecutionResult::DropTable { name, existed } + // TODO the planner should deal with this. + if if_exists && txn.get_table(&table)?.is_none() { + return Ok(ExecutionResult::DropTable { name, existed: false }); + } + txn.drop_table(&table)?; + ExecutionResult::DropTable { name, existed: true } } Plan::Delete { table, source } => { @@ -62,6 +38,8 @@ pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result ExecutionResult::Select { iter: execute(node, txn)? 
}, + Plan::Update { table, source, expressions } => { let source = execute(source, txn)?; let expressions = expressions.into_iter().map(|(i, _, expr)| (i, expr)).collect(); @@ -71,10 +49,7 @@ pub fn execute_plan(plan: Plan, txn: &mut impl Transaction) -> Result Result Node::Filter { source, predicate } => { let source = execute(*source, txn)?; - Ok(Filter::new(source, predicate).execute()) + Ok(transform::filter(source, predicate)) } Node::HashJoin { left, left_field, right, right_field, outer } => { @@ -97,14 +72,14 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result } Node::IndexLookup { table, alias: _, column, values } => { - IndexLookup::new(table, column, values).execute(txn) + source::lookup_index(txn, &table, &column, values) } - Node::KeyLookup { table, alias: _, keys } => KeyLookup::new(table, keys).execute(txn), + Node::KeyLookup { table, alias: _, keys } => source::lookup_key(txn, &table, keys), Node::Limit { source, limit } => { let source = execute(*source, txn)?; - Ok(Limit::new(source, limit).execute()) + Ok(transform::limit(source, limit)) } Node::NestedLoopJoin { left, left_size: _, right, predicate, outer } => { @@ -113,23 +88,80 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result NestedLoopJoin::new(left, right, predicate, outer).execute() } - Node::Nothing => Ok(Nothing.execute()), + Node::Nothing => Ok(source::nothing()), Node::Offset { source, offset } => { let source = execute(*source, txn)?; - Ok(Offset::new(source, offset).execute()) + Ok(transform::offset(source, offset)) } Node::Order { source, orders } => { let source = execute(*source, txn)?; - Order::new(source, orders).execute() + Ok(transform::order(source, orders)) } Node::Projection { source, expressions } => { let source = execute(*source, txn)?; - Ok(Projection::new(source, expressions).execute()) + Ok(transform::project(source, expressions)) } - Node::Scan { table, alias: _, filter } => Scan::new(table, filter).execute(txn), + Node::Scan { table, alias: _, filter } => source::scan(txn, &table, filter), + } +} + +/// A plan execution result. +pub enum ExecutionResult { + CreateTable { name: String }, + DropTable { name: String, existed: bool }, + Delete { count: u64 }, + Insert { count: u64 }, + Update { count: u64 }, + Select { iter: QueryIterator }, +} + +/// A query result iterator, containing the columns and row iterator. +pub struct QueryIterator { + // TODO: use a different type here. + pub columns: Columns, + pub rows: Rows, +} + +impl Iterator for QueryIterator { + type Item = Result; + + fn next(&mut self) -> Option { + self.rows.next() + } +} + +impl QueryIterator { + /// Replaces the columns with the result of the closure. + pub(super) fn map_columns(mut self, f: impl FnOnce(Columns) -> Columns) -> Self { + self.columns = f(self.columns); + self + } + + /// Replaces the rows iterator with the result of the closure. + pub(super) fn map_rows(mut self, f: F) -> Self + where + I: Iterator> + 'static, + F: FnOnce(Rows) -> I, + { + self.rows = Box::new(f(self.rows)); + self + } + + /// Like map_rows, but if the closure errors the row iterator will yield a + /// single error item. 
+ pub(super) fn try_map_rows(mut self, f: F) -> Self + where + I: Iterator> + 'static, + F: FnOnce(Rows) -> Result, + { + self.rows = match f(self.rows) { + Ok(rows) => Box::new(rows), + Err(e) => Box::new(std::iter::once(Err(e))), + }; + self } } diff --git a/src/sql/execution/mod.rs b/src/sql/execution/mod.rs index 0c0f4b66..6d1afce7 100644 --- a/src/sql/execution/mod.rs +++ b/src/sql/execution/mod.rs @@ -2,8 +2,7 @@ mod aggregation; mod execute; mod join; mod mutation; -mod query; -mod schema; mod source; +mod transform; pub use execute::{execute_plan, ExecutionResult, QueryIterator}; diff --git a/src/sql/execution/query.rs b/src/sql/execution/query.rs deleted file mode 100644 index ea66dacd..00000000 --- a/src/sql/execution/query.rs +++ /dev/null @@ -1,157 +0,0 @@ -use super::QueryIterator; -use crate::errinput; -use crate::error::Result; -use crate::sql::plan::Direction; -use crate::sql::types::{Column, Expression, Row, Value}; - -/// A filter executor -pub struct Filter { - source: QueryIterator, - predicate: Expression, -} - -impl Filter { - pub fn new(source: QueryIterator, predicate: Expression) -> Self { - Self { source, predicate } - } - - pub fn execute(self) -> QueryIterator { - QueryIterator { - columns: self.source.columns, - rows: Box::new(self.source.rows.filter_map(move |r| { - r.and_then(|row| match self.predicate.evaluate(Some(&row))? { - Value::Boolean(true) => Ok(Some(row)), - Value::Boolean(false) => Ok(None), - Value::Null => Ok(None), - value => errinput!("filter returned {value}, expected boolean",), - }) - .transpose() - })), - } - } -} - -/// A projection executor -pub struct Projection { - source: QueryIterator, - expressions: Vec<(Expression, Option)>, -} - -impl Projection { - pub fn new(source: QueryIterator, expressions: Vec<(Expression, Option)>) -> Self { - Self { source, expressions } - } - - pub fn execute(self) -> QueryIterator { - let (expressions, labels): (Vec, Vec>) = - self.expressions.into_iter().unzip(); - let columns = expressions - .iter() - .enumerate() - .map(|(i, e)| { - if let Some(Some(label)) = labels.get(i) { - Column { name: Some(label.clone()) } - } else if let Expression::Field(i, _) = e { - self.source.columns.get(*i).cloned().unwrap_or(Column { name: None }) - } else { - Column { name: None } - } - }) - .collect(); - let rows = Box::new(self.source.rows.map(move |r| { - r.and_then(|row| { - expressions.iter().map(|e| e.evaluate(Some(&row))).collect::>() - }) - })); - QueryIterator { columns, rows } - } -} - -/// An ORDER BY executor -pub struct Order { - source: QueryIterator, - order: Vec<(Expression, Direction)>, -} - -impl Order { - pub fn new(source: QueryIterator, order: Vec<(Expression, Direction)>) -> Self { - Self { source, order } - } - - pub fn execute(mut self) -> Result { - // FIXME Since we can't return errors from the sort_by closure, we have - // to pre-evaluate all values. This means that we can't short-circuit - // evaluation, and have to temporarily store evaluated values, which is - // bad for performance and memory usage respectively - struct Item { - row: Row, - values: Vec, - } - - let mut items = Vec::new(); - while let Some(row) = self.source.next().transpose()? 
{ - let mut values = Vec::new(); - for (expr, _) in self.order.iter() { - values.push(expr.evaluate(Some(&row))?); - } - items.push(Item { row, values }) - } - - let order = &self.order; - items.sort_by(|a, b| { - for (i, (_, order)) in order.iter().enumerate() { - let value_a = &a.values[i]; - let value_b = &b.values[i]; - match value_a.partial_cmp(value_b) { - Some(std::cmp::Ordering::Equal) => {} - Some(o) => return if *order == Direction::Ascending { o } else { o.reverse() }, - None => {} - } - } - std::cmp::Ordering::Equal - }); - - Ok(QueryIterator { - columns: self.source.columns, - rows: Box::new(items.into_iter().map(|i| Ok(i.row))), - }) - } -} - -/// A LIMIT executor -pub struct Limit { - source: QueryIterator, - limit: u64, -} - -impl Limit { - pub fn new(source: QueryIterator, limit: u64) -> Self { - Self { source, limit } - } - - pub fn execute(self) -> QueryIterator { - QueryIterator { - columns: self.source.columns, - rows: Box::new(self.source.rows.take(self.limit as usize)), - } - } -} - -/// An OFFSET executor -pub struct Offset { - source: QueryIterator, - offset: u64, -} - -impl Offset { - pub fn new(source: QueryIterator, offset: u64) -> Self { - Self { source, offset } - } - - pub fn execute(self) -> QueryIterator { - QueryIterator { - columns: self.source.columns, - rows: Box::new(self.source.rows.skip(self.offset as usize)), - } - } -} diff --git a/src/sql/execution/schema.rs b/src/sql/execution/schema.rs deleted file mode 100644 index 3f86c801..00000000 --- a/src/sql/execution/schema.rs +++ /dev/null @@ -1,39 +0,0 @@ -use crate::error::Result; -use crate::sql::engine::Transaction; -use crate::sql::types::schema::Table; - -/// A CREATE TABLE executor -pub struct CreateTable { - table: Table, -} - -impl CreateTable { - pub fn new(table: Table) -> Self { - Self { table } - } - - pub fn execute(self, txn: &mut impl Transaction) -> Result<()> { - txn.create_table(self.table) - } -} - -/// A DROP TABLE executor -pub struct DropTable { - table: String, - if_exists: bool, -} - -impl DropTable { - pub fn new(table: String, if_exists: bool) -> Self { - Self { table, if_exists } - } - - pub fn execute(self, txn: &mut impl Transaction) -> Result { - // TODO the planner should deal with this. - if self.if_exists && txn.get_table(&self.table)?.is_none() { - return Ok(false); - } - txn.drop_table(&self.table)?; - Ok(true) - } -} diff --git a/src/sql/execution/source.rs b/src/sql/execution/source.rs index 06db995e..adc79f54 100644 --- a/src/sql/execution/source.rs +++ b/src/sql/execution/source.rs @@ -3,94 +3,71 @@ use crate::error::Result; use crate::sql::engine::Transaction; use crate::sql::types::{Column, Expression, Row, Value}; -use std::collections::HashSet; - -/// A table scan executor -pub struct Scan { - table: String, +/// A table scan source. +pub(super) fn scan( + txn: &mut impl Transaction, + table: &str, filter: Option, +) -> Result { + // TODO: this should not be fallible. Pass the schema in the plan node. 
+ let table = txn.must_get_table(table)?; + Ok(QueryIterator { + columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(), + rows: Box::new(txn.scan(&table.name, filter)?), + }) } -impl Scan { - pub fn new(table: String, filter: Option) -> Self { - Self { table, filter } - } - - pub fn execute(self, txn: &mut impl Transaction) -> Result { - let table = txn.must_get_table(&self.table)?; - Ok(QueryIterator { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(txn.scan(&table.name, self.filter)?), - }) - } -} - -/// A primary key lookup executor -pub struct KeyLookup { - table: String, +/// A primary key lookup source. +pub(super) fn lookup_key( + txn: &mut impl Transaction, + table: &str, keys: Vec, +) -> Result { + // TODO: move catalog lookup elsewhere and make this infallible. + let table = txn.must_get_table(table)?; + + // TODO: don't collect into vec, requires shared txn borrow. + let rows = keys + .into_iter() + .filter_map(|key| txn.get(&table.name, &key).transpose()) + .collect::>>()?; + + Ok(QueryIterator { + columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), + rows: Box::new(rows.into_iter().map(Ok)), + }) } -impl KeyLookup { - pub fn new(table: String, keys: Vec) -> Self { - Self { table, keys } - } - - pub fn execute(self, txn: &mut impl Transaction) -> Result { - let table = txn.must_get_table(&self.table)?; - - // FIXME Is there a way to pass the txn into an iterator closure instead? - let rows = self - .keys - .into_iter() - .filter_map(|key| txn.get(&table.name, &key).transpose()) - .collect::>>()?; - - Ok(QueryIterator { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(rows.into_iter().map(Ok)), - }) - } -} - -/// An index value lookup executor -pub struct IndexLookup { - table: String, - column: String, +/// An index lookup source. +pub(super) fn lookup_index( + txn: &mut impl Transaction, + table: &str, + column: &str, values: Vec, -} - -impl IndexLookup { - pub fn new(table: String, column: String, values: Vec) -> Self { - Self { table, column, values } +) -> Result { + // TODO: pass in from planner. + let table = txn.must_get_table(table)?; + + // TODO: consider cleaning up. + let mut pks = std::collections::HashSet::new(); + for value in values { + pks.extend(txn.lookup_index(&table.name, column, &value)?); } - pub fn execute(self, txn: &mut impl Transaction) -> Result { - let table = txn.must_get_table(&self.table)?; + // TODO: use a shared txn borrow instead. + let rows = pks + .into_iter() + .filter_map(|pk| txn.get(&table.name, &pk).transpose()) + .collect::>>()?; - let mut pks: HashSet = HashSet::new(); - for value in self.values { - pks.extend(txn.lookup_index(&self.table, &self.column, &value)?); - } - - // FIXME Is there a way to pass the txn into an iterator closure instead? 
- let rows = pks - .into_iter() - .filter_map(|pk| txn.get(&table.name, &pk).transpose()) - .collect::>>()?; - - Ok(QueryIterator { - columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), - rows: Box::new(rows.into_iter().map(Ok)), - }) - } + Ok(QueryIterator { + columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(), + rows: Box::new(rows.into_iter().map(Ok)), + }) } -/// An executor that produces a single empty row -pub struct Nothing; - -impl Nothing { - pub fn execute(self) -> QueryIterator { - QueryIterator { columns: Vec::new(), rows: Box::new(std::iter::once(Ok(Row::new()))) } - } +/// Produces a single empty row. Used for queries without a FROM clause, e.g. +/// SELECT 1+1, in order to have something to project against. +pub(super) fn nothing() -> QueryIterator { + QueryIterator { columns: Vec::new(), rows: Box::new(std::iter::once(Ok(Row::new()))) } } diff --git a/src/sql/execution/transform.rs b/src/sql/execution/transform.rs new file mode 100644 index 00000000..749151e2 --- /dev/null +++ b/src/sql/execution/transform.rs @@ -0,0 +1,98 @@ +use itertools::izip; + +use super::QueryIterator; +use crate::errinput; +use crate::error::Result; +use crate::sql::plan::Direction; +use crate::sql::types::{Column, Expression, Value}; + +/// Filters the input rows (i.e. WHERE). +pub(super) fn filter(source: QueryIterator, predicate: Expression) -> QueryIterator { + source.map_rows(|rows| { + rows.filter_map(move |r| { + r.and_then(|row| match predicate.evaluate(Some(&row))? { + Value::Boolean(true) => Ok(Some(row)), + Value::Boolean(false) => Ok(None), + Value::Null => Ok(None), + value => errinput!("filter returned {value}, expected boolean",), + }) + .transpose() + }) + }) +} + +/// Limits the result to the given number of rows (i.e. LIMIT). +pub(super) fn limit(source: QueryIterator, limit: u64) -> QueryIterator { + source.map_rows(|rows| rows.take(limit as usize)) +} + +/// Skips the given number of rows (i.e. OFFSET). +pub(super) fn offset(source: QueryIterator, offset: u64) -> QueryIterator { + source.map_rows(|rows| rows.skip(offset as usize)) +} + +/// Sorts the rows (i.e. ORDER BY). +pub(super) fn order(source: QueryIterator, order: Vec<(Expression, Direction)>) -> QueryIterator { + source.try_map_rows(move |rows| { + // We can't use sort_by_cached_key(), since expression evaluation is + // fallible, and since we may have to vary the sort direction of each + // expression. Precompute the sort values instead, and map them based on + // the row index. + let mut irows: Vec<_> = + rows.enumerate().map(|(i, r)| r.map(|row| (i, row))).collect::>()?; + + let mut sort_values = Vec::with_capacity(irows.len()); + for (_, row) in &irows { + let values: Vec<_> = + order.iter().map(|(e, _)| e.evaluate(Some(row))).collect::>()?; + sort_values.push(values) + } + + irows.sort_by(|&(a, _), &(b, _)| { + let dirs = order.iter().map(|(_, dir)| dir); + for (a, b, dir) in izip!(&sort_values[a], &sort_values[b], dirs) { + match a.cmp(b) { + std::cmp::Ordering::Equal => {} + order if *dir == Direction::Descending => return order.reverse(), + order => return order, + } + } + std::cmp::Ordering::Equal + }); + + Ok(irows.into_iter().map(|(_, row)| Ok(row))) + }) +} + +/// Projects the rows using the given expressions and labels (i.e. SELECT). +pub(super) fn project( + source: QueryIterator, + expressions: Vec<(Expression, Option)>, +) -> QueryIterator { + // TODO: pass expressions and labels separately. 
+ let (expressions, labels): (Vec<_>, Vec<_>) = expressions.into_iter().unzip(); + + // Use explicit column label if given, or pass through the source column + // label if referenced (e.g. SELECT a, b, a FROM table). + source + .map_columns(|columns| { + labels + .into_iter() + .enumerate() + .map(|(i, label)| { + if let Some(label) = label { + Column { name: Some(label) } + } else if let Expression::Field(f, _) = &expressions[i] { + columns.get(*f).cloned().expect("invalid field reference") + } else { + Column { name: None } + } + }) + .collect() + }) + .map_rows(|rows| { + rows.map(move |r| { + r.and_then(|row| expressions.iter().map(|e| e.evaluate(Some(&row))).collect()) + }) + }) +} diff --git a/src/sql/types/value.rs b/src/sql/types/value.rs index 46eaa76c..d068d7d4 100644 --- a/src/sql/types/value.rs +++ b/src/sql/types/value.rs @@ -61,27 +61,45 @@ impl std::hash::Hash for Value { } } -impl PartialOrd for Value { - fn partial_cmp(&self, other: &Self) -> Option { +impl Ord for Value { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + use std::cmp::Ordering; match (self, other) { - // For ordering purposes, NULL is ordered first. - // - // TODO: revisit this and make sure the NULL handling is sound and - // consistent with e.g. hash and eq handling. - (Self::Null, Self::Null) => Some(std::cmp::Ordering::Equal), - (Self::Null, _) => Some(std::cmp::Ordering::Less), - (_, Self::Null) => Some(std::cmp::Ordering::Greater), - (Self::Boolean(a), Self::Boolean(b)) => a.partial_cmp(b), - (Self::Float(a), Self::Float(b)) => a.partial_cmp(b), - (Self::Float(a), Self::Integer(b)) => a.partial_cmp(&(*b as f64)), - (Self::Integer(a), Self::Float(b)) => (*a as f64).partial_cmp(b), - (Self::Integer(a), Self::Integer(b)) => a.partial_cmp(b), - (Self::String(a), Self::String(b)) => a.partial_cmp(b), - (_, _) => None, + // For ordering purposes, we consider e.g. NULL and NaN equal, and + // establish a total order. + (Self::Null, Self::Null) => Ordering::Equal, + (Self::Boolean(a), Self::Boolean(b)) => a.cmp(b), + (Self::Integer(a), Self::Integer(b)) => a.cmp(b), + (Self::Integer(a), Self::Float(b)) => (*a as f64).total_cmp(b), + (Self::Float(a), Self::Integer(b)) => a.total_cmp(&(*b as f64)), + (Self::Float(a), Self::Float(b)) => a.total_cmp(b), + (Self::String(a), Self::String(b)) => a.cmp(b), + + // Mixed types. Should rarely come up, but we may as well establish + // an order, especially since we also implement Eq. We can handle + // any special cases during expression evaluation. + (Self::Null, _) => Ordering::Less, + (_, Self::Null) => Ordering::Greater, + (Self::Boolean(_), _) => Ordering::Less, + (_, Self::Boolean(_)) => Ordering::Greater, + (Self::Float(_), _) => Ordering::Less, + (_, Self::Float(_)) => Ordering::Greater, + (Self::Integer(_), _) => Ordering::Less, + (_, Self::Integer(_)) => Ordering::Greater, + #[allow(unreachable_patterns)] + (Self::String(_), _) => Ordering::Less, + #[allow(unreachable_patterns)] + (_, Self::String(_)) => Ordering::Greater, } } } +impl PartialOrd for Value { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl Value { /// Returns the value's datatype, or None for null values. pub fn datatype(&self) -> Option { diff --git a/tests/sql/expression.rs b/tests/sql/expression.rs index eeb73886..c5123b19 100644 --- a/tests/sql/expression.rs +++ b/tests/sql/expression.rs @@ -119,6 +119,7 @@ test_expr! 
{
     op_eq_float_nan: "NAN = NAN" => Ok(Boolean(false)),
     op_eq_float_int: "3.0 = 3" => Ok(Boolean(true)),
     op_eq_float_int_not: "3.01 = 3" => Ok(Boolean(false)),
+    op_eq_float_zeroes: "0.0 = -0.0" => Ok(Boolean(true)),
     op_eq_int: "1 = 1" => Ok(Boolean(true)),
     op_eq_int_not: "1 = 2" => Ok(Boolean(false)),
     op_eq_int_float: "3 = 3.0" => Ok(Boolean(true)),
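
The per-operator executor structs are replaced above by the map_columns/map_rows/try_map_rows combinators on QueryIterator. The following is a standalone sketch of that pattern, not part of the patch: it swaps the crate's Columns/Rows/Result types for plain std equivalents (Vec<i64> rows, Box<dyn Error> errors) so it compiles on its own.

```rust
use std::error::Error;

// Simplified stand-ins for the crate's Row/Rows/Columns types. The real Rows
// is presumably a boxed fallible row iterator, as the map_rows bounds suggest.
type Row = Vec<i64>;
type Rows = Box<dyn Iterator<Item = Result<Row, Box<dyn Error>>>>;

struct QueryIterator {
    columns: Vec<Option<String>>,
    rows: Rows,
}

impl QueryIterator {
    /// Replaces the rows iterator with the result of the closure.
    fn map_rows<F, I>(mut self, f: F) -> Self
    where
        I: Iterator<Item = Result<Row, Box<dyn Error>>> + 'static,
        F: FnOnce(Rows) -> I,
    {
        self.rows = Box::new(f(self.rows));
        self
    }

    /// Like map_rows, but a closure error becomes a single error row.
    fn try_map_rows<F, I>(mut self, f: F) -> Self
    where
        I: Iterator<Item = Result<Row, Box<dyn Error>>> + 'static,
        F: FnOnce(Rows) -> Result<I, Box<dyn Error>>,
    {
        self.rows = match f(self.rows) {
            Ok(rows) => Box::new(rows),
            Err(e) => Box::new(std::iter::once(Err(e))),
        };
        self
    }
}

fn main() {
    let iter = QueryIterator {
        columns: vec![Some("a".into())],
        rows: Box::new((1..=5).map(|i| Ok::<Row, Box<dyn Error>>(vec![i]))),
    };
    // Transforms compose as plain functions: LIMIT 3, then double each value.
    let iter = iter
        .map_rows(|rows| rows.take(3))
        .map_rows(|rows| {
            rows.map(|r| r.map(|row| row.into_iter().map(|v| v * 2).collect::<Row>()))
        });
    println!("{:?}", iter.columns);
    for row in iter.rows {
        println!("{:?}", row.unwrap());
    }
}
```

try_map_rows is what lets a transform that must consume its input up front, such as ORDER BY, surface a failure as a single error row instead of returning a Result from the transform itself.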
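
transform::filter above keeps rows whose predicate evaluates to TRUE, drops FALSE and NULL, and forwards both evaluation errors and non-boolean results as error items. The filter_map + and_then + transpose idiom it relies on can be shown in isolation; the Value enum and error strings below are simplified stand-ins, not the crate's types.

```rust
/// A simplified value type standing in for sql::types::Value.
#[derive(Debug)]
enum Value {
    Boolean(bool),
    Null,
    Integer(i64),
}

fn main() {
    // Pretend each row's predicate has already been evaluated to a Value,
    // where evaluation itself may also fail.
    let evaluated: Vec<Result<Value, String>> = vec![
        Ok(Value::Boolean(true)),
        Ok(Value::Boolean(false)),
        Ok(Value::Null),
        Ok(Value::Integer(7)), // not a boolean: becomes an error item
        Err("evaluation failed".into()),
    ];

    // filter_map + transpose: Ok(Some(x)) keeps x, Ok(None) drops the row,
    // Err(e) is passed through as an error item.
    let filtered: Vec<Result<&'static str, String>> = evaluated
        .into_iter()
        .filter_map(|r| {
            r.and_then(|value| match value {
                Value::Boolean(true) => Ok(Some("kept")),
                Value::Boolean(false) | Value::Null => Ok(None),
                value => Err(format!("filter returned {value:?}, expected boolean")),
            })
            .transpose()
        })
        .collect();

    println!("{filtered:?}");
}
```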
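
As its comment notes, transform::order cannot use sort_by_cached_key, because expression evaluation is fallible and each key may sort in a different direction, so it materializes the rows, precomputes the per-row sort values, and compares by row index. A self-contained sketch of that strategy, using plain integer rows and a hypothetical fallible eval closure in place of Expression::evaluate:

```rust
use std::cmp::Ordering;

#[derive(Clone, Copy, PartialEq)]
enum Direction {
    Ascending,
    Descending,
}

fn main() {
    let rows = vec![vec![3, 1], vec![1, 2], vec![3, 0], vec![2, 9]];
    // Sort by column 0 ascending, then column 1 descending.
    let order = [(0usize, Direction::Ascending), (1, Direction::Descending)];

    // A stand-in for fallible expression evaluation.
    let eval = |row: &Vec<i64>, col: usize| -> Result<i64, String> {
        row.get(col).copied().ok_or_else(|| format!("no column {col}"))
    };

    // Precompute all sort keys up front, since a sort comparator can't fail.
    let mut irows: Vec<(usize, Vec<i64>)> = Vec::new();
    let mut sort_values: Vec<Vec<i64>> = Vec::new();
    for (i, row) in rows.into_iter().enumerate() {
        let values: Result<Vec<_>, _> = order.iter().map(|&(col, _)| eval(&row, col)).collect();
        sort_values.push(values.expect("evaluation failed"));
        irows.push((i, row));
    }

    // Sort by index into the precomputed keys, honoring per-key direction.
    irows.sort_by(|&(a, _), &(b, _)| {
        for ((av, bv), &(_, dir)) in sort_values[a].iter().zip(&sort_values[b]).zip(&order) {
            match av.cmp(bv) {
                Ordering::Equal => {}
                o if dir == Direction::Descending => return o.reverse(),
                o => return o,
            }
        }
        Ordering::Equal
    });

    let sorted: Vec<Vec<i64>> = irows.into_iter().map(|(_, row)| row).collect();
    println!("{sorted:?}"); // [[1, 2], [2, 9], [3, 1], [3, 0]]
}
```

In the patch, the surrounding try_map_rows call is what turns an evaluation error into a single error row; the sketch simply unwraps for brevity.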
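
transform::project resolves output column names by preferring an explicit label, then passing through the referenced source column for a bare field reference, and otherwise leaving the column unnamed. A minimal sketch of just that name resolution, with simplified stand-ins for the crate's Column and Expression types:

```rust
/// Simplified stand-in for the crate's Column type.
#[derive(Clone, Debug)]
struct Column {
    name: Option<String>,
}

/// Simplified stand-in for Expression: either a source column reference by
/// index, or something we can't derive a name from.
#[derive(Clone, Copy)]
enum Expr {
    Field(usize),
    Computed,
}

fn main() {
    let source_columns = vec![
        Column { name: Some("id".into()) },
        Column { name: Some("title".into()) },
    ];
    // SELECT id AS movie_id, title, 1 + 1
    let expressions = vec![Expr::Field(0), Expr::Field(1), Expr::Computed];
    let labels = vec![Some("movie_id".to_string()), None, None];

    // Explicit label wins; otherwise pass through the referenced source
    // column; otherwise the output column is unnamed.
    let columns: Vec<Column> = labels
        .into_iter()
        .enumerate()
        .map(|(i, label)| {
            if let Some(label) = label {
                Column { name: Some(label) }
            } else if let Expr::Field(f) = expressions[i] {
                source_columns[f].clone()
            } else {
                Column { name: None }
            }
        })
        .collect();

    let names: Vec<_> = columns.iter().map(|c| c.name.as_deref()).collect();
    println!("{names:?}"); // [Some("movie_id"), Some("title"), None]
}
```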
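
The new Ord implementation for Value leans on f64::total_cmp to establish a total order over floats, which is what allows transform::order to use an infallible sort_by. A small standalone illustration of total_cmp itself (std behavior, not the crate's Value type): NaN sorts after every other float, -0.0 sorts just before 0.0, and yet 0.0 == -0.0 still holds, which is the equality the new op_eq_float_zeroes test pins down at the SQL level.

```rust
fn main() {
    // f64::total_cmp gives the IEEE 754 total order:
    // -inf < -1.5 < -0.0 < 0.0 < 1.0 < inf < NaN.
    let mut floats = vec![f64::NAN, 1.0, -0.0, f64::NEG_INFINITY, 0.0, -1.5, f64::INFINITY];
    floats.sort_by(|a, b| a.total_cmp(b));
    println!("{floats:?}");

    // partial_cmp alone can't drive a sort: NaN compares as None.
    assert_eq!(f64::NAN.partial_cmp(&f64::NAN), None);

    // The total order distinguishes -0.0 from 0.0, but == still treats them
    // as equal, matching the op_eq_float_zeroes expectation.
    assert!(0.0_f64 == -0.0_f64);
    assert_eq!((-0.0_f64).total_cmp(&0.0_f64), std::cmp::Ordering::Less);
}
```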