-
Notifications
You must be signed in to change notification settings - Fork 556
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sql: split out engine and session submodules
- Loading branch information
1 parent
9a3d839
commit 8a4e8b8
Showing
3 changed files
with
277 additions
and
262 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
#![allow(clippy::module_inception)] | ||
|
||
use super::Session; | ||
use crate::error::Result; | ||
use crate::sql::types::schema::Catalog; | ||
use crate::sql::types::{Expression, Row, Value}; | ||
|
||
use std::collections::HashSet; | ||
|
||
/// The SQL engine interface | ||
pub trait Engine: Clone { | ||
/// The transaction type | ||
type Transaction: Transaction; | ||
|
||
/// Begins a read-write transaction. | ||
fn begin(&self) -> Result<Self::Transaction>; | ||
|
||
/// Begins a read-only transaction. | ||
fn begin_read_only(&self) -> Result<Self::Transaction>; | ||
|
||
/// Begins a read-only transaction as of a historical version. | ||
fn begin_as_of(&self, version: u64) -> Result<Self::Transaction>; | ||
|
||
/// Begins a session for executing individual statements | ||
fn session(&self) -> Session<Self> { | ||
Session::new(self.clone()) | ||
} | ||
} | ||
|
||
/// An SQL transaction | ||
pub trait Transaction: Catalog { | ||
/// The transaction's version | ||
fn version(&self) -> u64; | ||
/// Whether the transaction is read-only | ||
fn read_only(&self) -> bool; | ||
|
||
/// Commits the transaction | ||
fn commit(self) -> Result<()>; | ||
/// Rolls back the transaction | ||
fn rollback(self) -> Result<()>; | ||
|
||
/// Creates a new table row | ||
fn create(&mut self, table: &str, row: Row) -> Result<()>; | ||
/// Deletes a table row | ||
fn delete(&mut self, table: &str, id: &Value) -> Result<()>; | ||
/// Reads a table row, if it exists | ||
fn read(&self, table: &str, id: &Value) -> Result<Option<Row>>; | ||
/// Reads an index entry, if it exists | ||
fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>; | ||
/// Scans a table's rows | ||
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan>; | ||
/// Scans a column's index entries | ||
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>; | ||
/// Updates a table row | ||
fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()>; | ||
} | ||
|
||
/// A row scan iterator | ||
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>; | ||
|
||
/// An index scan iterator | ||
pub type IndexScan = Box<dyn DoubleEndedIterator<Item = Result<(Value, HashSet<Value>)>> + Send>; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,265 +1,9 @@ | ||
//! The SQL engine provides fundamental CRUD storage operations. | ||
mod engine; | ||
mod kv; | ||
pub mod raft; | ||
mod raft; | ||
mod session; | ||
|
||
pub use engine::{Engine, IndexScan, Scan, Transaction}; | ||
pub use kv::KV; | ||
pub use raft::{Raft, Status}; | ||
use serde::{Deserialize, Serialize}; | ||
|
||
use super::execution::ExecutionResult; | ||
use super::parser::{ast, Parser}; | ||
use super::plan::Plan; | ||
use super::types::schema::Catalog; | ||
use super::types::{Columns, Expression, Row, Rows, Value}; | ||
use crate::error::{Error, Result}; | ||
use crate::{errdata, errinput}; | ||
|
||
use std::collections::HashSet; | ||
|
||
/// The SQL engine interface | ||
pub trait Engine: Clone { | ||
/// The transaction type | ||
type Transaction: Transaction; | ||
|
||
/// Begins a read-write transaction. | ||
fn begin(&self) -> Result<Self::Transaction>; | ||
|
||
/// Begins a read-only transaction. | ||
fn begin_read_only(&self) -> Result<Self::Transaction>; | ||
|
||
/// Begins a read-only transaction as of a historical version. | ||
fn begin_as_of(&self, version: u64) -> Result<Self::Transaction>; | ||
|
||
/// Begins a session for executing individual statements | ||
fn session(&self) -> Session<Self> { | ||
Session { engine: self.clone(), txn: None } | ||
} | ||
} | ||
|
||
/// An SQL transaction | ||
pub trait Transaction: Catalog { | ||
/// The transaction's version | ||
fn version(&self) -> u64; | ||
/// Whether the transaction is read-only | ||
fn read_only(&self) -> bool; | ||
|
||
/// Commits the transaction | ||
fn commit(self) -> Result<()>; | ||
/// Rolls back the transaction | ||
fn rollback(self) -> Result<()>; | ||
|
||
/// Creates a new table row | ||
fn create(&mut self, table: &str, row: Row) -> Result<()>; | ||
/// Deletes a table row | ||
fn delete(&mut self, table: &str, id: &Value) -> Result<()>; | ||
/// Reads a table row, if it exists | ||
fn read(&self, table: &str, id: &Value) -> Result<Option<Row>>; | ||
/// Reads an index entry, if it exists | ||
fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>; | ||
/// Scans a table's rows | ||
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan>; | ||
/// Scans a column's index entries | ||
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>; | ||
/// Updates a table row | ||
fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()>; | ||
} | ||
|
||
/// An SQL session, which handles transaction control and simplified query execution | ||
pub struct Session<E: Engine + 'static> { | ||
/// The underlying engine | ||
engine: E, | ||
/// The current session transaction, if any | ||
txn: Option<E::Transaction>, | ||
} | ||
|
||
impl<E: Engine + 'static> Session<E> { | ||
/// Executes a query, managing transaction status for the session | ||
pub fn execute(&mut self, query: &str) -> Result<StatementResult> { | ||
// FIXME We should match on self.txn as well, but get this error: | ||
// error[E0009]: cannot bind by-move and by-ref in the same pattern | ||
// ...which seems like an arbitrary compiler limitation | ||
match Parser::new(query).parse()? { | ||
ast::Statement::Begin { .. } if self.txn.is_some() => { | ||
errinput!("already in a transaction") | ||
} | ||
ast::Statement::Begin { read_only: true, as_of: None } => { | ||
let txn = self.engine.begin_read_only()?; | ||
let result = StatementResult::Begin { version: txn.version(), read_only: true }; | ||
self.txn = Some(txn); | ||
Ok(result) | ||
} | ||
ast::Statement::Begin { read_only: true, as_of: Some(version) } => { | ||
let txn = self.engine.begin_as_of(version)?; | ||
let result = StatementResult::Begin { version, read_only: true }; | ||
self.txn = Some(txn); | ||
Ok(result) | ||
} | ||
ast::Statement::Begin { read_only: false, as_of: Some(_) } => { | ||
errinput!("can't start read-write transaction in a given version") | ||
} | ||
ast::Statement::Begin { read_only: false, as_of: None } => { | ||
let txn = self.engine.begin()?; | ||
let result = StatementResult::Begin { version: txn.version(), read_only: false }; | ||
self.txn = Some(txn); | ||
Ok(result) | ||
} | ||
ast::Statement::Commit | ast::Statement::Rollback if self.txn.is_none() => { | ||
errinput!("not in a transaction") | ||
} | ||
ast::Statement::Commit => { | ||
let txn = self.txn.take().unwrap(); | ||
let version = txn.version(); | ||
txn.commit()?; | ||
Ok(StatementResult::Commit { version }) | ||
} | ||
ast::Statement::Rollback => { | ||
let txn = self.txn.take().unwrap(); | ||
let version = txn.version(); | ||
txn.rollback()?; | ||
Ok(StatementResult::Rollback { version }) | ||
} | ||
// TODO: this needs testing. | ||
ast::Statement::Explain(statement) => self.with_txn_read_only(|txn| { | ||
Ok(StatementResult::Explain(Plan::build(*statement, txn)?.optimize(txn)?)) | ||
}), | ||
statement if self.txn.is_some() => { | ||
Self::execute_with(statement, self.txn.as_mut().unwrap()) | ||
} | ||
statement @ ast::Statement::Select { .. } => { | ||
let mut txn = self.engine.begin_read_only()?; | ||
let result = Self::execute_with(statement, &mut txn); | ||
txn.rollback()?; | ||
result | ||
} | ||
statement => { | ||
let mut txn = self.engine.begin()?; | ||
let result = Self::execute_with(statement, &mut txn); | ||
match &result { | ||
Ok(_) => txn.commit()?, | ||
Err(_) => txn.rollback()?, | ||
} | ||
result | ||
} | ||
} | ||
} | ||
|
||
/// Helper function to execute a statement with the given transaction. | ||
/// Allows using a mutable borrow either to the session's transaction | ||
/// or a temporary read-only or read/write transaction. | ||
/// | ||
/// TODO: reconsider this. | ||
fn execute_with( | ||
statement: ast::Statement, | ||
txn: &mut E::Transaction, | ||
) -> Result<StatementResult> { | ||
Plan::build(statement, txn)?.optimize(txn)?.execute(txn)?.try_into() | ||
} | ||
|
||
/// Runs a read-only closure in the session's transaction, or a new | ||
/// read-only transaction if none is active. | ||
/// | ||
/// TODO: reconsider this. | ||
pub fn with_txn_read_only<F, R>(&mut self, f: F) -> Result<R> | ||
where | ||
F: FnOnce(&mut E::Transaction) -> Result<R>, | ||
{ | ||
if let Some(ref mut txn) = self.txn { | ||
return f(txn); | ||
} | ||
let mut txn = self.engine.begin_read_only()?; | ||
let result = f(&mut txn); | ||
txn.rollback()?; | ||
result | ||
} | ||
} | ||
|
||
impl Session<Raft> { | ||
pub fn status(&self) -> Result<Status> { | ||
self.engine.status() | ||
} | ||
} | ||
|
||
impl<E: Engine + 'static> Drop for Session<E> { | ||
fn drop(&mut self) { | ||
self.execute("ROLLBACK").ok(); | ||
} | ||
} | ||
|
||
/// A row scan iterator | ||
pub type Scan = Box<dyn DoubleEndedIterator<Item = Result<Row>> + Send>; | ||
|
||
/// An index scan iterator | ||
pub type IndexScan = Box<dyn DoubleEndedIterator<Item = Result<(Value, HashSet<Value>)>> + Send>; | ||
|
||
/// A session statement result. This is also sent across the wire to SQL | ||
/// clients. | ||
#[derive(Debug, PartialEq, Serialize, Deserialize)] | ||
pub enum StatementResult { | ||
// Transaction started | ||
Begin { version: u64, read_only: bool }, | ||
// Transaction committed | ||
Commit { version: u64 }, | ||
// Transaction rolled back | ||
Rollback { version: u64 }, | ||
// Rows created | ||
Create { count: u64 }, | ||
// Rows deleted | ||
Delete { count: u64 }, | ||
// Rows updated | ||
Update { count: u64 }, | ||
// Table created | ||
CreateTable { name: String }, | ||
// Table dropped | ||
DropTable { name: String, existed: bool }, | ||
// Query result. | ||
// | ||
// For simplicity, buffer and send the entire result as a vector instead of | ||
// streaming it to the client. Streaming reads haven't been implemented from | ||
// Raft either. | ||
Query { columns: Columns, rows: Vec<Row> }, | ||
// Explain result | ||
Explain(Plan), | ||
} | ||
|
||
impl StatementResult { | ||
/// Converts the ResultSet into a row, or errors if not a query result with rows. | ||
pub fn into_row(self) -> Result<Row> { | ||
self.into_rows()?.next().transpose()?.ok_or(errdata!("no rows returned")) | ||
} | ||
|
||
/// Converts the ResultSet into a row iterator, or errors if not a query | ||
/// result with rows. | ||
pub fn into_rows(self) -> Result<Rows> { | ||
if let StatementResult::Query { rows, .. } = self { | ||
Ok(Box::new(rows.into_iter().map(Ok))) | ||
} else { | ||
errdata!("not a query result: {self:?}") | ||
} | ||
} | ||
|
||
/// Converts the ResultSet into a value, if possible. | ||
/// TODO: use TryFrom for this, also to primitive types via Value as TryFrom. | ||
pub fn into_value(self) -> Result<Value> { | ||
self.into_row()?.into_iter().next().ok_or(errdata!("no value returned")) | ||
} | ||
} | ||
|
||
// TODO: remove or revisit this. | ||
impl TryFrom<ExecutionResult> for StatementResult { | ||
type Error = Error; | ||
|
||
fn try_from(result: ExecutionResult) -> Result<Self> { | ||
Ok(match result { | ||
ExecutionResult::CreateTable { name } => StatementResult::CreateTable { name }, | ||
ExecutionResult::DropTable { name, existed } => { | ||
StatementResult::DropTable { name, existed } | ||
} | ||
ExecutionResult::Delete { count } => StatementResult::Delete { count }, | ||
ExecutionResult::Insert { count } => StatementResult::Create { count }, | ||
ExecutionResult::Select { iter } => StatementResult::Query { | ||
columns: iter.columns, | ||
rows: iter.rows.collect::<Result<_>>()?, | ||
}, | ||
ExecutionResult::Update { count } => StatementResult::Update { count }, | ||
}) | ||
} | ||
} | ||
pub use session::{Session, StatementResult}; |
Oops, something went wrong.