Skip to content

Commit

Permalink
Add MVCC TransactionState and use it for Transaction::resume().
Browse files Browse the repository at this point in the history
This allows passing transaction state across the Raft state machine
boundary. Later, this will allow read-only transactions to be truly
read-only, omitting the on-disk state tracking of the txn ID, snapshot,
and active set.
  • Loading branch information
erikgrinaker committed Sep 5, 2023
1 parent 453ca47 commit 3be2413
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 94 deletions.
14 changes: 11 additions & 3 deletions src/sql/engine/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ impl<E: storage::Engine> KV<E> {
Self { kv: storage::mvcc::MVCC::new(engine) }
}

/// Resumes a transaction with the given ID
pub fn resume(&self, id: u64) -> Result<<Self as super::Engine>::Transaction> {
Ok(<Self as super::Engine>::Transaction::new(self.kv.resume(id)?))
/// Resumes a transaction from the given state
pub fn resume(
&self,
state: storage::mvcc::TransactionState,
) -> Result<<Self as super::Engine>::Transaction> {
Ok(<Self as super::Engine>::Transaction::new(self.kv.resume(state)?))
}

/// Fetches an unversioned metadata value
Expand Down Expand Up @@ -73,6 +76,11 @@ impl<E: storage::Engine> Transaction<E> {
Self { txn }
}

/// Returns the transaction's serialized state.
pub(super) fn state(&self) -> storage::mvcc::TransactionState {
self.txn.state()
}

/// Loads an index entry
fn index_load(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
Ok(self
Expand Down
138 changes: 75 additions & 63 deletions src/sql/engine/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,53 +3,57 @@ use super::super::types::{Expression, Row, Value};
use super::{Engine as _, IndexScan, Mode, Scan, Transaction as _};
use crate::error::{Error, Result};
use crate::raft;
use crate::storage;
use crate::storage::{self, mvcc::TransactionState};

use serde::{Deserialize, Serialize};
use std::collections::HashSet;

/// A Raft state machine mutation
///
/// TODO: use Cows for these.
#[derive(Clone, Serialize, Deserialize)]
enum Mutation {
/// Begins a transaction in the given mode
Begin(Mode),
/// Commits the transaction with the given ID
Commit(u64),
/// Rolls back the transaction with the given ID
Rollback(u64),
/// Commits the given transaction
Commit(TransactionState),
/// Rolls back the given transaction
Rollback(TransactionState),

/// Creates a new row
Create { txn_id: u64, table: String, row: Row },
Create { txn: TransactionState, table: String, row: Row },
/// Deletes a row
Delete { txn_id: u64, table: String, id: Value },
Delete { txn: TransactionState, table: String, id: Value },
/// Updates a row
Update { txn_id: u64, table: String, id: Value, row: Row },
Update { txn: TransactionState, table: String, id: Value, row: Row },

/// Creates a table
CreateTable { txn_id: u64, schema: Table },
CreateTable { txn: TransactionState, schema: Table },
/// Deletes a table
DeleteTable { txn_id: u64, table: String },
DeleteTable { txn: TransactionState, table: String },
}

/// A Raft state machine query
///
/// TODO: use Cows for these.
#[derive(Clone, Serialize, Deserialize)]
enum Query {
/// Fetches engine status
Status,

/// Reads a row
Read { txn_id: u64, table: String, id: Value },
Read { txn: TransactionState, table: String, id: Value },
/// Reads an index entry
ReadIndex { txn_id: u64, table: String, column: String, value: Value },
ReadIndex { txn: TransactionState, table: String, column: String, value: Value },
/// Scans a table's rows
Scan { txn_id: u64, table: String, filter: Option<Expression> },
Scan { txn: TransactionState, table: String, filter: Option<Expression> },
/// Scans an index
ScanIndex { txn_id: u64, table: String, column: String },
ScanIndex { txn: TransactionState, table: String, column: String },

/// Scans the tables
ScanTables { txn_id: u64 },
ScanTables { txn: TransactionState },
/// Reads a table
ReadTable { txn_id: u64, table: String },
ReadTable { txn: TransactionState, table: String },
}

/// Status for the Raft SQL engine.
Expand Down Expand Up @@ -110,19 +114,17 @@ impl super::Engine for Raft {
pub struct Transaction {
/// The underlying Raft cluster
client: raft::Client,
/// The transaction ID
id: u64,
/// The transaction mode
mode: Mode,
// Transaction state
state: TransactionState,
}

impl Transaction {
/// Starts a transaction in the given mode
fn begin(client: raft::Client, mode: Mode) -> Result<Self> {
let id = Raft::deserialize(&futures::executor::block_on(
let state = Raft::deserialize(&futures::executor::block_on(
client.mutate(Raft::serialize(&Mutation::Begin(mode))?),
)?)?;
Ok(Self { client, id, mode })
Ok(Self { client, state })
}

/// Executes a mutation
Expand All @@ -138,48 +140,48 @@ impl Transaction {

impl super::Transaction for Transaction {
fn id(&self) -> u64 {
self.id
self.state.id
}

fn mode(&self) -> Mode {
self.mode
self.state.mode
}

fn commit(self) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Commit(self.id))?)
Raft::deserialize(&self.mutate(Mutation::Commit(self.state.clone()))?)
}

fn rollback(self) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Rollback(self.id))?)
Raft::deserialize(&self.mutate(Mutation::Rollback(self.state.clone()))?)
}

fn create(&mut self, table: &str, row: Row) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Create {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
row,
})?)
}

