From 3457a9fbe66a224ecc87997604761b7a52e2fa6f Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sun, 16 Jun 2024 17:01:30 +0200 Subject: [PATCH] sql: move `ResultSet` to `engine::StatementResult` and simplify --- src/bin/toysql.rs | 30 +++++---- src/bin/workload.rs | 4 +- src/client.rs | 26 +++----- src/lib.rs | 2 +- src/server.rs | 32 +--------- src/sql/engine/mod.rs | 104 +++++++++++++++++++++++++++---- src/sql/execution/mod.rs | 114 +--------------------------------- tests/e2e/client.rs | 72 +++++++++++---------- tests/e2e/mod.rs | 10 +-- tests/sql/query.rs | 15 +---- tests/sql/query/where_float | 54 +++++++++++++++- tests/sql/query/where_integer | 54 +++++++++++++++- tests/sql/query/where_string | 54 +++++++++++++++- 13 files changed, 326 insertions(+), 245 deletions(-) diff --git a/src/bin/toysql.rs b/src/bin/toysql.rs index 0c2ca1dee..14b4a7671 100644 --- a/src/bin/toysql.rs +++ b/src/bin/toysql.rs @@ -5,13 +5,14 @@ #![warn(clippy::all)] +use itertools::Itertools as _; use rustyline::history::DefaultHistory; use rustyline::validate::{ValidationContext, ValidationResult, Validator}; use rustyline::{error::ReadlineError, Editor, Modifiers}; use rustyline_derive::{Completer, Helper, Highlighter, Hinter}; use toydb::errinput; use toydb::error::{Error, Result}; -use toydb::sql::execution::ResultSet; +use toydb::sql::engine::StatementResult; use toydb::sql::parser::{Lexer, Token}; use toydb::Client; @@ -181,22 +182,22 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, /// Runs a query and displays the results fn execute_query(&mut self, query: &str) -> Result<()> { match self.client.execute(query)? { - ResultSet::Begin { version, read_only } => match read_only { + StatementResult::Begin { version, read_only } => match read_only { false => println!("Began transaction at new version {}", version), true => println!("Began read-only transaction at version {}", version), }, - ResultSet::Commit { version: id } => println!("Committed transaction {}", id), - ResultSet::Rollback { version: id } => println!("Rolled back transaction {}", id), - ResultSet::Create { count } => println!("Created {} rows", count), - ResultSet::Delete { count } => println!("Deleted {} rows", count), - ResultSet::Update { count } => println!("Updated {} rows", count), - ResultSet::CreateTable { name } => println!("Created table {}", name), - ResultSet::DropTable { name, existed } => match existed { + StatementResult::Commit { version: id } => println!("Committed transaction {}", id), + StatementResult::Rollback { version: id } => println!("Rolled back transaction {}", id), + StatementResult::Create { count } => println!("Created {} rows", count), + StatementResult::Delete { count } => println!("Deleted {} rows", count), + StatementResult::Update { count } => println!("Updated {} rows", count), + StatementResult::CreateTable { name } => println!("Created table {}", name), + StatementResult::DropTable { name, existed } => match existed { true => println!("Dropped table {}", name), false => println!("Table {} did not exit", name), }, - ResultSet::Explain(plan) => println!("{}", plan), - ResultSet::Query { columns, mut rows } => { + StatementResult::Explain(plan) => println!("{}", plan), + StatementResult::Query { columns, rows } => { if self.show_headers { println!( "{}", @@ -207,11 +208,8 @@ Storage: {keys} keys, {logical_size} MB logical, {nodes}x {disk_size} MB disk, .join("|") ); } - while let Some(row) = rows.next().transpose()? { - println!( - "{}", - row.into_iter().map(|v| format!("{}", v)).collect::>().join("|") - ); + for row in rows { + println!("{}", row.into_iter().map(|v| format!("{}", v)).join("|")); } } } diff --git a/src/bin/workload.rs b/src/bin/workload.rs index 657e992ff..d12cd9acb 100644 --- a/src/bin/workload.rs +++ b/src/bin/workload.rs @@ -20,7 +20,7 @@ use std::collections::HashSet; use std::io::Write as _; use std::time::Duration; use toydb::error::Result; -use toydb::{Client, ResultSet}; +use toydb::{Client, StatementResult}; fn main() -> Result<()> { let Command { runner, subcommand } = Command::parse(); @@ -337,7 +337,7 @@ impl Workload for Write { r#"INSERT INTO "write" (id, value) VALUES {}"#, item.iter().map(|(id, value)| format!("({}, '{}')", id, value)).join(", ") ); - if let ResultSet::Create { count } = client.execute(&query)? { + if let StatementResult::Create { count } = client.execute(&query)? { assert_eq!(count as usize, batch_size, "Unexpected row count"); } else { panic!("Unexpected result") diff --git a/src/client.rs b/src/client.rs index 9b20c8d39..802998963 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use crate::encoding::Value as _; use crate::errdata; use crate::error::{Error, Result}; use crate::server::{Request, Response, Status}; -use crate::sql::execution::ResultSet; +use crate::sql::engine::StatementResult; use crate::sql::types::schema::Table; use rand::Rng; @@ -33,27 +33,17 @@ impl Client { } /// Executes a query - pub fn execute(&mut self, query: &str) -> Result { - let mut resultset = match self.call(Request::Execute(query.into()))? { + pub fn execute(&mut self, query: &str) -> Result { + let resultset = match self.call(Request::Execute(query.into()))? { Response::Execute(rs) => rs, response => return errdata!("unexpected response {response:?}"), }; - if let ResultSet::Query { columns, .. } = resultset { - // FIXME We buffer rows for now to avoid lifetime hassles - let mut rows = Vec::new(); - loop { - match Result::::decode_from(&mut self.reader)?? { - Response::Row(Some(row)) => rows.push(row), - Response::Row(None) => break, - response => return errdata!("unexpected response {response:?}"), - } - } - resultset = ResultSet::Query { columns, rows: Box::new(rows.into_iter().map(Ok)) } - }; match &resultset { - ResultSet::Begin { version, read_only } => self.txn = Some((*version, *read_only)), - ResultSet::Commit { .. } => self.txn = None, - ResultSet::Rollback { .. } => self.txn = None, + StatementResult::Begin { version, read_only } => { + self.txn = Some((*version, *read_only)) + } + StatementResult::Commit { .. } => self.txn = None, + StatementResult::Rollback { .. } => self.txn = None, _ => {} } Ok(resultset) diff --git a/src/lib.rs b/src/lib.rs index b14e4f917..f59302862 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,4 +12,4 @@ pub mod storage; pub use client::Client; pub use server::Server; -pub use sql::execution::ResultSet; +pub use sql::engine::StatementResult; diff --git a/src/server.rs b/src/server.rs index b93cd7b1f..e81e6d17e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,8 +2,7 @@ use crate::encoding::{self, Value as _}; use crate::error::{Error, Result}; use crate::raft; use crate::sql; -use crate::sql::engine::Engine as _; -use crate::sql::execution::ResultSet; +use crate::sql::engine::{Engine as _, StatementResult}; use crate::sql::types::schema::{Catalog as _, Table}; use crate::sql::types::Row; use crate::storage; @@ -288,7 +287,7 @@ impl Server { while let Some(request) = Request::maybe_decode_from(&mut reader)? { // Execute request. debug!("Received request {request:?}"); - let mut response = match request { + let response = match request { Request::Execute(query) => session.execute(&query).map(Response::Execute), Request::GetTable(table) => session .with_txn_read_only(|txn| txn.must_read_table(&table)) @@ -304,32 +303,7 @@ impl Server { // Process response. debug!("Returning response {response:?}"); - let mut rows: Box> + Send> = - Box::new(std::iter::empty()); - if let Ok(Response::Execute(ResultSet::Query { rows: ref mut resultrows, .. })) = - &mut response - { - // TODO: don't stream results, for simplicity. - rows = Box::new( - std::mem::replace(resultrows, Box::new(std::iter::empty())) - .map(|result| result.map(|row| Response::Row(Some(row)))) - .chain(std::iter::once(Ok(Response::Row(None)))) - .scan(false, |err_sent, response| match (&err_sent, &response) { - (true, _) => None, - (_, Err(error)) => { - *err_sent = true; - Some(Err(error.clone())) - } - _ => Some(response), - }) - .fuse(), - ); - } - response.encode_into(&mut writer)?; - for row in rows { - row.encode_into(&mut writer)?; - } writer.flush()?; } Ok(()) @@ -354,7 +328,7 @@ impl encoding::Value for Request {} /// A SQL server response. #[derive(Debug, Serialize, Deserialize)] pub enum Response { - Execute(ResultSet), + Execute(StatementResult), Row(Option), GetTable(Table), ListTables(Vec), diff --git a/src/sql/engine/mod.rs b/src/sql/engine/mod.rs index f65d54cba..f91873683 100644 --- a/src/sql/engine/mod.rs +++ b/src/sql/engine/mod.rs @@ -3,14 +3,15 @@ mod kv; pub mod raft; pub use kv::KV; pub use raft::{Raft, Status}; +use serde::{Deserialize, Serialize}; -use super::execution::ResultSet; +use super::execution::ExecutionResult; use super::parser::{ast, Parser}; use super::plan::Plan; use super::types::schema::Catalog; -use super::types::{Expression, Row, Value}; -use crate::errinput; -use crate::error::Result; +use super::types::{Columns, Expression, Row, Rows, Value}; +use crate::error::{Error, Result}; +use crate::{errdata, errinput}; use std::collections::HashSet; @@ -72,7 +73,7 @@ pub struct Session { impl Session { /// Executes a query, managing transaction status for the session - pub fn execute(&mut self, query: &str) -> Result { + pub fn execute(&mut self, query: &str) -> Result { // FIXME We should match on self.txn as well, but get this error: // error[E0009]: cannot bind by-move and by-ref in the same pattern // ...which seems like an arbitrary compiler limitation @@ -82,13 +83,13 @@ impl Session { } ast::Statement::Begin { read_only: true, as_of: None } => { let txn = self.engine.begin_read_only()?; - let result = ResultSet::Begin { version: txn.version(), read_only: true }; + let result = StatementResult::Begin { version: txn.version(), read_only: true }; self.txn = Some(txn); Ok(result) } ast::Statement::Begin { read_only: true, as_of: Some(version) } => { let txn = self.engine.begin_as_of(version)?; - let result = ResultSet::Begin { version, read_only: true }; + let result = StatementResult::Begin { version, read_only: true }; self.txn = Some(txn); Ok(result) } @@ -97,7 +98,7 @@ impl Session { } ast::Statement::Begin { read_only: false, as_of: None } => { let txn = self.engine.begin()?; - let result = ResultSet::Begin { version: txn.version(), read_only: false }; + let result = StatementResult::Begin { version: txn.version(), read_only: false }; self.txn = Some(txn); Ok(result) } @@ -108,17 +109,17 @@ impl Session { let txn = self.txn.take().unwrap(); let version = txn.version(); txn.commit()?; - Ok(ResultSet::Commit { version }) + Ok(StatementResult::Commit { version }) } ast::Statement::Rollback => { let txn = self.txn.take().unwrap(); let version = txn.version(); txn.rollback()?; - Ok(ResultSet::Rollback { version }) + Ok(StatementResult::Rollback { version }) } // TODO: this needs testing. ast::Statement::Explain(statement) => self.with_txn_read_only(|txn| { - Ok(ResultSet::Explain(Plan::build(*statement, txn)?.optimize(txn)?)) + Ok(StatementResult::Explain(Plan::build(*statement, txn)?.optimize(txn)?)) }), statement if self.txn.is_some() => { Self::execute_with(statement, self.txn.as_mut().unwrap()) @@ -146,8 +147,11 @@ impl Session { /// or a temporary read-only or read/write transaction. /// /// TODO: reconsider this. - fn execute_with(statement: ast::Statement, txn: &mut E::Transaction) -> Result { - Ok(Plan::build(statement, txn)?.optimize(txn)?.execute(txn)?.into()) + fn execute_with( + statement: ast::Statement, + txn: &mut E::Transaction, + ) -> Result { + Plan::build(statement, txn)?.optimize(txn)?.execute(txn)?.try_into() } /// Runs a read-only closure in the session's transaction, or a new @@ -185,3 +189,77 @@ pub type Scan = Box> + Send>; /// An index scan iterator pub type IndexScan = Box)>> + Send>; + +/// A session statement result. This is also sent across the wire to SQL +/// clients. +#[derive(Debug, PartialEq, Serialize, Deserialize)] +pub enum StatementResult { + // Transaction started + Begin { version: u64, read_only: bool }, + // Transaction committed + Commit { version: u64 }, + // Transaction rolled back + Rollback { version: u64 }, + // Rows created + Create { count: u64 }, + // Rows deleted + Delete { count: u64 }, + // Rows updated + Update { count: u64 }, + // Table created + CreateTable { name: String }, + // Table dropped + DropTable { name: String, existed: bool }, + // Query result. + // + // For simplicity, buffer and send the entire result as a vector instead of + // streaming it to the client. Streaming reads haven't been implemented from + // Raft either. + Query { columns: Columns, rows: Vec }, + // Explain result + Explain(Plan), +} + +impl StatementResult { + /// Converts the ResultSet into a row, or errors if not a query result with rows. + pub fn into_row(self) -> Result { + self.into_rows()?.next().transpose()?.ok_or(errdata!("no rows returned")) + } + + /// Converts the ResultSet into a row iterator, or errors if not a query + /// result with rows. + pub fn into_rows(self) -> Result { + if let StatementResult::Query { rows, .. } = self { + Ok(Box::new(rows.into_iter().map(Ok))) + } else { + errdata!("not a query result: {self:?}") + } + } + + /// Converts the ResultSet into a value, if possible. + /// TODO: use TryFrom for this, also to primitive types via Value as TryFrom. + pub fn into_value(self) -> Result { + self.into_row()?.into_iter().next().ok_or(errdata!("no value returned")) + } +} + +// TODO: remove or revisit this. +impl TryFrom for StatementResult { + type Error = Error; + + fn try_from(result: ExecutionResult) -> Result { + Ok(match result { + ExecutionResult::CreateTable { name } => StatementResult::CreateTable { name }, + ExecutionResult::DropTable { name, existed } => { + StatementResult::DropTable { name, existed } + } + ExecutionResult::Delete { count } => StatementResult::Delete { count }, + ExecutionResult::Insert { count } => StatementResult::Create { count }, + ExecutionResult::Select { iter } => StatementResult::Query { + columns: iter.columns, + rows: iter.rows.collect::>()?, + }, + ExecutionResult::Update { count } => StatementResult::Update { count }, + }) + } +} diff --git a/src/sql/execution/mod.rs b/src/sql/execution/mod.rs index eae4f9682..a56619cc5 100644 --- a/src/sql/execution/mod.rs +++ b/src/sql/execution/mod.rs @@ -14,13 +14,9 @@ use source::{IndexLookup, KeyLookup, Nothing, Scan}; use super::engine::Transaction; use super::plan::{Node, Plan}; -use super::types::{Columns, Row, Rows, Value}; -use crate::errdata; +use super::types::{Columns, Row, Value}; use crate::error::Result; -use derivative::Derivative; -use serde::{Deserialize, Serialize}; - /// A plan execution result. pub enum ExecutionResult { CreateTable { name: String }, @@ -146,111 +142,3 @@ pub fn execute(node: Node, txn: &mut impl Transaction) -> Result Node::Scan { table, alias: _, filter } => Scan::new(table, filter).execute(txn), } } - -/// An executor result set -/// -/// TODO: rename to StatementResult and move to Session. -#[derive(Derivative, Serialize, Deserialize)] -#[derivative(Debug, PartialEq)] -pub enum ResultSet { - // Transaction started - Begin { - version: u64, - read_only: bool, - }, - // Transaction committed - Commit { - version: u64, - }, - // Transaction rolled back - Rollback { - version: u64, - }, - // Rows created - Create { - count: u64, - }, - // Rows deleted - Delete { - count: u64, - }, - // Rows updated - Update { - count: u64, - }, - // Table created - CreateTable { - name: String, - }, - // Table dropped - DropTable { - name: String, - existed: bool, - }, - // Query result - Query { - columns: Columns, - #[derivative(Debug = "ignore")] - #[derivative(PartialEq = "ignore")] - #[serde(skip, default = "ResultSet::empty_rows")] - rows: Rows, - }, - // Explain result - Explain(Plan), -} - -impl ResultSet { - /// Creates an empty row iterator, for use by serde(default). - fn empty_rows() -> Rows { - Box::new(std::iter::empty()) - } - - /// Converts the ResultSet into a row, or errors if not a query result with rows. - pub fn into_row(self) -> Result { - self.into_rows()?.next().transpose()?.ok_or(errdata!("no rows returned")) - } - - /// Converts the ResultSet into a row iterator, or errors if not a query - /// result with rows. - pub fn into_rows(self) -> Result { - if let ResultSet::Query { rows, .. } = self { - Ok(rows) - } else { - errdata!("not a query result: {self:?}") - } - } - - /// Converts the ResultSet into a value, if possible. - /// TODO: use TryFrom for this, also to primitive types via Value as TryFrom. - pub fn into_value(self) -> Result { - self.into_row()?.into_iter().next().ok_or(errdata!("no value returned")) - } -} - -// TODO: remove or revisit this. -impl From for ResultSet { - fn from(result: ExecutionResult) -> Self { - match result { - ExecutionResult::CreateTable { name } => ResultSet::CreateTable { name }, - ExecutionResult::DropTable { name, existed } => ResultSet::DropTable { name, existed }, - ExecutionResult::Delete { count } => ResultSet::Delete { count }, - ExecutionResult::Insert { count } => ResultSet::Create { count }, - ExecutionResult::Select { iter } => { - ResultSet::Query { columns: iter.columns, rows: iter.rows } - } - ExecutionResult::Update { count } => ResultSet::Update { count }, - } - } -} - -impl From for ResultSet { - fn from(iter: QueryIterator) -> Self { - Self::Query { columns: iter.columns, rows: iter.rows } - } -} - -impl From for Result { - fn from(value: QueryIterator) -> Self { - Ok(value.into()) - } -} diff --git a/tests/e2e/client.rs b/tests/e2e/client.rs index 8c243cf6a..6ba16c787 100644 --- a/tests/e2e/client.rs +++ b/tests/e2e/client.rs @@ -3,7 +3,7 @@ use super::{assert_row, assert_rows, dataset, TestCluster}; use toydb::error::{Error, Result}; use toydb::raft; use toydb::server::Status; -use toydb::sql::execution::ResultSet; +use toydb::sql::engine::StatementResult; use toydb::sql::types::schema; use toydb::sql::types::{Column, DataType, Value}; use toydb::storage; @@ -165,29 +165,24 @@ fn execute() -> Result<()> { let result = c.execute("SELECT * FROM genres")?; assert_eq!( result, - ResultSet::Query { + StatementResult::Query { columns: vec![Column { name: Some("id".into()) }, Column { name: Some("name".into()) }], - rows: Box::new(std::iter::empty()), + rows: vec![ + vec![Value::Integer(1), Value::String("Science Fiction".into())], + vec![Value::Integer(2), Value::String("Action".into())], + vec![Value::Integer(3), Value::String("Comedy".into())], + ], } ); - assert_rows( - result, - vec![ - vec![Value::Integer(1), Value::String("Science Fiction".into())], - vec![Value::Integer(2), Value::String("Action".into())], - vec![Value::Integer(3), Value::String("Comedy".into())], - ], - ); let result = c.execute("SELECT * FROM genres WHERE FALSE")?; assert_eq!( result, - ResultSet::Query { + StatementResult::Query { columns: vec![Column { name: Some("id".into()) }, Column { name: Some("name".into()) }], - rows: Box::new(std::iter::empty()), + rows: vec![], } ); - assert_rows(result, Vec::new()); assert_eq!( c.execute("SELECT * FROM x"), @@ -201,7 +196,7 @@ fn execute() -> Result<()> { ); assert_eq!( c.execute("INSERT INTO genres VALUES (9, 'Western')"), - Ok(ResultSet::Create { count: 1 }), + Ok(StatementResult::Create { count: 1 }), ); assert_eq!( c.execute("INSERT INTO x VALUES (9, 'Western')"), @@ -211,11 +206,11 @@ fn execute() -> Result<()> { // UPDATE assert_eq!( c.execute("UPDATE genres SET name = 'Horror' WHERE FALSE"), - Ok(ResultSet::Update { count: 0 }), + Ok(StatementResult::Update { count: 0 }), ); assert_eq!( c.execute("UPDATE genres SET name = 'Horror' WHERE id = 9"), - Ok(ResultSet::Update { count: 1 }), + Ok(StatementResult::Update { count: 1 }), ); assert_eq!( c.execute("UPDATE genres SET id = 1 WHERE id = 9"), @@ -223,8 +218,14 @@ fn execute() -> Result<()> { ); // DELETE - assert_eq!(c.execute("DELETE FROM genres WHERE FALSE"), Ok(ResultSet::Delete { count: 0 }),); - assert_eq!(c.execute("DELETE FROM genres WHERE id = 9"), Ok(ResultSet::Delete { count: 1 }),); + assert_eq!( + c.execute("DELETE FROM genres WHERE FALSE"), + Ok(StatementResult::Delete { count: 0 }), + ); + assert_eq!( + c.execute("DELETE FROM genres WHERE id = 9"), + Ok(StatementResult::Delete { count: 1 }), + ); assert_eq!( c.execute("DELETE FROM genres WHERE x = 1"), Err(Error::InvalidInput("unknown field x".into())) @@ -242,10 +243,10 @@ fn execute_txn() -> Result<()> { assert_eq!(c.txn(), None); // Committing a change in a txn should work - assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 2, read_only: false }); + assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false }); assert_eq!(c.txn(), Some((2, false))); c.execute("INSERT INTO genres VALUES (4, 'Drama')")?; - assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 2 }); + assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 }); assert_eq!(c.txn(), None); assert_row( c.execute("SELECT * FROM genres WHERE id = 4")?, @@ -254,19 +255,22 @@ fn execute_txn() -> Result<()> { assert_eq!(c.txn(), None); // Rolling back a change in a txn should also work - assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 3, read_only: false }); + assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false }); assert_eq!(c.txn(), Some((3, false))); c.execute("INSERT INTO genres VALUES (5, 'Musical')")?; assert_row( c.execute("SELECT * FROM genres WHERE id = 5")?, vec![Value::Integer(5), Value::String("Musical".into())], ); - assert_eq!(c.execute("ROLLBACK")?, ResultSet::Rollback { version: 3 }); + assert_eq!(c.execute("ROLLBACK")?, StatementResult::Rollback { version: 3 }); assert_rows(c.execute("SELECT * FROM genres WHERE id = 5")?, Vec::new()); assert_eq!(c.txn(), None); // Starting a read-only txn should block writes - assert_eq!(c.execute("BEGIN READ ONLY")?, ResultSet::Begin { version: 4, read_only: true }); + assert_eq!( + c.execute("BEGIN READ ONLY")?, + StatementResult::Begin { version: 4, read_only: true } + ); assert_eq!(c.txn(), Some((4, true))); assert_row( c.execute("SELECT * FROM genres WHERE id = 4")?, @@ -277,13 +281,13 @@ fn execute_txn() -> Result<()> { c.execute("SELECT * FROM genres WHERE id = 4")?, vec![Value::Integer(4), Value::String("Drama".into())], ); - assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 4 }); + assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 4 }); // Starting a time-travel txn should work, it shouldn't see recent changes, and it should // block writes assert_eq!( c.execute("BEGIN READ ONLY AS OF SYSTEM TIME 2")?, - ResultSet::Begin { version: 2, read_only: true }, + StatementResult::Begin { version: 2, read_only: true }, ); assert_eq!(c.txn(), Some((2, true))); assert_rows( @@ -295,10 +299,10 @@ fn execute_txn() -> Result<()> { ], ); assert_eq!(c.execute("INSERT INTO genres VALUES (5, 'Musical')"), Err(Error::ReadOnly)); - assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 2 }); + assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 2 }); // A txn should still be usable after an error occurs - assert_eq!(c.execute("BEGIN")?, ResultSet::Begin { version: 4, read_only: false }); + assert_eq!(c.execute("BEGIN")?, StatementResult::Begin { version: 4, read_only: false }); c.execute("INSERT INTO genres VALUES (5, 'Horror')")?; assert_eq!( c.execute("INSERT INTO genres VALUES (5, 'Musical')"), @@ -306,7 +310,7 @@ fn execute_txn() -> Result<()> { ); assert_eq!(c.txn(), Some((4, false))); c.execute("INSERT INTO genres VALUES (6, 'Western')")?; - assert_eq!(c.execute("COMMIT")?, ResultSet::Commit { version: 4 }); + assert_eq!(c.execute("COMMIT")?, StatementResult::Commit { version: 4 }); assert_rows( c.execute("SELECT * FROM genres")?, vec![ @@ -330,8 +334,8 @@ fn execute_txn_concurrent() -> Result<()> { let mut b = tc.connect_any()?; // Concurrent updates should throw a serialization failure on conflict. - assert_eq!(a.execute("BEGIN")?, ResultSet::Begin { version: 2, read_only: false }); - assert_eq!(b.execute("BEGIN")?, ResultSet::Begin { version: 3, read_only: false }); + assert_eq!(a.execute("BEGIN")?, StatementResult::Begin { version: 2, read_only: false }); + assert_eq!(b.execute("BEGIN")?, StatementResult::Begin { version: 3, read_only: false }); assert_row( a.execute("SELECT * FROM genres WHERE id = 1")?, @@ -344,12 +348,12 @@ fn execute_txn_concurrent() -> Result<()> { assert_eq!( a.execute("UPDATE genres SET name = 'x' WHERE id = 1"), - Ok(ResultSet::Update { count: 1 }) + Ok(StatementResult::Update { count: 1 }) ); assert_eq!(b.execute("UPDATE genres SET name = 'y' WHERE id = 1"), Err(Error::Serialization)); - assert_eq!(a.execute("COMMIT"), Ok(ResultSet::Commit { version: 2 })); - assert_eq!(b.execute("ROLLBACK"), Ok(ResultSet::Rollback { version: 3 })); + assert_eq!(a.execute("COMMIT"), Ok(StatementResult::Commit { version: 2 })); + assert_eq!(b.execute("ROLLBACK"), Ok(StatementResult::Rollback { version: 3 })); assert_row( a.execute("SELECT * FROM genres WHERE id = 1")?, diff --git a/tests/e2e/mod.rs b/tests/e2e/mod.rs index cbc0b2cdc..2a0f9e62c 100644 --- a/tests/e2e/mod.rs +++ b/tests/e2e/mod.rs @@ -12,16 +12,18 @@ mod testcluster; use testcluster::TestCluster; /// Asserts that a resultset contains the expected rows. -fn assert_rows(result: toydb::ResultSet, expect: Vec) { +/// +/// TODO: get rid of these. +fn assert_rows(result: toydb::StatementResult, expect: Vec) { match result { - toydb::ResultSet::Query { rows, .. } => { - pretty_assertions::assert_eq!(rows.collect::, _>>().unwrap(), expect) + toydb::StatementResult::Query { rows, .. } => { + pretty_assertions::assert_eq!(rows, expect) } r => panic!("Unexpected result {:?}", r), } } /// Asserts that a resultset contains the single expected row. -fn assert_row(result: toydb::ResultSet, expect: toydb::sql::types::Row) { +fn assert_row(result: toydb::StatementResult, expect: toydb::sql::types::Row) { assert_rows(result, vec![expect]) } diff --git a/tests/sql/query.rs b/tests/sql/query.rs index 68bbb6fa2..45644027e 100644 --- a/tests/sql/query.rs +++ b/tests/sql/query.rs @@ -2,11 +2,9 @@ //! and compares the results with golden files stored under tests/sql/query/ use toydb::errdata; use toydb::error::Result; -use toydb::sql::engine::{Engine, Transaction}; -use toydb::sql::execution::ResultSet; +use toydb::sql::engine::{Engine, StatementResult, Transaction}; use toydb::sql::parser::Parser; use toydb::sql::plan::Plan; -use toydb::sql::types::Row; use goldenfile::Mint; use std::io::Write; @@ -85,18 +83,11 @@ macro_rules! test_query { .and_then(|plan| plan.optimize(&mut txn)) .and_then(|plan| { write!(f, "Explain:\n{}\n\n", plan)?; - plan.execute(&mut txn).map(|r| r.into()) + plan.execute(&mut txn).and_then(|r| r.try_into()) }); match result { - Ok(ResultSet::Query{columns, rows}) => { - let rows: Vec = match rows.collect() { - Ok(rows) => rows, - Err(err) => { - write!(f, " {:?}", err)?; - return Ok(()) - } - }; + Ok(StatementResult::Query{columns, rows}) => { write!(f, "Result:")?; if !columns.is_empty() || !rows.is_empty() { write!(f, " {:?}\n", columns diff --git a/tests/sql/query/where_float b/tests/sql/query/where_float index 7fed96ea2..f56996949 100644 --- a/tests/sql/query/where_float +++ b/tests/sql/query/where_float @@ -3,4 +3,56 @@ Query: SELECT * FROM movies WHERE 3.14 Explain: Scan: movies (3.14) - InvalidData("filter returned 3.14, expected boolean") \ No newline at end of file +Error: invalid data: filter returned 3.14, expected boolean + +AST: Select { + select: [], + from: [ + Table { + name: "movies", + alias: None, + }, + ], + where: Some( + Literal( + Float( + 3.14, + ), + ), + ), + group_by: [], + having: None, + order: [], + offset: None, + limit: None, +} + +Plan: Select( + Filter { + source: Scan { + table: "movies", + alias: None, + filter: None, + }, + predicate: Constant( + Float( + 3.14, + ), + ), + }, +) + +Optimized plan: Select( + Scan { + table: "movies", + alias: None, + filter: Some( + Constant( + Float( + 3.14, + ), + ), + ), + }, +) + diff --git a/tests/sql/query/where_integer b/tests/sql/query/where_integer index 990a85642..6df7f5d6b 100644 --- a/tests/sql/query/where_integer +++ b/tests/sql/query/where_integer @@ -3,4 +3,56 @@ Query: SELECT * FROM movies WHERE 7 Explain: Scan: movies (7) - InvalidData("filter returned 7, expected boolean") \ No newline at end of file +Error: invalid data: filter returned 7, expected boolean + +AST: Select { + select: [], + from: [ + Table { + name: "movies", + alias: None, + }, + ], + where: Some( + Literal( + Integer( + 7, + ), + ), + ), + group_by: [], + having: None, + order: [], + offset: None, + limit: None, +} + +Plan: Select( + Filter { + source: Scan { + table: "movies", + alias: None, + filter: None, + }, + predicate: Constant( + Integer( + 7, + ), + ), + }, +) + +Optimized plan: Select( + Scan { + table: "movies", + alias: None, + filter: Some( + Constant( + Integer( + 7, + ), + ), + ), + }, +) + diff --git a/tests/sql/query/where_string b/tests/sql/query/where_string index ccf37fdd4..bafeedc48 100644 --- a/tests/sql/query/where_string +++ b/tests/sql/query/where_string @@ -3,4 +3,56 @@ Query: SELECT * FROM movies WHERE 'abc' Explain: Scan: movies (abc) - InvalidData("filter returned abc, expected boolean") \ No newline at end of file +Error: invalid data: filter returned abc, expected boolean + +AST: Select { + select: [], + from: [ + Table { + name: "movies", + alias: None, + }, + ], + where: Some( + Literal( + String( + "abc", + ), + ), + ), + group_by: [], + having: None, + order: [], + offset: None, + limit: None, +} + +Plan: Select( + Filter { + source: Scan { + table: "movies", + alias: None, + filter: None, + }, + predicate: Constant( + String( + "abc", + ), + ), + }, +) + +Optimized plan: Select( + Scan { + table: "movies", + alias: None, + filter: Some( + Constant( + String( + "abc", + ), + ), + ), + }, +) +