diff --git a/src/sql/engine/engine.rs b/src/sql/engine/engine.rs
index bf7c0389a..aa3871b7d 100644
--- a/src/sql/engine/engine.rs
+++ b/src/sql/engine/engine.rs
@@ -7,7 +7,7 @@
 use crate::sql::types::schema::Table;
 use crate::sql::types::{Expression, Row, Rows, Value};
 use crate::storage::mvcc;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};

 /// The SQL engine interface.
 pub trait Engine<'a>: Sized {
@@ -29,6 +29,10 @@

 /// A SQL transaction.
 ///
+/// All methods operate on row batches rather than single rows to amortize the
+/// cost. We have to do a Raft roundtrip for every call, and we'd rather not
+/// have to do a Raft roundtrip for every row.
+///
 /// TODO: split out Catalog trait and don't have Transaction depend on it. This
 /// enforces cleaner separation of when catalog access is valid (i.e. during
 /// planning but not execution).
@@ -43,24 +47,22 @@ pub trait Transaction: Catalog {
     /// Rolls back the transaction.
     fn rollback(self) -> Result<()>;

-    /// TOOD: consider operating on batches of IDs/rows.
-
-    /// Deletes a set of table row by primary key.
-    fn delete(&self, table: &str, id: &Value) -> Result<()>;
-    /// Fetches a table row by primary key.
-    fn get(&self, table: &str, id: &Value) -> Result<Option<Row>>;
-    /// Inserts a new table row.
-    fn insert(&self, table: &str, row: Row) -> Result<()>;
+    /// Deletes table rows by primary key.
+    fn delete(&self, table: &str, ids: &[Value]) -> Result<()>;
+    /// Fetches table rows by primary key.
+    fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>>;
+    /// Inserts new table rows.
+    fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()>;
     /// Looks up a set of table primary keys by an index value.
     /// TODO: should this just return a Vec instead?
-    fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>;
+    fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<HashSet<Value>>;
     /// Scans a table's rows, optionally applying the given filter.
     fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan>;
     /// Scans a column's index entries.
     /// TODO: this is only used for tests. Remove it?
     fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>;
-    /// Updates a table row by primary key.
-    fn update(&self, table: &str, id: &Value, row: Row) -> Result<()>;
+    /// Updates table rows by primary key.
+    fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()>;
 }

 /// An index scan iterator.
diff --git a/src/sql/engine/local.rs b/src/sql/engine/local.rs
index bd1f4654f..8ece37b2c 100644
--- a/src/sql/engine/local.rs
+++ b/src/sql/engine/local.rs
@@ -9,7 +9,7 @@ use crate::{errdata, errinput};
 use serde::{Deserialize, Serialize};
 use std::borrow::Cow;
 use std::clone::Clone;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};

 /// A SQL engine using a local storage engine.
 pub struct Local {
@@ -118,70 +118,87 @@ impl super::Transaction for Transaction {
         self.txn.rollback()
     }

-    fn insert(&self, table: &str, row: Row) -> Result<()> {
+    fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()> {
         let table = self.must_get_table(table)?;
-        table.validate_row(&row, self)?;
-        let id = table.get_row_key(&row)?;
-        if self.get(&table.name, &id)?.is_some() {
-            return errinput!("primary key {id} already exists for table {}", table.name);
-        }
-        self.txn.set(&Key::Row((&table.name).into(), (&id).into()).encode(), row.encode())?;
+        for row in rows {
+            table.validate_row(&row, self)?;
+            let id = table.get_row_key(&row)?;
+            if !self.get(&table.name, &[id.clone()])?.is_empty() {
+                return errinput!("primary key {id} already exists for table {}", table.name);
+            }
+            self.txn.set(&Key::Row((&table.name).into(), (&id).into()).encode(), row.encode())?;

-        // Update indexes
-        for (i, column) in table.columns.iter().enumerate().filter(|(_, c)| c.index) {
-            let mut index = self.index_load(&table.name, &column.name, &row[i])?;
-            index.insert(id.clone());
-            self.index_save(&table.name, &column.name, &row[i], index)?;
+            // Update indexes
+            for (i, column) in table.columns.iter().enumerate().filter(|(_, c)| c.index) {
+                let mut index = self.index_load(&table.name, &column.name, &row[i])?;
+                index.insert(id.clone());
+                self.index_save(&table.name, &column.name, &row[i], index)?;
+            }
         }
         Ok(())
     }

-    fn delete(&self, table: &str, id: &Value) -> Result<()> {
-        let table = self.must_get_table(table)?;
-        for (t, cs) in self.references(&table.name, true)? {
-            let t = self.must_get_table(&t)?;
-            let cs = cs
-                .into_iter()
-                .map(|c| Ok((t.get_column_index(&c)?, c)))
-                .collect::<Result<Vec<_>>>()?;
-            let mut scan = self.scan(&t.name, None)?;
-            while let Some(row) = scan.next().transpose()? {
-                for (i, c) in &cs {
-                    if &row[*i] == id && (table.name != t.name || id != &table.get_row_key(&row)?) {
-                        return errinput!(
-                            "primary key {id} referenced by table {} column {c}",
-                            t.name
-                        );
+    fn delete(&self, table: &str, ids: &[Value]) -> Result<()> {
+        // TODO: try to be more clever than simply iterating over each ID.
+        for id in ids {
+            let table = self.must_get_table(table)?;
+            for (t, cs) in self.references(&table.name, true)? {
+                let t = self.must_get_table(&t)?;
+                let cs = cs
+                    .into_iter()
+                    .map(|c| Ok((t.get_column_index(&c)?, c)))
+                    .collect::<Result<Vec<_>>>()?;
+                let mut scan = self.scan(&t.name, None)?;
+                while let Some(row) = scan.next().transpose()? {
+                    for (i, c) in &cs {
+                        if &row[*i] == id
+                            && (table.name != t.name || id != &table.get_row_key(&row)?)
+                        {
+                            return errinput!(
+                                "primary key {id} referenced by table {} column {c}",
+                                t.name
                            );
+                        }
                    }
                }
            }
-        }
-        let indexes: Vec<_> = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
-        if !indexes.is_empty() {
-            if let Some(row) = self.get(&table.name, id)? {
-                for (i, column) in indexes {
-                    let mut index = self.index_load(&table.name, &column.name, &row[i])?;
-                    index.remove(id);
-                    self.index_save(&table.name, &column.name, &row[i], index)?;
+            let indexes: Vec<_> =
+                table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
+            if !indexes.is_empty() {
+                for row in self.get(&table.name, &[id.clone()])? {
+                    for (i, column) in &indexes {
+                        let mut index = self.index_load(&table.name, &column.name, &row[*i])?;
+                        index.remove(id);
+                        self.index_save(&table.name, &column.name, &row[*i], index)?;
+                    }
                }
            }
+            self.txn.delete(&Key::Row(table.name.into(), id.into()).encode())?;
        }
-        self.txn.delete(&Key::Row(table.name.into(), id.into()).encode())
+        Ok(())
     }

-    fn get(&self, table: &str, id: &Value) -> Result<Option<Row>> {
-        self.txn
-            .get(&Key::Row(table.into(), id.into()).encode())?
-            .map(|v| Row::decode(&v))
-            .transpose()
+    fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>> {
+        ids.iter()
+            .filter_map(|id| {
+                self.txn
+                    .get(&Key::Row(table.into(), id.into()).encode())
+                    .transpose()
+                    .map(|r| r.and_then(|v| Row::decode(&v)))
+            })
+            .collect()
     }

-    fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
+    fn lookup_index(&self, table: &str, column: &str, value: &[Value]) -> Result<HashSet<Value>> {
         if !self.must_get_table(table)?.get_column(column)?.index {
             return errinput!("no index on {table}.{column}");
         }
-        self.index_load(table, column, value)
+        let mut pks = HashSet::new();
+        for v in value {
+            pks.extend(self.index_load(table, column, v)?)
+        }
+        Ok(pks)
     }

     fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan> {
@@ -233,35 +250,40 @@ impl super::Transaction for Transaction {
         ))
     }

-    fn update(&self, table: &str, id: &Value, row: Row) -> Result<()> {
+    fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()> {
         let table = self.must_get_table(table)?;
-        // If the primary key changes we do a delete and create, otherwise we replace the row
-        if id != &table.get_row_key(&row)? {
-            self.delete(&table.name, id)?;
-            self.insert(&table.name, row)?;
-            return Ok(());
-        }
+        // TODO: be more clever than just iterating here.
+        for (id, row) in rows {
+            // If the primary key changes we do a delete and create, otherwise we replace the row
+            if id != table.get_row_key(&row)? {
+                self.delete(&table.name, &[id.clone()])?;
+                self.insert(&table.name, vec![row])?;
+                return Ok(());
+            }

-        // Update indexes, knowing that the primary key has not changed
-        let indexes: Vec<_> = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
-        if !indexes.is_empty() {
-            let old = self.get(&table.name, id)?.unwrap();
-            for (i, column) in indexes {
-                if old[i] == row[i] {
-                    continue;
-                }
-                let mut index = self.index_load(&table.name, &column.name, &old[i])?;
-                index.remove(id);
-                self.index_save(&table.name, &column.name, &old[i], index)?;
+            // Update indexes, knowing that the primary key has not changed
+            let indexes: Vec<_> =
+                table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
+            if !indexes.is_empty() {
+                let old = self.get(&table.name, &[id.clone()])?.remove(0);
+                for (i, column) in indexes {
+                    if old[i] == row[i] {
+                        continue;
+                    }
+                    let mut index = self.index_load(&table.name, &column.name, &old[i])?;
+                    index.remove(&id);
+                    self.index_save(&table.name, &column.name, &old[i], index)?;

-                let mut index = self.index_load(&table.name, &column.name, &row[i])?;
-                index.insert(id.clone());
-                self.index_save(&table.name, &column.name, &row[i], index)?;
+                    let mut index = self.index_load(&table.name, &column.name, &row[i])?;
+                    index.insert(id.clone());
+                    self.index_save(&table.name, &column.name, &row[i], index)?;
+                }
             }
-        }
-        table.validate_row(&row, self)?;
-        self.txn.set(&Key::Row(table.name.into(), id.into()).encode(), row.encode())
+            table.validate_row(&row, self)?;
+            self.txn.set(&Key::Row((&table.name).into(), id.into()).encode(), row.encode())?;
+        }
+        Ok(())
     }
 }

@@ -281,7 +303,7 @@ impl Catalog for Transaction {
         }
         let mut scan = self.scan(&table.name, None)?;
         while let Some(row) = scan.next().transpose()? {
-            self.delete(&table.name, &table.get_row_key(&row)?)?
+            self.delete(&table.name, &[table.get_row_key(&row)?])?
         }
         self.txn.delete(&Key::Table(table.name.into()).encode())
     }
diff --git a/src/sql/engine/raft.rs b/src/sql/engine/raft.rs
index 6e52e99c7..1dac78b87 100644
--- a/src/sql/engine/raft.rs
+++ b/src/sql/engine/raft.rs
@@ -9,7 +9,7 @@
 use crate::storage::{self, mvcc, mvcc::TransactionState};
 use crossbeam::channel::Sender;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};

 /// A Raft state machine mutation.
 ///
@@ -24,11 +24,11 @@ enum Mutation {
     Rollback(TransactionState),

     /// Creates a new row
-    Create { txn: TransactionState, table: String, row: Row },
+    Create { txn: TransactionState, table: String, rows: Vec<Row> },
     /// Deletes a row
-    Delete { txn: TransactionState, table: String, id: Value },
+    Delete { txn: TransactionState, table: String, ids: Vec<Value> },
     /// Updates a row
-    Update { txn: TransactionState, table: String, id: Value, row: Row },
+    Update { txn: TransactionState, table: String, rows: HashMap<Value, Row> },

     /// Creates a table
     CreateTable { txn: TransactionState, schema: Table },
@@ -49,9 +49,9 @@ enum Query {
     Status,

     /// Reads a row
-    Read { txn: TransactionState, table: String, id: Value },
+    Read { txn: TransactionState, table: String, ids: Vec<Value> },
     /// Reads an index entry
-    ReadIndex { txn: TransactionState, table: String, column: String, value: Value },
+    ReadIndex { txn: TransactionState, table: String, column: String, values: Vec<Value> },
     /// Scans a table's rows
     Scan { txn: TransactionState, table: String, filter: Option<Expression> },
     /// Scans an index
@@ -197,36 +197,36 @@ impl super::Transaction for Transaction {
         Ok(())
     }

-    fn insert(&self, table: &str, row: Row) -> Result<()> {
+    fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()> {
         self.client.mutate(Mutation::Create {
             txn: self.state.clone(),
             table: table.to_string(),
-            row,
+            rows,
         })
     }

-    fn delete(&self, table: &str, id: &Value) -> Result<()> {
+    fn delete(&self, table: &str, ids: &[Value]) -> Result<()> {
         self.client.mutate(Mutation::Delete {
             txn: self.state.clone(),
             table: table.to_string(),
-            id: id.clone(),
+            ids: ids.to_vec(),
         })
     }

-    fn get(&self, table: &str, id: &Value) -> Result<Option<Row>> {
+    fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>> {
         self.client.query(Query::Read {
             txn: self.state.clone(),
             table: table.to_string(),
-            id: id.clone(),
+            ids: ids.to_vec(),
         })
     }

-    fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
+    fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<HashSet<Value>> {
         self.client.query(Query::ReadIndex {
             txn: self.state.clone(),
             table: table.to_string(),
             column: column.to_string(),
-            value: value.clone(),
+            values: values.to_vec(),
         })
     }

@@ -256,12 +256,11 @@
         ))
     }

-    fn update(&self, table: &str, id: &Value, row: Row) -> Result<()> {
+    fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()> {
         self.client.mutate(Mutation::Update {
             txn: self.state.clone(),
             table: table.to_string(),
-            id: id.clone(),
-            row,
+            rows,
         })
     }
 }

@@ -311,14 +310,14 @@
             Mutation::Commit(txn) => bincode::serialize(&self.engine.resume(txn)?.commit()?),
             Mutation::Rollback(txn) => bincode::serialize(&self.engine.resume(txn)?.rollback()?),

-            Mutation::Create { txn, table, row } => {
-                bincode::serialize(&self.engine.resume(txn)?.insert(&table, row)?)
+            Mutation::Create { txn, table, rows } => {
+                bincode::serialize(&self.engine.resume(txn)?.insert(&table, rows)?)
             }
-            Mutation::Delete { txn, table, id } => {
-                bincode::serialize(&self.engine.resume(txn)?.delete(&table, &id)?)
+            Mutation::Delete { txn, table, ids } => {
+                bincode::serialize(&self.engine.resume(txn)?.delete(&table, &ids)?)
             }
-            Mutation::Update { txn, table, id, row } => {
-                bincode::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?)
+            Mutation::Update { txn, table, rows } => {
+                bincode::serialize(&self.engine.resume(txn)?.update(&table, rows)?)
             }

             Mutation::CreateTable { txn, schema } => {
@@ -365,9 +364,9 @@ impl raft::State for State {
                 };
                 txn.state().encode()
             }
-            Query::Read { txn, table, id } => self.engine.resume(txn)?.get(&table, &id)?.encode(),
-            Query::ReadIndex { txn, table, column, value } => {
-                self.engine.resume(txn)?.lookup_index(&table, &column, &value)?.encode()
+            Query::Read { txn, table, ids } => self.engine.resume(txn)?.get(&table, &ids)?.encode(),
+            Query::ReadIndex { txn, table, column, values } => {
+                self.engine.resume(txn)?.lookup_index(&table, &column, &values)?.encode()
             }
             // FIXME These need to stream rows somehow
             Query::Scan { txn, table, filter } => {
diff --git a/src/sql/execution/source.rs b/src/sql/execution/source.rs
index 7d581ddca..608a5b6e7 100644
--- a/src/sql/execution/source.rs
+++ b/src/sql/execution/source.rs
@@ -25,16 +25,9 @@ pub(super) fn lookup_key(
 ) -> Result<QueryIterator> {
     // TODO: move catalog lookup elsewhere and make this infallible.
     let table = txn.must_get_table(table)?;
-
-    // TODO: don't collect into vec, requires shared txn borrow.
-    let rows = keys
-        .into_iter()
-        .filter_map(|key| txn.get(&table.name, &key).transpose())
-        .collect::<Result<Vec<_>>>()?;
-
     Ok(QueryIterator {
         columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
-        rows: Box::new(rows.into_iter().map(Ok)),
+        rows: Box::new(txn.get(&table.name, &keys)?.into_iter().map(Ok)),
     })
 }

@@ -47,18 +40,8 @@ pub(super) fn lookup_index(
 ) -> Result<QueryIterator> {
     // TODO: pass in from planner.
     let table = txn.must_get_table(table)?;
-
-    // TODO: consider cleaning up.
-    let mut pks = std::collections::HashSet::new();
-    for value in values {
-        pks.extend(txn.lookup_index(&table.name, column, &value)?);
-    }
-
-    // TODO: use a shared txn borrow instead.
-    let rows = pks
-        .into_iter()
-        .filter_map(|pk| txn.get(&table.name, &pk).transpose())
-        .collect::<Result<Vec<_>>>()?;
+    let pks: Vec<_> = txn.lookup_index(&table.name, column, &values)?.into_iter().collect();
+    let rows = txn.get(&table.name, &pks)?;

     Ok(QueryIterator {
         columns: table.columns.iter().map(|c| Column { name: Some(c.name.clone()) }).collect(),
diff --git a/src/sql/execution/write.rs b/src/sql/execution/write.rs
index 412aedf54..c1a94c123 100644
--- a/src/sql/execution/write.rs
+++ b/src/sql/execution/write.rs
@@ -7,18 +7,13 @@ use crate::sql::types::{Expression, Row, Value};

 /// Deletes rows, taking primary keys from the source (i.e. DELETE).
 /// Returns the number of rows deleted.
-pub(super) fn delete(
-    txn: &impl Transaction,
-    table: &str,
-    mut source: QueryIterator,
-) -> Result<u64> {
+pub(super) fn delete(txn: &impl Transaction, table: &str, source: QueryIterator) -> Result<u64> {
     // TODO: should be prepared by planner.
     let table = txn.must_get_table(table)?;
-    let mut count = 0;
-    while let Some(row) = source.next().transpose()? {
-        txn.delete(&table.name, &table.get_row_key(&row)?)?;
-        count += 1
-    }
+    let ids: Vec<_> =
+        source.map(|r| r.and_then(|row| table.get_row_key(&row))).collect::<Result<_>>()?;
+    let count = ids.len() as u64;
+    txn.delete(&table.name, &ids)?;
     Ok(count)
 }

@@ -32,7 +27,7 @@ pub(super) fn insert(
     values: Vec<Vec<Expression>>,
 ) -> Result<u64> {
     let table = txn.must_get_table(table)?;
-    let mut count = 0;
+    let mut rows = Vec::with_capacity(values.len());
     for expressions in values {
         let mut row =
             expressions.into_iter().map(|expr| expr.evaluate(None)).collect::<Result<_>>()?;
@@ -41,9 +36,10 @@
         } else {
             row = make_row(&table, &columns, row)?;
         }
-        txn.insert(&table.name, row)?;
-        count += 1;
+        rows.push(row);
     }
+    let count = rows.len() as u64;
+    txn.insert(&table.name, rows)?;
     Ok(count)
 }

@@ -56,29 +52,18 @@ pub(super) fn update(
     expressions: Vec<(usize, Expression)>,
 ) -> Result<u64> {
     let table = txn.must_get_table(table)?;
-    // The iterator will see our changes, such that the same item may be
-    // iterated over multiple times. We keep track of the primary keys here
-    // to avoid that, althought it may cause ballooning memory usage for
-    // large updates.
-    //
-    // FIXME This is not safe for primary key updates, which may still be
-    // processed multiple times - it should be possible to come up with a
-    // pathological case that loops forever (e.g. UPDATE test SET id = id +
-    // 1).
-    let mut updated = std::collections::HashSet::new();
+    let mut update = std::collections::HashMap::new();
     while let Some(row) = source.next().transpose()? {
         let id = table.get_row_key(&row)?;
-        if updated.contains(&id) {
-            continue;
-        }
         let mut new = row.clone();
         for (field, expr) in &expressions {
             new[*field] = expr.evaluate(Some(&row))?;
         }
-        txn.update(&table.name, &id, new)?;
-        updated.insert(id);
+        update.insert(id, new);
     }
-    Ok(updated.len() as u64)
+    let count = update.len() as u64;
+    txn.update(&table.name, update)?;
+    Ok(count)
 }

 // Builds a row from a set of column names and values, padding it with default values.
diff --git a/src/sql/types/schema.rs b/src/sql/types/schema.rs
index 52c2e3738..4d7de5383 100644
--- a/src/sql/types/schema.rs
+++ b/src/sql/types/schema.rs
@@ -213,7 +213,7 @@ impl Column {
             Value::Null => Ok(()),
             Value::Float(f) if f.is_nan() => Ok(()),
             v if target == &table.name && v == pk => Ok(()),
-            v if txn.get(target, v)?.is_none() => {
+            v if txn.get(target, &[v.clone()])?.is_empty() => {
                 errinput!("referenced primary key {v} in table {target} does not exist",)
             }
             _ => Ok(()),
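Note (not part of the patch): the sketch below shows how a caller might use the batched Transaction API introduced above, issuing one get(), insert(), and delete() call per batch rather than one call (and one Raft roundtrip) per row. The helper name move_rows and the import paths are hypothetical, inferred from the module layout visible in the diff.

// Illustrative sketch only: batched reads and writes through the new API.
use crate::error::Result;            // assumed location of the Result alias
use crate::sql::engine::Transaction; // the batched trait from engine.rs above
use crate::sql::types::{Row, Value};

/// Copies the given rows from one table into another and deletes the
/// originals, using a single batched call per operation.
fn move_rows(txn: &impl Transaction, from: &str, to: &str, ids: &[Value]) -> Result<u64> {
    let rows: Vec<Row> = txn.get(from, ids)?; // one Read roundtrip
    let count = rows.len() as u64;
    txn.insert(to, rows)?;                    // one Create roundtrip
    txn.delete(from, ids)?;                   // one Delete roundtrip
    Ok(count)
}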