Skip to content

Commit

Permalink
sql: split out engine and session submodules
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 17, 2024
1 parent 9a3d839 commit d3c0902
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 262 deletions.
66 changes: 66 additions & 0 deletions src/sql/engine/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#![allow(clippy::module_inception)]

use super::Session;
use crate::error::Result;
use crate::sql::types::schema::Catalog;
use crate::sql::types::{Expression, Row, Value};

use std::collections::HashSet;

/// The SQL engine interface
pub trait Engine: Clone {
/// The transaction type
type Transaction: Transaction;

/// Begins a read-write transaction.
fn begin(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction.
fn begin_read_only(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction as of a historical version.
fn begin_as_of(&self, version: u64) -> Result<Self::Transaction>;

/// Begins a session for executing individual statements
fn session(&self) -> Session<Self> {
Session::new(self.clone())
}
}

/// A SQL transaction.
///
/// TODO: split out Catalog trait and don't have Transaction depend on it. This
/// enforces cleaner separation of when catalog access is valid (i.e. during
/// planning but not execution).
pub trait Transaction: Catalog {
/// The transaction's version
fn version(&self) -> u64;
/// Whether the transaction is read-only
fn read_only(&self) -> bool;

/// Commits the transaction
fn commit(self) -> Result<()>;
/// Rolls back the transaction
fn rollback(self) -> Result<()>;

/// Creates a new table row
fn create(&mut self, table: &str, row: Row) -> Result<()>;
/// Deletes a table row
fn delete(&mut self, table: &str, id: &Value) -> Result<()>;
/// Reads a table row, if it exists
fn read(&self, table: &str, id: &Value) -> Result<Option<Row>>;
/// Reads an index entry, if it exists
fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>;
/// Scans a table's rows
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan>;
/// Scans a column's index entries
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>;
/// Updates a table row
fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()>;
}

/// A row scan iterator
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>;

/// An index scan iterator
pub type IndexScan = Box<dyn DoubleEndedIterator<Item = Result<(Value, HashSet<Value>)>> + Send>;
268 changes: 6 additions & 262 deletions src/sql/engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,265 +1,9 @@
//! The SQL engine provides fundamental CRUD storage operations.
mod engine;
mod kv;
pub mod raft;
mod raft;
mod session;

pub use engine::{Engine, IndexScan, Scan, Transaction};
pub use kv::KV;
pub use raft::{Raft, Status};
use serde::{Deserialize, Serialize};

use super::execution::ExecutionResult;
use super::parser::{ast, Parser};
use super::plan::Plan;
use super::types::schema::Catalog;
use super::types::{Columns, Expression, Row, Rows, Value};
use crate::error::{Error, Result};
use crate::{errdata, errinput};

use std::collections::HashSet;

/// The SQL engine interface
pub trait Engine: Clone {
/// The transaction type
type Transaction: Transaction;

/// Begins a read-write transaction.
fn begin(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction.
fn begin_read_only(&self) -> Result<Self::Transaction>;

/// Begins a read-only transaction as of a historical version.
fn begin_as_of(&self, version: u64) -> Result<Self::Transaction>;

/// Begins a session for executing individual statements
fn session(&self) -> Session<Self> {
Session { engine: self.clone(), txn: None }
}
}

/// An SQL transaction
pub trait Transaction: Catalog {
/// The transaction's version
fn version(&self) -> u64;
/// Whether the transaction is read-only
fn read_only(&self) -> bool;

/// Commits the transaction
fn commit(self) -> Result<()>;
/// Rolls back the transaction
fn rollback(self) -> Result<()>;

/// Creates a new table row
fn create(&mut self, table: &str, row: Row) -> Result<()>;
/// Deletes a table row
fn delete(&mut self, table: &str, id: &Value) -> Result<()>;
/// Reads a table row, if it exists
fn read(&self, table: &str, id: &Value) -> Result<Option<Row>>;
/// Reads an index entry, if it exists
fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>;
/// Scans a table's rows
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan>;
/// Scans a column's index entries
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>;
/// Updates a table row
fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()>;
}

/// An SQL session, which handles transaction control and simplified query execution
pub struct Session<E: Engine + 'static> {
/// The underlying engine
engine: E,
/// The current session transaction, if any
txn: Option<E::Transaction>,
}

impl<E: Engine + 'static> Session<E> {
/// Executes a query, managing transaction status for the session
pub fn execute(&mut self, query: &str) -> Result<StatementResult> {
// FIXME We should match on self.txn as well, but get this error:
// error[E0009]: cannot bind by-move and by-ref in the same pattern
// ...which seems like an arbitrary compiler limitation
match Parser::new(query).parse()? {
ast::Statement::Begin { .. } if self.txn.is_some() => {
errinput!("already in a transaction")
}
ast::Statement::Begin { read_only: true, as_of: None } => {
let txn = self.engine.begin_read_only()?;
let result = StatementResult::Begin { version: txn.version(), read_only: true };
self.txn = Some(txn);
Ok(result)
}
ast::Statement::Begin { read_only: true, as_of: Some(version) } => {
let txn = self.engine.begin_as_of(version)?;
let result = StatementResult::Begin { version, read_only: true };
self.txn = Some(txn);
Ok(result)
}
ast::Statement::Begin { read_only: false, as_of: Some(_) } => {
errinput!("can't start read-write transaction in a given version")
}
ast::Statement::Begin { read_only: false, as_of: None } => {
let txn = self.engine.begin()?;
let result = StatementResult::Begin { version: txn.version(), read_only: false };
self.txn = Some(txn);
Ok(result)
}
ast::Statement::Commit | ast::Statement::Rollback if self.txn.is_none() => {
errinput!("not in a transaction")
}
ast::Statement::Commit => {
let txn = self.txn.take().unwrap();
let version = txn.version();
txn.commit()?;
Ok(StatementResult::Commit { version })
}
ast::Statement::Rollback => {
let txn = self.txn.take().unwrap();
let version = txn.version();
txn.rollback()?;
Ok(StatementResult::Rollback { version })
}
// TODO: this needs testing.
ast::Statement::Explain(statement) => self.with_txn_read_only(|txn| {
Ok(StatementResult::Explain(Plan::build(*statement, txn)?.optimize(txn)?))
}),
statement if self.txn.is_some() => {
Self::execute_with(statement, self.txn.as_mut().unwrap())
}
statement @ ast::Statement::Select { .. } => {
let mut txn = self.engine.begin_read_only()?;
let result = Self::execute_with(statement, &mut txn);
txn.rollback()?;
result
}
statement => {
let mut txn = self.engine.begin()?;
let result = Self::execute_with(statement, &mut txn);
match &result {
Ok(_) => txn.commit()?,
Err(_) => txn.rollback()?,
}
result
}
}
}

/// Helper function to execute a statement with the given transaction.
/// Allows using a mutable borrow either to the session's transaction
/// or a temporary read-only or read/write transaction.
///
/// TODO: reconsider this.
fn execute_with(
statement: ast::Statement,
txn: &mut E::Transaction,
) -> Result<StatementResult> {
Plan::build(statement, txn)?.optimize(txn)?.execute(txn)?.try_into()
}

/// Runs a read-only closure in the session's transaction, or a new
/// read-only transaction if none is active.
///
/// TODO: reconsider this.
pub fn with_txn_read_only<F, R>(&mut self, f: F) -> Result<R>
where
F: FnOnce(&mut E::Transaction) -> Result<R>,
{
if let Some(ref mut txn) = self.txn {
return f(txn);
}
let mut txn = self.engine.begin_read_only()?;
let result = f(&mut txn);
txn.rollback()?;
result
}
}

impl Session<Raft> {
pub fn status(&self) -> Result<Status> {
self.engine.status()
}
}

impl<E: Engine + 'static> Drop for Session<E> {
fn drop(&mut self) {
self.execute("ROLLBACK").ok();
}
}

