From 3be2413202018666cc22260cc8cedebc09e5d1e9 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 4 Sep 2023 22:39:27 +0200 Subject: [PATCH] Add MVCC TransactionState and use it for Transaction::resume(). This allows passing transaction state across the Raft state machine boundary. Later, this will allow read-only transactions to be truly read-only, omitting the on-disk state tracking of the txn ID, snapshot, and active set. --- src/sql/engine/kv.rs | 14 ++++- src/sql/engine/raft.rs | 138 ++++++++++++++++++++++------------------- src/storage/mvcc.rs | 78 +++++++++++++++-------- tests/client/mod.rs | 2 +- 4 files changed, 138 insertions(+), 94 deletions(-) diff --git a/src/sql/engine/kv.rs b/src/sql/engine/kv.rs index 5f2747fff..b473e264c 100644 --- a/src/sql/engine/kv.rs +++ b/src/sql/engine/kv.rs @@ -28,9 +28,12 @@ impl KV { Self { kv: storage::mvcc::MVCC::new(engine) } } - /// Resumes a transaction with the given ID - pub fn resume(&self, id: u64) -> Result<::Transaction> { - Ok(::Transaction::new(self.kv.resume(id)?)) + /// Resumes a transaction from the given state + pub fn resume( + &self, + state: storage::mvcc::TransactionState, + ) -> Result<::Transaction> { + Ok(::Transaction::new(self.kv.resume(state)?)) } /// Fetches an unversioned metadata value @@ -73,6 +76,11 @@ impl Transaction { Self { txn } } + /// Returns the transaction's serialized state. + pub(super) fn state(&self) -> storage::mvcc::TransactionState { + self.txn.state() + } + /// Loads an index entry fn index_load(&self, table: &str, column: &str, value: &Value) -> Result> { Ok(self diff --git a/src/sql/engine/raft.rs b/src/sql/engine/raft.rs index a694ea9fc..ac2cff597 100644 --- a/src/sql/engine/raft.rs +++ b/src/sql/engine/raft.rs @@ -3,53 +3,57 @@ use super::super::types::{Expression, Row, Value}; use super::{Engine as _, IndexScan, Mode, Scan, Transaction as _}; use crate::error::{Error, Result}; use crate::raft; -use crate::storage; +use crate::storage::{self, mvcc::TransactionState}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; /// A Raft state machine mutation +/// +/// TODO: use Cows for these. #[derive(Clone, Serialize, Deserialize)] enum Mutation { /// Begins a transaction in the given mode Begin(Mode), - /// Commits the transaction with the given ID - Commit(u64), - /// Rolls back the transaction with the given ID - Rollback(u64), + /// Commits the given transaction + Commit(TransactionState), + /// Rolls back the given transaction + Rollback(TransactionState), /// Creates a new row - Create { txn_id: u64, table: String, row: Row }, + Create { txn: TransactionState, table: String, row: Row }, /// Deletes a row - Delete { txn_id: u64, table: String, id: Value }, + Delete { txn: TransactionState, table: String, id: Value }, /// Updates a row - Update { txn_id: u64, table: String, id: Value, row: Row }, + Update { txn: TransactionState, table: String, id: Value, row: Row }, /// Creates a table - CreateTable { txn_id: u64, schema: Table }, + CreateTable { txn: TransactionState, schema: Table }, /// Deletes a table - DeleteTable { txn_id: u64, table: String }, + DeleteTable { txn: TransactionState, table: String }, } /// A Raft state machine query +/// +/// TODO: use Cows for these. #[derive(Clone, Serialize, Deserialize)] enum Query { /// Fetches engine status Status, /// Reads a row - Read { txn_id: u64, table: String, id: Value }, + Read { txn: TransactionState, table: String, id: Value }, /// Reads an index entry - ReadIndex { txn_id: u64, table: String, column: String, value: Value }, + ReadIndex { txn: TransactionState, table: String, column: String, value: Value }, /// Scans a table's rows - Scan { txn_id: u64, table: String, filter: Option }, + Scan { txn: TransactionState, table: String, filter: Option }, /// Scans an index - ScanIndex { txn_id: u64, table: String, column: String }, + ScanIndex { txn: TransactionState, table: String, column: String }, /// Scans the tables - ScanTables { txn_id: u64 }, + ScanTables { txn: TransactionState }, /// Reads a table - ReadTable { txn_id: u64, table: String }, + ReadTable { txn: TransactionState, table: String }, } /// Status for the Raft SQL engine. @@ -110,19 +114,17 @@ impl super::Engine for Raft { pub struct Transaction { /// The underlying Raft cluster client: raft::Client, - /// The transaction ID - id: u64, - /// The transaction mode - mode: Mode, + // Transaction state + state: TransactionState, } impl Transaction { /// Starts a transaction in the given mode fn begin(client: raft::Client, mode: Mode) -> Result { - let id = Raft::deserialize(&futures::executor::block_on( + let state = Raft::deserialize(&futures::executor::block_on( client.mutate(Raft::serialize(&Mutation::Begin(mode))?), )?)?; - Ok(Self { client, id, mode }) + Ok(Self { client, state }) } /// Executes a mutation @@ -138,24 +140,24 @@ impl Transaction { impl super::Transaction for Transaction { fn id(&self) -> u64 { - self.id + self.state.id } fn mode(&self) -> Mode { - self.mode + self.state.mode } fn commit(self) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Commit(self.id))?) + Raft::deserialize(&self.mutate(Mutation::Commit(self.state.clone()))?) } fn rollback(self) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::Rollback(self.id))?) + Raft::deserialize(&self.mutate(Mutation::Rollback(self.state.clone()))?) } fn create(&mut self, table: &str, row: Row) -> Result<()> { Raft::deserialize(&self.mutate(Mutation::Create { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), row, })?) @@ -163,7 +165,7 @@ impl super::Transaction for Transaction { fn delete(&mut self, table: &str, id: &Value) -> Result<()> { Raft::deserialize(&self.mutate(Mutation::Delete { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), id: id.clone(), })?) @@ -171,7 +173,7 @@ impl super::Transaction for Transaction { fn read(&self, table: &str, id: &Value) -> Result> { Raft::deserialize(&self.query(Query::Read { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), id: id.clone(), })?) @@ -179,7 +181,7 @@ impl super::Transaction for Transaction { fn read_index(&self, table: &str, column: &str, value: &Value) -> Result> { Raft::deserialize(&self.query(Query::ReadIndex { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), column: column.to_string(), value: value.clone(), @@ -189,7 +191,7 @@ impl super::Transaction for Transaction { fn scan(&self, table: &str, filter: Option) -> Result { Ok(Box::new( Raft::deserialize::>(&self.query(Query::Scan { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), filter, })?)? @@ -201,7 +203,7 @@ impl super::Transaction for Transaction { fn scan_index(&self, table: &str, column: &str) -> Result { Ok(Box::new( Raft::deserialize::>(&self.query(Query::ScanIndex { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), column: column.to_string(), })?)? @@ -212,7 +214,7 @@ impl super::Transaction for Transaction { fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()> { Raft::deserialize(&self.mutate(Mutation::Update { - txn_id: self.id, + txn: self.state.clone(), table: table.to_string(), id: id.clone(), row, @@ -222,25 +224,32 @@ impl super::Transaction for Transaction { impl Catalog for Transaction { fn create_table(&mut self, table: Table) -> Result<()> { - Raft::deserialize(&self.mutate(Mutation::CreateTable { txn_id: self.id, schema: table })?) + Raft::deserialize( + &self.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })?, + ) } fn delete_table(&mut self, table: &str) -> Result<()> { Raft::deserialize( - &self.mutate(Mutation::DeleteTable { txn_id: self.id, table: table.to_string() })?, + &self.mutate(Mutation::DeleteTable { + txn: self.state.clone(), + table: table.to_string(), + })?, ) } fn read_table(&self, table: &str) -> Result> { Raft::deserialize( - &self.query(Query::ReadTable { txn_id: self.id, table: table.to_string() })?, + &self.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() })?, ) } fn scan_tables(&self) -> Result { Ok(Box::new( - Raft::deserialize::>(&self.query(Query::ScanTables { txn_id: self.id })?)? - .into_iter(), + Raft::deserialize::>( + &self.query(Query::ScanTables { txn: self.state.clone() })?, + )? + .into_iter(), )) } } @@ -267,25 +276,28 @@ impl State { /// Applies a state machine mutation fn apply(&mut self, mutation: Mutation) -> Result> { match mutation { - Mutation::Begin(mode) => Raft::serialize(&self.engine.begin(mode)?.id()), - Mutation::Commit(txn_id) => Raft::serialize(&self.engine.resume(txn_id)?.commit()?), - Mutation::Rollback(txn_id) => Raft::serialize(&self.engine.resume(txn_id)?.rollback()?), + Mutation::Begin(mode) => { + let txn = self.engine.begin(mode)?; + Raft::serialize(&txn.state()) + } + Mutation::Commit(txn) => Raft::serialize(&self.engine.resume(txn)?.commit()?), + Mutation::Rollback(txn) => Raft::serialize(&self.engine.resume(txn)?.rollback()?), - Mutation::Create { txn_id, table, row } => { - Raft::serialize(&self.engine.resume(txn_id)?.create(&table, row)?) + Mutation::Create { txn, table, row } => { + Raft::serialize(&self.engine.resume(txn)?.create(&table, row)?) } - Mutation::Delete { txn_id, table, id } => { - Raft::serialize(&self.engine.resume(txn_id)?.delete(&table, &id)?) + Mutation::Delete { txn, table, id } => { + Raft::serialize(&self.engine.resume(txn)?.delete(&table, &id)?) } - Mutation::Update { txn_id, table, id, row } => { - Raft::serialize(&self.engine.resume(txn_id)?.update(&table, &id, row)?) + Mutation::Update { txn, table, id, row } => { + Raft::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?) } - Mutation::CreateTable { txn_id, schema } => { - Raft::serialize(&self.engine.resume(txn_id)?.create_table(schema)?) + Mutation::CreateTable { txn, schema } => { + Raft::serialize(&self.engine.resume(txn)?.create_table(schema)?) } - Mutation::DeleteTable { txn_id, table } => { - Raft::serialize(&self.engine.resume(txn_id)?.delete_table(&table)?) + Mutation::DeleteTable { txn, table } => { + Raft::serialize(&self.engine.resume(txn)?.delete_table(&table)?) } } } @@ -311,30 +323,30 @@ impl raft::State for State { fn query(&self, command: Vec) -> Result> { match Raft::deserialize(&command)? { - Query::Read { txn_id, table, id } => { - Raft::serialize(&self.engine.resume(txn_id)?.read(&table, &id)?) + Query::Read { txn, table, id } => { + Raft::serialize(&self.engine.resume(txn)?.read(&table, &id)?) } - Query::ReadIndex { txn_id, table, column, value } => { - Raft::serialize(&self.engine.resume(txn_id)?.read_index(&table, &column, &value)?) + Query::ReadIndex { txn, table, column, value } => { + Raft::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?) } // FIXME These need to stream rows somehow - Query::Scan { txn_id, table, filter } => Raft::serialize( - &self.engine.resume(txn_id)?.scan(&table, filter)?.collect::>>()?, + Query::Scan { txn, table, filter } => Raft::serialize( + &self.engine.resume(txn)?.scan(&table, filter)?.collect::>>()?, ), - Query::ScanIndex { txn_id, table, column } => Raft::serialize( + Query::ScanIndex { txn, table, column } => Raft::serialize( &self .engine - .resume(txn_id)? + .resume(txn)? .scan_index(&table, &column)? .collect::>>()?, ), Query::Status => Raft::serialize(&self.engine.kv.status()?), - Query::ReadTable { txn_id, table } => { - Raft::serialize(&self.engine.resume(txn_id)?.read_table(&table)?) + Query::ReadTable { txn, table } => { + Raft::serialize(&self.engine.resume(txn)?.read_table(&table)?) } - Query::ScanTables { txn_id } => { - Raft::serialize(&self.engine.resume(txn_id)?.scan_tables()?.collect::>()) + Query::ScanTables { txn } => { + Raft::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::>()) } } } diff --git a/src/storage/mvcc.rs b/src/storage/mvcc.rs index 203afee60..407ecbab2 100644 --- a/src/storage/mvcc.rs +++ b/src/storage/mvcc.rs @@ -8,6 +8,12 @@ use std::iter::Peekable; use std::ops::{Bound, RangeBounds}; use std::sync::{Arc, Mutex, MutexGuard}; +// TODO: for read-only transactions, don't allocate a transaction ID. Instead, +// keep the snapshot in the transaction object, and retain it in the SQL engine +// session to synthesize a new transaction from it (in particular, in the Raft +// engine). This removes the need to persist transaction IDs and snapshots for +// read-only transactions (including AOST transactions). + /// MVCC keys, using the KeyCode encoding which preserves the ordering and /// grouping of keys. Cow byte slices allow encoding borrowed values and /// decoding into owned values. @@ -90,9 +96,9 @@ impl MVCC { Transaction::begin(self.engine.clone(), mode) } - /// Resumes a transaction with the given ID. - pub fn resume(&self, id: u64) -> Result> { - Transaction::resume(self.engine.clone(), id) + /// Resumes a transaction from the given state. + pub fn resume(&self, state: TransactionState) -> Result> { + Transaction::resume(self.engine.clone(), state) } /// Fetches the value of an unversioned key. @@ -137,6 +143,18 @@ pub struct Transaction { snapshot: Snapshot, } +/// A serializable representation of a Transaction's state. It can be exported +/// from a Transaction and later used to instantiate a new Transaction that's +/// functionally equivalent via Transaction::resume(). In particular, this +/// allows passing the transaction between the SQL engine and storage engine +/// across the Raft state machine boundary. +#[derive(Clone, Serialize, Deserialize)] +pub struct TransactionState { + pub id: u64, + pub mode: Mode, + pub snapshot: Snapshot, +} + impl Transaction { /// Begins a new transaction in the given mode. fn begin(engine: Arc>, mode: Mode) -> Result { @@ -161,19 +179,16 @@ impl Transaction { Ok(Self { engine, id, mode, snapshot }) } - /// Resumes an active transaction with the given ID. Errors if the transaction is not active. - fn resume(engine: Arc>, id: u64) -> Result { - let mut session = engine.lock()?; - let mode = match session.get(&Key::TxnActive(id).encode()?)? { - Some(v) => bincode::deserialize(&v)?, - None => return Err(Error::Value(format!("No active transaction {}", id))), - }; - let snapshot = match &mode { - Mode::Snapshot { version } => Snapshot::restore(&mut session, *version)?, - _ => Snapshot::restore(&mut session, id)?, - }; - std::mem::drop(session); - Ok(Self { engine, id, mode, snapshot }) + /// Resumes a transaction from the given state. + fn resume(engine: Arc>, state: TransactionState) -> Result { + // For read-write transactions, verify that the transaction is still + // active before making further writes. + if state.mode == Mode::ReadWrite + && engine.lock()?.get(&Key::TxnActive(state.id).encode()?)?.is_none() + { + return Err(Error::Internal(format!("No active transaction with ID {}", state.id))); + } + Ok(Self { engine, id: state.id, mode: state.mode, snapshot: state.snapshot }) } /// Returns the transaction ID. @@ -186,6 +201,12 @@ impl Transaction { self.mode } + /// Returns the transaction's state. This can be used to instantiate a + /// functionally equivalent transaction via resume(). + pub fn state(&self) -> TransactionState { + TransactionState { id: self.id, mode: self.mode, snapshot: self.snapshot.clone() } + } + /// Commits the transaction, by removing the txn from the active set. pub fn commit(self) -> Result<()> { let mut session = self.engine.lock()?; @@ -363,13 +384,13 @@ impl Mode { } /// A versioned snapshot, containing visibility information about concurrent transactions. -#[derive(Clone)] -struct Snapshot { +#[derive(Clone, Deserialize, Serialize)] +pub struct Snapshot { /// The version (i.e. transaction ID) that the snapshot belongs to. - version: u64, + pub version: u64, /// The set of transaction IDs that were active at the start of the transactions, /// and thus should be invisible to the snapshot. - invisible: HashSet, + pub invisible: HashSet, } impl Snapshot { @@ -620,9 +641,9 @@ pub mod tests { // We now resume t3, who should see it's own changes but none // of the others' - let id = t3.id(); + let state = t3.state(); std::mem::drop(t3); - let tr = mvcc.resume(id)?; + let tr = mvcc.resume(state.clone())?; assert_eq!(3, tr.id()); assert_eq!(Mode::ReadWrite, tr.mode()); @@ -640,6 +661,12 @@ pub mod tests { // Once tr commits, a separate transaction should see t3's changes tr.commit()?; + // Resuming an inactive transaction should error. + assert_eq!( + mvcc.resume(state).err(), + Some(Error::Internal("No active transaction with ID 3".into())) + ); + let t = mvcc.begin()?; assert_eq!(Some(b"t2".to_vec()), t.get(b"a")?); assert_eq!(Some(b"t3".to_vec()), t.get(b"b")?); @@ -651,17 +678,14 @@ pub mod tests { assert_eq!(7, ts.id()); assert_eq!(Some(b"t1".to_vec()), ts.get(b"a")?); - let id = ts.id(); + let state = ts.state(); std::mem::drop(ts); - let ts = mvcc.resume(id)?; + let ts = mvcc.resume(state)?; assert_eq!(7, ts.id()); assert_eq!(Mode::Snapshot { version: 1 }, ts.mode()); assert_eq!(Some(b"t1".to_vec()), ts.get(b"a")?); ts.commit()?; - // Resuming an inactive transaction should error. - assert_eq!(mvcc.resume(7).err(), Some(Error::Value("No active transaction 7".into()))); - Ok(()) } diff --git a/tests/client/mod.rs b/tests/client/mod.rs index e1ffbf2c1..7d0bf1e39 100644 --- a/tests/client/mod.rs +++ b/tests/client/mod.rs @@ -129,7 +129,7 @@ async fn status() -> Result<()> { commit_index: 26, apply_index: 26, storage: "hybrid".into(), - storage_size: 3239, + storage_size: 3739, }, mvcc: mvcc::Status { txns: 1, txns_active: 0, storage: "memory".into() }, }