Skip to content

Commit

Permalink
sql: use row/ID batches in engine trait
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 18, 2024
1 parent a5e12fd commit 6e28774
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 158 deletions.
26 changes: 14 additions & 12 deletions src/sql/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::sql::types::schema::Table;
use crate::sql::types::{Expression, Row, Rows, Value};
use crate::storage::mvcc;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

/// The SQL engine interface.
pub trait Engine<'a>: Sized {
Expand All @@ -29,6 +29,10 @@ pub trait Engine<'a>: Sized {

/// A SQL transaction.
///
/// All methods operate on row batches rather than single rows to amortize the
/// cost. We have to do a Raft roundtrip for every call, and we'd rather not
/// have to do a Raft roundtrip for every row.
///
/// TODO: split out Catalog trait and don't have Transaction depend on it. This
/// enforces cleaner separation of when catalog access is valid (i.e. during
/// planning but not execution).
Expand All @@ -43,24 +47,22 @@ pub trait Transaction: Catalog {
/// Rolls back the transaction.
fn rollback(self) -> Result<()>;

/// TODO: consider operating on batches of IDs/rows.

/// Deletes a table row by primary key.
fn delete(&self, table: &str, id: &Value) -> Result<()>;
/// Fetches a table row by primary key.
fn get(&self, table: &str, id: &Value) -> Result<Option<Row>>;
/// Inserts a new table row.
fn insert(&self, table: &str, row: Row) -> Result<()>;
/// Deletes table rows by primary key.
fn delete(&self, table: &str, ids: &[Value]) -> Result<()>;
/// Fetches table rows by primary key. IDs without a matching row are omitted
/// from the result.
fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>>;
/// Inserts new table rows.
fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()>;
/// Looks up a set of table primary keys by a set of index values.
/// TODO: should this just return a Vec instead?
fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>>;
fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<HashSet<Value>>;
/// Scans a table's rows, optionally applying the given filter.
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Rows>;
/// Scans a column's index entries.
/// TODO: this is only used for tests. Remove it?
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan>;
/// Updates a table row by primary key.
fn update(&self, table: &str, id: &Value, row: Row) -> Result<()>;
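/// Updates table rows by primary key. The map is keyed by each row's current
/// primary key, which may differ from the primary key in the new row.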
fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()>;
}

/// An index scan iterator.
Expand Down
162 changes: 92 additions & 70 deletions src/sql/engine/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{errdata, errinput};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::clone::Clone;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

/// A SQL engine using a local storage engine.
pub struct Local<E: storage::Engine> {
Expand Down Expand Up @@ -118,70 +118,87 @@ impl<E: storage::Engine> super::Transaction for Transaction<E> {
self.txn.rollback()
}

fn insert(&self, table: &str, row: Row) -> Result<()> {
fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()> {
let table = self.must_get_table(table)?;
table.validate_row(&row, self)?;
let id = table.get_row_key(&row)?;
if self.get(&table.name, &id)?.is_some() {
return errinput!("primary key {id} already exists for table {}", table.name);
}
self.txn.set(&Key::Row((&table.name).into(), (&id).into()).encode(), row.encode())?;
for row in rows {
table.validate_row(&row, self)?;
let id = table.get_row_key(&row)?;
if !self.get(&table.name, &[id.clone()])?.is_empty() {
return errinput!("primary key {id} already exists for table {}", table.name);
}
self.txn.set(&Key::Row((&table.name).into(), (&id).into()).encode(), row.encode())?;

// Update indexes
for (i, column) in table.columns.iter().enumerate().filter(|(_, c)| c.index) {
let mut index = self.index_load(&table.name, &column.name, &row[i])?;
index.insert(id.clone());
self.index_save(&table.name, &column.name, &row[i], index)?;
// Update indexes
for (i, column) in table.columns.iter().enumerate().filter(|(_, c)| c.index) {
let mut index = self.index_load(&table.name, &column.name, &row[i])?;
index.insert(id.clone());
self.index_save(&table.name, &column.name, &row[i], index)?;
}
}
Ok(())
}

fn delete(&self, table: &str, id: &Value) -> Result<()> {
let table = self.must_get_table(table)?;
for (t, cs) in self.references(&table.name, true)? {
let t = self.must_get_table(&t)?;
let cs = cs
.into_iter()
.map(|c| Ok((t.get_column_index(&c)?, c)))
.collect::<Result<Vec<_>>>()?;
let mut scan = self.scan(&t.name, None)?;
while let Some(row) = scan.next().transpose()? {
for (i, c) in &cs {
if &row[*i] == id && (table.name != t.name || id != &table.get_row_key(&row)?) {
return errinput!(
"primary key {id} referenced by table {} column {c}",
t.name
);
fn delete(&self, table: &str, ids: &[Value]) -> Result<()> {
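// TODO: try to be more clever than simply iterating over each ID. The table
// schema and its references are re-fetched, and every referencing table is
// fully rescanned, once per ID.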
for id in ids {
let table = self.must_get_table(table)?;
for (t, cs) in self.references(&table.name, true)? {
let t = self.must_get_table(&t)?;
let cs = cs
.into_iter()
.map(|c| Ok((t.get_column_index(&c)?, c)))
.collect::<Result<Vec<_>>>()?;
let mut scan = self.scan(&t.name, None)?;
while let Some(row) = scan.next().transpose()? {
for (i, c) in &cs {
if &row[*i] == id
&& (table.name != t.name || id != &table.get_row_key(&row)?)
{
return errinput!(
"primary key {id} referenced by table {} column {c}",
t.name
);
}
}
}
}
}

let indexes: Vec<_> = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
if !indexes.is_empty() {
if let Some(row) = self.get(&table.name, id)? {
for (i, column) in indexes {
let mut index = self.index_load(&table.name, &column.name, &row[i])?;
index.remove(id);
self.index_save(&table.name, &column.name, &row[i], index)?;
let indexes: Vec<_> =
table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
if !indexes.is_empty() {
for row in self.get(&table.name, &[id.clone()])? {
for (i, column) in &indexes {
let mut index = self.index_load(&table.name, &column.name, &row[*i])?;
index.remove(id);
self.index_save(&table.name, &column.name, &row[*i], index)?;
}
}
}
self.txn.delete(&Key::Row(table.name.into(), id.into()).encode())?;
}
self.txn.delete(&Key::Row(table.name.into(), id.into()).encode())
Ok(())
}

fn get(&self, table: &str, id: &Value) -> Result<Option<Row>> {
self.txn
.get(&Key::Row(table.into(), id.into()).encode())?
.map(|v| Row::decode(&v))
.transpose()
fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>> {
ids.iter()
.filter_map(|id| {
self.txn
.get(&Key::Row(table.into(), id.into()).encode())
.transpose()
.map(|r| r.and_then(|v| Row::decode(&v)))
})
.collect()
}

fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
fn lookup_index(&self, table: &str, column: &str, value: &[Value]) -> Result<HashSet<Value>> {
if !self.must_get_table(table)?.get_column(column)?.index {
return errinput!("no index on {table}.{column}");
}
self.index_load(table, column, value)
let mut pks = HashSet::new();
for v in value {
pks.extend(self.index_load(table, column, v)?)
}
Ok(pks)
}

fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Rows> {
Expand Down Expand Up @@ -233,35 +250,40 @@ impl<E: storage::Engine> super::Transaction for Transaction<E> {
))
}

fn update(&self, table: &str, id: &Value, row: Row) -> Result<()> {
fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()> {
let table = self.must_get_table(table)?;
// If the primary key changes we do a delete and create, otherwise we replace the row
if id != &table.get_row_key(&row)? {
self.delete(&table.name, id)?;
self.insert(&table.name, row)?;
return Ok(());
}
// TODO: be more clever than just iterating here.
for (id, row) in rows {
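// If the primary key changes we do a delete and create, otherwise we replace the row.
// NOTE(review): the `return Ok(())` in this branch exits `update` after the first
// key-changing row and silently skips the rest of the batch; `continue` is needed.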
if id != table.get_row_key(&row)? {
self.delete(&table.name, &[id.clone()])?;
self.insert(&table.name, vec![row])?;
return Ok(());
}

// Update indexes, knowing that the primary key has not changed
let indexes: Vec<_> = table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
if !indexes.is_empty() {
let old = self.get(&table.name, id)?.unwrap();
for (i, column) in indexes {
if old[i] == row[i] {
continue;
}
let mut index = self.index_load(&table.name, &column.name, &old[i])?;
index.remove(id);
self.index_save(&table.name, &column.name, &old[i], index)?;
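// Update indexes, knowing that the primary key has not changed.
// NOTE(review): `.remove(0)` below panics if no row exists for `id`, since `get`
// returns an empty Vec for unknown IDs; consider returning an error instead.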
let indexes: Vec<_> =
table.columns.iter().enumerate().filter(|(_, c)| c.index).collect();
if !indexes.is_empty() {
let old = self.get(&table.name, &[id.clone()])?.remove(0);
for (i, column) in indexes {
if old[i] == row[i] {
continue;
}
let mut index = self.index_load(&table.name, &column.name, &old[i])?;
index.remove(&id);
self.index_save(&table.name, &column.name, &old[i], index)?;

let mut index = self.index_load(&table.name, &column.name, &row[i])?;
index.insert(id.clone());
self.index_save(&table.name, &column.name, &row[i], index)?;
let mut index = self.index_load(&table.name, &column.name, &row[i])?;
index.insert(id.clone());
self.index_save(&table.name, &column.name, &row[i], index)?;
}
}
}

table.validate_row(&row, self)?;
self.txn.set(&Key::Row(table.name.into(), id.into()).encode(), row.encode())
table.validate_row(&row, self)?;
self.txn.set(&Key::Row((&table.name).into(), id.into()).encode(), row.encode())?;
}
Ok(())
}
}

Expand All @@ -281,7 +303,7 @@ impl<E: storage::Engine> Catalog for Transaction<E> {
}
let mut scan = self.scan(&table.name, None)?;
while let Some(row) = scan.next().transpose()? {
self.delete(&table.name, &table.get_row_key(&row)?)?
self.delete(&table.name, &[table.get_row_key(&row)?])?
}
self.txn.delete(&Key::Table(table.name.into()).encode())
}
Expand Down
51 changes: 25 additions & 26 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::storage::{self, mvcc, mvcc::TransactionState};

use crossbeam::channel::Sender;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

/// A Raft state machine mutation.
///
Expand All @@ -24,11 +24,11 @@ enum Mutation {
Rollback(TransactionState),

/// Creates new rows
Create { txn: TransactionState, table: String, row: Row },
Create { txn: TransactionState, table: String, rows: Vec<Row> },
/// Deletes rows by primary key
Delete { txn: TransactionState, table: String, id: Value },
Delete { txn: TransactionState, table: String, ids: Vec<Value> },
/// Updates rows, keyed by their current primary key
Update { txn: TransactionState, table: String, id: Value, row: Row },
Update { txn: TransactionState, table: String, rows: HashMap<Value, Row> },

/// Creates a table
CreateTable { txn: TransactionState, schema: Table },
Expand All @@ -49,9 +49,9 @@ enum Query {
Status,

/// Reads rows by primary key
Read { txn: TransactionState, table: String, id: Value },
Read { txn: TransactionState, table: String, ids: Vec<Value> },
/// Reads the index entries for a set of values
ReadIndex { txn: TransactionState, table: String, column: String, value: Value },
ReadIndex { txn: TransactionState, table: String, column: String, values: Vec<Value> },
/// Scans a table's rows
Scan { txn: TransactionState, table: String, filter: Option<Expression> },
/// Scans an index
Expand Down Expand Up @@ -197,36 +197,36 @@ impl super::Transaction for Transaction {
Ok(())
}

fn insert(&self, table: &str, row: Row) -> Result<()> {
fn insert(&self, table: &str, rows: Vec<Row>) -> Result<()> {
self.client.mutate(Mutation::Create {
txn: self.state.clone(),
table: table.to_string(),
row,
rows,
})
}

fn delete(&self, table: &str, id: &Value) -> Result<()> {
fn delete(&self, table: &str, ids: &[Value]) -> Result<()> {
self.client.mutate(Mutation::Delete {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
ids: ids.to_vec(),
})
}

fn get(&self, table: &str, id: &Value) -> Result<Option<Row>> {
fn get(&self, table: &str, ids: &[Value]) -> Result<Vec<Row>> {
self.client.query(Query::Read {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
ids: ids.to_vec(),
})
}

fn lookup_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
fn lookup_index(&self, table: &str, column: &str, values: &[Value]) -> Result<HashSet<Value>> {
self.client.query(Query::ReadIndex {
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
value: value.clone(),
values: values.to_vec(),
})
}

Expand Down Expand Up @@ -256,12 +256,11 @@ impl super::Transaction for Transaction {
))
}

fn update(&self, table: &str, id: &Value, row: Row) -> Result<()> {
fn update(&self, table: &str, rows: HashMap<Value, Row>) -> Result<()> {
self.client.mutate(Mutation::Update {
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
row,
rows,
})
}
}
Expand Down Expand Up @@ -311,14 +310,14 @@ impl<E: storage::Engine> State<E> {
Mutation::Commit(txn) => bincode::serialize(&self.engine.resume(txn)?.commit()?),
Mutation::Rollback(txn) => bincode::serialize(&self.engine.resume(txn)?.rollback()?),

Mutation::Create { txn, table, row } => {
bincode::serialize(&self.engine.resume(txn)?.insert(&table, row)?)
Mutation::Create { txn, table, rows } => {
bincode::serialize(&self.engine.resume(txn)?.insert(&table, rows)?)
}
Mutation::Delete { txn, table, id } => {
bincode::serialize(&self.engine.resume(txn)?.delete(&table, &id)?)
Mutation::Delete { txn, table, ids } => {
bincode::serialize(&self.engine.resume(txn)?.delete(&table, &ids)?)
}
Mutation::Update { txn, table, id, row } => {
bincode::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?)
Mutation::Update { txn, table, rows } => {
bincode::serialize(&self.engine.resume(txn)?.update(&table, rows)?)
}

Mutation::CreateTable { txn, schema } => {
Expand Down Expand Up @@ -365,9 +364,9 @@ impl<E: storage::Engine> raft::State for State<E> {
};
txn.state().encode()
}
Query::Read { txn, table, id } => self.engine.resume(txn)?.get(&table, &id)?.encode(),
Query::ReadIndex { txn, table, column, value } => {
self.engine.resume(txn)?.lookup_index(&table, &column, &value)?.encode()
Query::Read { txn, table, ids } => self.engine.resume(txn)?.get(&table, &ids)?.encode(),
Query::ReadIndex { txn, table, column, values } => {
self.engine.resume(txn)?.lookup_index(&table, &column, &values)?.encode()
}
// FIXME These need to stream rows somehow
Query::Scan { txn, table, filter } => {
Expand Down

0 comments on commit 6e28774

Please sign in to comment.