/// A row scan iterator
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>;

/// An index scan iterator
pub type IndexScan = Box<dyn DoubleEndedIterator<Item = Result<(Value, HashSet<Value>)>> + Send>;

/// A session statement result. This is also sent across the wire to SQL
/// clients.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub enum StatementResult {
// Transaction started
Begin { version: u64, read_only: bool },
// Transaction committed
Commit { version: u64 },
// Transaction rolled back
Rollback { version: u64 },
// Rows created
Create { count: u64 },
// Rows deleted
Delete { count: u64 },
// Rows updated
Update { count: u64 },
// Table created
CreateTable { name: String },
// Table dropped
DropTable { name: String, existed: bool },
// Query result.
//
// For simplicity, buffer and send the entire result as a vector instead of
// streaming it to the client. Streaming reads haven't been implemented from
// Raft either.
Query { columns: Columns, rows: Vec<Row> },
// Explain result
Explain(Plan),
}

impl StatementResult {
/// Converts the ResultSet into a row, or errors if not a query result with rows.
pub fn into_row(self) -> Result<Row> {
self.into_rows()?.next().transpose()?.ok_or(errdata!("no rows returned"))
}

/// Converts the ResultSet into a row iterator, or errors if not a query
/// result with rows.
pub fn into_rows(self) -> Result<Rows> {
if let StatementResult::Query { rows, .. } = self {
Ok(Box::new(rows.into_iter().map(Ok)))
} else {
errdata!("not a query result: {self:?}")
}
}

/// Converts the ResultSet into a value, if possible.
/// TODO: use TryFrom for this, also to primitive types via Value as TryFrom.
pub fn into_value(self) -> Result<Value> {
self.into_row()?.into_iter().next().ok_or(errdata!("no value returned"))
}
}

// TODO: remove or revisit this.
impl TryFrom<ExecutionResult> for StatementResult {
type Error = Error;

fn try_from(result: ExecutionResult) -> Result<Self> {
Ok(match result {
ExecutionResult::CreateTable { name } => StatementResult::CreateTable { name },
ExecutionResult::DropTable { name, existed } => {
StatementResult::DropTable { name, existed }
}
ExecutionResult::Delete { count } => StatementResult::Delete { count },
ExecutionResult::Insert { count } => StatementResult::Create { count },
ExecutionResult::Select { iter } => StatementResult::Query {
columns: iter.columns,
rows: iter.rows.collect::<Result<_>>()?,
},
ExecutionResult::Update { count } => StatementResult::Update { count },
})
}
}
pub use session::{Session, StatementResult};
Loading

0 comments on commit d3c0902

Please sign in to comment.