fn delete(&mut self, table: &str, id: &Value) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Delete {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
})?)
}

fn read(&self, table: &str, id: &Value) -> Result<Option<Row>> {
Raft::deserialize(&self.query(Query::Read {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
})?)
}

fn read_index(&self, table: &str, column: &str, value: &Value) -> Result<HashSet<Value>> {
Raft::deserialize(&self.query(Query::ReadIndex {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
value: value.clone(),
Expand All @@ -189,7 +191,7 @@ impl super::Transaction for Transaction {
fn scan(&self, table: &str, filter: Option<Expression>) -> Result<Scan> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(&self.query(Query::Scan {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
filter,
})?)?
Expand All @@ -201,7 +203,7 @@ impl super::Transaction for Transaction {
fn scan_index(&self, table: &str, column: &str) -> Result<IndexScan> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(&self.query(Query::ScanIndex {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
column: column.to_string(),
})?)?
Expand All @@ -212,7 +214,7 @@ impl super::Transaction for Transaction {

fn update(&mut self, table: &str, id: &Value, row: Row) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::Update {
txn_id: self.id,
txn: self.state.clone(),
table: table.to_string(),
id: id.clone(),
row,
Expand All @@ -222,25 +224,32 @@ impl super::Transaction for Transaction {

impl Catalog for Transaction {
fn create_table(&mut self, table: Table) -> Result<()> {
Raft::deserialize(&self.mutate(Mutation::CreateTable { txn_id: self.id, schema: table })?)
Raft::deserialize(
&self.mutate(Mutation::CreateTable { txn: self.state.clone(), schema: table })?,
)
}

fn delete_table(&mut self, table: &str) -> Result<()> {
Raft::deserialize(
&self.mutate(Mutation::DeleteTable { txn_id: self.id, table: table.to_string() })?,
&self.mutate(Mutation::DeleteTable {
txn: self.state.clone(),
table: table.to_string(),
})?,
)
}

fn read_table(&self, table: &str) -> Result<Option<Table>> {
Raft::deserialize(
&self.query(Query::ReadTable { txn_id: self.id, table: table.to_string() })?,
&self.query(Query::ReadTable { txn: self.state.clone(), table: table.to_string() })?,
)
}

fn scan_tables(&self) -> Result<Tables> {
Ok(Box::new(
Raft::deserialize::<Vec<_>>(&self.query(Query::ScanTables { txn_id: self.id })?)?
.into_iter(),
Raft::deserialize::<Vec<_>>(
&self.query(Query::ScanTables { txn: self.state.clone() })?,
)?
.into_iter(),
))
}
}
Expand All @@ -267,25 +276,28 @@ impl<E: storage::Engine> State<E> {
/// Applies a state machine mutation
fn apply(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
match mutation {
Mutation::Begin(mode) => Raft::serialize(&self.engine.begin(mode)?.id()),
Mutation::Commit(txn_id) => Raft::serialize(&self.engine.resume(txn_id)?.commit()?),
Mutation::Rollback(txn_id) => Raft::serialize(&self.engine.resume(txn_id)?.rollback()?),
Mutation::Begin(mode) => {
let txn = self.engine.begin(mode)?;
Raft::serialize(&txn.state())
}
Mutation::Commit(txn) => Raft::serialize(&self.engine.resume(txn)?.commit()?),
Mutation::Rollback(txn) => Raft::serialize(&self.engine.resume(txn)?.rollback()?),

Mutation::Create { txn_id, table, row } => {
Raft::serialize(&self.engine.resume(txn_id)?.create(&table, row)?)
Mutation::Create { txn, table, row } => {
Raft::serialize(&self.engine.resume(txn)?.create(&table, row)?)
}
Mutation::Delete { txn_id, table, id } => {
Raft::serialize(&self.engine.resume(txn_id)?.delete(&table, &id)?)
Mutation::Delete { txn, table, id } => {
Raft::serialize(&self.engine.resume(txn)?.delete(&table, &id)?)
}
Mutation::Update { txn_id, table, id, row } => {
Raft::serialize(&self.engine.resume(txn_id)?.update(&table, &id, row)?)
Mutation::Update { txn, table, id, row } => {
Raft::serialize(&self.engine.resume(txn)?.update(&table, &id, row)?)
}

Mutation::CreateTable { txn_id, schema } => {
Raft::serialize(&self.engine.resume(txn_id)?.create_table(schema)?)
Mutation::CreateTable { txn, schema } => {
Raft::serialize(&self.engine.resume(txn)?.create_table(schema)?)
}
Mutation::DeleteTable { txn_id, table } => {
Raft::serialize(&self.engine.resume(txn_id)?.delete_table(&table)?)
Mutation::DeleteTable { txn, table } => {
Raft::serialize(&self.engine.resume(txn)?.delete_table(&table)?)
}
}
}
Expand All @@ -311,30 +323,30 @@ impl<E: storage::Engine> raft::State for State<E> {

fn query(&self, command: Vec<u8>) -> Result<Vec<u8>> {
match Raft::deserialize(&command)? {
Query::Read { txn_id, table, id } => {
Raft::serialize(&self.engine.resume(txn_id)?.read(&table, &id)?)
Query::Read { txn, table, id } => {
Raft::serialize(&self.engine.resume(txn)?.read(&table, &id)?)
}
Query::ReadIndex { txn_id, table, column, value } => {
Raft::serialize(&self.engine.resume(txn_id)?.read_index(&table, &column, &value)?)
Query::ReadIndex { txn, table, column, value } => {
Raft::serialize(&self.engine.resume(txn)?.read_index(&table, &column, &value)?)
}
// FIXME These need to stream rows somehow
Query::Scan { txn_id, table, filter } => Raft::serialize(
&self.engine.resume(txn_id)?.scan(&table, filter)?.collect::<Result<Vec<_>>>()?,
Query::Scan { txn, table, filter } => Raft::serialize(
&self.engine.resume(txn)?.scan(&table, filter)?.collect::<Result<Vec<_>>>()?,
),
Query::ScanIndex { txn_id, table, column } => Raft::serialize(
Query::ScanIndex { txn, table, column } => Raft::serialize(
&self
.engine
.resume(txn_id)?
.resume(txn)?
.scan_index(&table, &column)?
.collect::<Result<Vec<_>>>()?,
),
Query::Status => Raft::serialize(&self.engine.kv.status()?),

Query::ReadTable { txn_id, table } => {
Raft::serialize(&self.engine.resume(txn_id)?.read_table(&table)?)
Query::ReadTable { txn, table } => {
Raft::serialize(&self.engine.resume(txn)?.read_table(&table)?)
}
Query::ScanTables { txn_id } => {
Raft::serialize(&self.engine.resume(txn_id)?.scan_tables()?.collect::<Vec<_>>())
Query::ScanTables { txn } => {
Raft::serialize(&self.engine.resume(txn)?.scan_tables()?.collect::<Vec<_>>())
}
}
}
Expand Down
Loading

0 comments on commit 3be2413

Please sign in to comment.