Skip to content

Commit

Permalink
sql: split transaction and catalog traits, and use catalog during pla…
Browse files Browse the repository at this point in the history
…nning
  • Loading branch information
erikgrinaker committed Jun 18, 2024
1 parent 5a1756a commit 1269f8e
Show file tree
Hide file tree
Showing 121 changed files with 18,427 additions and 440 deletions.
18 changes: 6 additions & 12 deletions src/sql/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::{HashMap, HashSet};
/// The SQL engine interface.
pub trait Engine<'a>: Sized {
/// The transaction type.
type Transaction: Transaction + 'a;
type Transaction: Transaction + Catalog + 'a;

/// Begins a read-write transaction.
fn begin(&self) -> Result<Self::Transaction>;
Expand All @@ -32,11 +32,7 @@ pub trait Engine<'a>: Sized {
/// All methods operate on row batches rather than single rows to amortize the
/// cost. We have to do a Raft roundtrip for every call, and we'd rather not
/// have to do a Raft roundtrip for every row.
///
/// TODO: split out Catalog trait and don't have Transaction depend on it. This
/// enforces cleaner separation of when catalog access is valid (i.e. during
/// planning but not execution).
pub trait Transaction: Catalog {
pub trait Transaction {
/// The transaction's MVCC version.
fn version(&self) -> mvcc::Version;
/// Whether the transaction is read-only.
Expand Down Expand Up @@ -68,15 +64,13 @@ pub trait Transaction: Catalog {
/// An index scan iterator.
pub type IndexScan = Box<dyn Iterator<Item = Result<(Value, HashSet<Value>)>>>;

/// The catalog stores schema information
/// The catalog stores schema information.
pub trait Catalog {
/// Creates a new table.
fn create_table(&self, table: Table) -> Result<()>;
/// Drops a table. Errors if it does not exist.
///
/// TODO: consider taking an if_exists parameter, but that will incur a Raft
/// roundtrip.
fn drop_table(&self, table: &str) -> Result<()>;
/// Drops a table. Errors if it does not exist, unless if_exists is true.
/// Returns true if the table existed and was deleted.
fn drop_table(&self, table: &str, if_exists: bool) -> Result<bool>;
/// Fetches a table schema.
fn get_table(&self, table: &str) -> Result<Option<Table>>;
/// Lists tables.
Expand Down
13 changes: 10 additions & 3 deletions src/sql/engine/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,23 @@ impl<E: storage::Engine> Catalog for Transaction<E> {
self.txn.set(&Key::Table((&table.name).into()).encode(), table.encode())
}

fn drop_table(&self, table: &str) -> Result<()> {
let table = self.must_get_table(table)?;
fn drop_table(&self, table: &str, if_exists: bool) -> Result<bool> {
let table = if !if_exists {
self.must_get_table(table)?
} else if let Some(table) = self.get_table(table)? {
table
} else {
return Ok(false);
};
if let Some((t, cs)) = self.references(&table.name, false)?.first() {
return errinput!("table {} is referenced by table {} column {}", table.name, t, cs[0]);
}
let mut scan = self.scan(&table.name, None)?;
while let Some(row) = scan.next().transpose()? {
self.delete(&table.name, &[table.get_row_key(&row)?])?
}
self.txn.delete(&Key::Table(table.name.into()).encode())
self.txn.delete(&Key::Table(table.name.into()).encode())?;
Ok(true)
}

fn get_table(&self, table: &str) -> Result<Option<Table>> {
Expand Down
15 changes: 9 additions & 6 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ enum Mutation {
/// Creates a table
CreateTable { txn: TransactionState, schema: Table },
/// Deletes a table
DeleteTable { txn: TransactionState, table: String },
DeleteTable { txn: TransactionState, table: String, if_exists: bool },
}

impl encoding::Value for Mutation {}
Expand Down Expand Up @@ -270,9 +270,12 @@ impl Catalog for Transaction {
self.client.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })
}

fn drop_table(&self, table: &str) -> Result<()> {
self.client
.mutate(Mutation::DeleteTable { txn: self.state.clone(), table: table.to_string() })
fn drop_table(&self, table: &str, if_exists: bool) -> Result<bool> {
self.client.mutate(Mutation::DeleteTable {
txn: self.state.clone(),
table: table.to_string(),
if_exists,
})
}

fn get_table(&self, table: &str) -> Result<Option<Table>> {
Expand Down Expand Up @@ -323,8 +326,8 @@ impl<E: storage::Engine> State<E> {
Mutation::CreateTable { txn, schema } => {
bincode::serialize(&self.engine.resume(txn)?.create_table(schema)?)
}
Mutation::DeleteTable { txn, table } => {
bincode::serialize(&self.engine.resume(txn)?.drop_table(&table)?)
Mutation::DeleteTable { txn, table, if_exists } => {
bincode::serialize(&self.engine.resume(txn)?.drop_table(&table, if_exists)?)
}
};
Ok(response)
Expand Down
4 changes: 2 additions & 2 deletions src/sql/engine/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<'a, E: Engine<'a>> Session<'a, E> {
}
// TODO: this needs testing.
ast::Statement::Explain(statement) => self.with_txn_read_only(|txn| {
Ok(StatementResult::Explain(Plan::build(*statement, txn)?.optimize(txn)?))
Ok(StatementResult::Explain(Plan::build(*statement, txn)?.optimize()?))
}),
statement if self.txn.is_some() => {
Self::execute_with(statement, self.txn.as_mut().unwrap())
Expand Down Expand Up @@ -101,7 +101,7 @@ impl<'a, E: Engine<'a>> Session<'a, E> {
statement: ast::Statement,
txn: &mut E::Transaction,
) -> Result<StatementResult> {
Plan::build(statement, txn)?.optimize(txn)?.execute(txn)?.try_into()
Plan::build(statement, txn)?.optimize()?.execute(txn)?.try_into()
}

/// Runs a read-only closure in the session's transaction, or a new
Expand Down
31 changes: 17 additions & 14 deletions src/sql/execution/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,50 +5,53 @@ use super::source;
use super::transform;
use super::write;
use crate::error::Result;
use crate::sql::engine::Transaction;
use crate::sql::engine::{Catalog, Transaction};
use crate::sql::plan::{Node, Plan};
use crate::sql::types::{Columns, Row, Rows};

/// Executes a plan, returning an execution result.
pub fn execute_plan(plan: Plan, txn: &impl Transaction) -> Result<ExecutionResult> {
pub fn execute_plan(
plan: Plan,
txn: &impl Transaction,
catalog: &impl Catalog,
) -> Result<ExecutionResult> {
Ok(match plan {
Plan::CreateTable { schema } => {
let name = schema.name.clone();
schema::create_table(txn, schema)?;
schema::create_table(catalog, schema)?;
ExecutionResult::CreateTable { name }
}

Plan::DropTable { table, if_exists } => {
let existed = schema::drop_table(txn, &table, if_exists)?;
let existed = schema::drop_table(catalog, &table, if_exists)?;
ExecutionResult::DropTable { name: table, existed }
}

Plan::Delete { table, source } => {
Plan::Delete { table, key_index, source } => {
let source = execute(source, txn)?;
let count = write::delete(txn, &table, source)?;
let count = write::delete(txn, table, key_index, source)?;
ExecutionResult::Delete { count }
}

Plan::Insert { table, columns, expressions } => {
let count = write::insert(txn, &table, columns, expressions)?;
let count = write::insert(txn, table, columns, expressions)?;
ExecutionResult::Insert { count }
}

Plan::Select(node) => ExecutionResult::Select { iter: execute(node, txn)? },

Plan::Update { table, source, expressions } => {
Plan::Update { table, key_index, source, expressions } => {
let source = execute(source, txn)?;
let expressions = expressions.into_iter().map(|(i, _, expr)| (i, expr)).collect();
let count = write::update(txn, &table, source, expressions)?;
let count = write::update(txn, table, key_index, source, expressions)?;
ExecutionResult::Update { count }
}
})
}

/// Recursively executes a query plan node, returning a query iterator.
///
/// TODO: since iterators are lazy, make this infallible and move all catalog
/// lookups to planning.
/// TODO: since iterators are lazy, make this infallible if possible.
pub fn execute(node: Node, txn: &impl Transaction) -> Result<QueryIterator> {
match node {
Node::Aggregation { source, aggregates } => {
Expand All @@ -68,10 +71,10 @@ pub fn execute(node: Node, txn: &impl Transaction) -> Result<QueryIterator> {
}

Node::IndexLookup { table, alias: _, column, values } => {
source::lookup_index(txn, &table, &column, values)
source::lookup_index(txn, table, column, values)
}

Node::KeyLookup { table, alias: _, keys } => source::lookup_key(txn, &table, keys),
Node::KeyLookup { table, alias: _, keys } => source::lookup_key(txn, table, keys),

Node::Limit { source, limit } => {
let source = execute(*source, txn)?;
Expand Down Expand Up @@ -101,7 +104,7 @@ pub fn execute(node: Node, txn: &impl Transaction) -> Result<QueryIterator> {
Ok(transform::project(source, expressions))
}

Node::Scan { table, alias: _, filter } => source::scan(txn, &table, filter),
Node::Scan { table, alias: _, filter } => source::scan(txn, table, filter),
}
}

Expand Down
15 changes: 5 additions & 10 deletions src/sql/execution/schema.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
use crate::error::Result;
use crate::sql::engine::Transaction;
use crate::sql::engine::Catalog;
use crate::sql::types::schema::Table;

// Creates a table (i.e. CREATE TABLE).
pub(super) fn create_table(txn: &impl Transaction, schema: Table) -> Result<()> {
txn.create_table(schema)
pub(super) fn create_table(catalog: &impl Catalog, schema: Table) -> Result<()> {
catalog.create_table(schema)
}

/// Deletes a table (i.e. DROP TABLE). Returns true if the table existed.
pub(super) fn drop_table(txn: &impl Transaction, table: &str, if_exists: bool) -> Result<bool> {
// TODO the planner should deal with this.
if if_exists && txn.get_table(table)?.is_none() {
return Ok(false);
}
txn.drop_table(table)?;
Ok(true)
pub(super) fn drop_table(catalog: &impl Catalog, table: &str, if_exists: bool) -> Result<bool> {
catalog.drop_table(table, if_exists)
}
17 changes: 6 additions & 11 deletions src/sql/execution/source.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use super::QueryIterator;
use crate::error::Result;
use crate::sql::engine::Transaction;
use crate::sql::types::schema::Table;
use crate::sql::types::{Column, Expression, Row, Value};

/// A table scan source.
pub(super) fn scan(
txn: &impl Transaction,
table: &str,
table: Table,
filter: Option<Expression>,
) -> Result<QueryIterator> {
// TODO: this should not be fallible. Pass the schema in the plan node.
let table = txn.must_get_table(table)?;
Ok(QueryIterator {
columns: table.columns.into_iter().map(|c| Column { name: Some(c.name) }).collect(),
rows: Box::new(txn.scan(&table.name, filter)?),
Expand All @@ -20,11 +19,9 @@ pub(super) fn scan(
/// A primary key lookup source.
pub(super) fn lookup_key(
txn: &impl Transaction,
table: &str,
table: Table,
keys: Vec<Value>,
) -> Result<QueryIterator> {
// TODO: move catalog lookup elsewhere and make this infallible.
let table = txn.must_get_table(table)?;
Ok(QueryIterator {
columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
rows: Box::new(txn.get(&table.name, &keys)?.into_iter().map(Ok)),
Expand All @@ -34,13 +31,11 @@ pub(super) fn lookup_key(
/// An index lookup source.
pub(super) fn lookup_index(
txn: &impl Transaction,
table: &str,
column: &str,
table: Table,
column: String,
values: Vec<Value>,
) -> Result<QueryIterator> {
// TODO: pass in from planner.
let table = txn.must_get_table(table)?;
let pks: Vec<_> = txn.lookup_index(&table.name, column, &values)?.into_iter().collect();
let pks: Vec<_> = txn.lookup_index(&table.name, &column, &values)?.into_iter().collect();
let rows = txn.get(&table.name, &pks)?;

Ok(QueryIterator {
Expand Down
25 changes: 13 additions & 12 deletions src/sql/execution/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use crate::sql::types::{Expression, Row, Value};

/// Deletes rows, taking primary keys from the source (i.e. DELETE).
/// Returns the number of rows deleted.
pub(super) fn delete(txn: &impl Transaction, table: &str, source: QueryIterator) -> Result<u64> {
// TODO: should be prepared by planner.
let table = txn.must_get_table(table)?;
let ids: Vec<_> =
source.map(|r| r.and_then(|row| table.get_row_key(&row))).collect::<Result<_>>()?;
pub(super) fn delete(
txn: &impl Transaction,
table: String,
key_index: usize,
source: QueryIterator,
) -> Result<u64> {
let ids: Vec<_> = source.map(|r| r.map(|row| row[key_index].clone())).collect::<Result<_>>()?;
let count = ids.len() as u64;
txn.delete(&table.name, &ids)?;
txn.delete(&table, &ids)?;
Ok(count)
}

Expand All @@ -22,11 +24,10 @@ pub(super) fn delete(txn: &impl Transaction, table: &str, source: QueryIterator)
/// TODO: this should take rows from a values source.
pub(super) fn insert(
txn: &impl Transaction,
table: &str,
table: Table,
columns: Vec<String>,
values: Vec<Vec<Expression>>,
) -> Result<u64> {
let table = txn.must_get_table(table)?;
let mut rows = Vec::with_capacity(values.len());
for expressions in values {
let mut row =
Expand All @@ -47,22 +48,22 @@ pub(super) fn insert(
/// rows updated.
pub(super) fn update(
txn: &impl Transaction,
table: &str,
table: String,
key_index: usize,
mut source: QueryIterator,
expressions: Vec<(usize, Expression)>,
) -> Result<u64> {
let table = txn.must_get_table(table)?;
let mut update = std::collections::HashMap::new();
while let Some(row) = source.next().transpose()? {
let id = table.get_row_key(&row)?;
let id = row[key_index].clone();
let mut new = row.clone();
for (field, expr) in &expressions {
new[*field] = expr.evaluate(Some(&row))?;
}
update.insert(id, new);
}
let count = update.len() as u64;
txn.update(&table.name, update)?;
txn.update(&table, update)?;
Ok(count)
}

Expand Down

0 comments on commit 1269f8e

Please sign in to comment.