At the highest level, toyDB consists of a cluster of nodes that execute SQL transactions against a replicated state machine. Clients can connect to any node in the cluster and submit SQL statements. It aims to provide linearizability (i.e. strong consistency) and serializability, but falls slightly short as it currently only implements snapshot isolation.
The Raft algorithm is used for cluster consensus, which tolerates the failure of any node as long as a majority of nodes are still available. One node is elected leader, and replicates commands to the others which apply them to local copies of the state machine. If the leader is lost, a new leader is elected and the cluster continues operation. Client commands are automatically forwarded to the leader.
This architecture guide will begin with a high-level overview of node components, before discussing each component from the bottom up. Along the way, we will make note of tradeoffs and design choices.
A toyDB node consists of three main components:
- Storage engine: stores data and manages transactions, on disk and in memory.
- Raft consensus engine: handles cluster coordination and state machine replication.
- SQL engine: parses, plans, and executes SQL statements for clients.
These components are integrated in the toyDB server, which handles network communication with clients and other nodes. The following diagram illustrates its internal structure:
At the bottom is a simple key/value store, which stores all SQL data. This is wrapped inside an MVCC key/value store that adds ACID transactions. On top of that is a SQL storage engine, providing basic CRUD operations on tables, rows, and indexes. This makes up the node's core storage engine.
The SQL storage engine is wrapped in a Raft state machine interface, allowing it to be managed by the Raft consensus engine. The Raft node receives commands from clients and coordinates with other Raft nodes to reach consensus on an ordered command log. Once commands are committed to the log, they are applied to the local state machine.
On top of the Raft engine is a Raft-based SQL storage engine, which implements the SQL storage interface and submits commands to the Raft cluster. This allows the rest of the SQL layer to use the Raft cluster as if it was local storage. The SQL engine manages client SQL sessions, which take SQL queries as text, parse them, generate query plans, and execute them against the SQL storage engine.
Surrounding these components is the toyDB server, which in addition to network communication also handles configuration, logging, and other process-level concerns.
toyDB uses a pluggable key/value storage engine, with the SQL and Raft storage engines configurable via the `storage_sql` and `storage_raft` options respectively. The higher-level SQL storage engine will be discussed separately in the SQL section.
A key/value storage engine stores arbitrary key/value pairs as binary byte slices, and implements the `storage::Engine` trait:
```rust
/// A key/value storage engine, where both keys and values are arbitrary byte
/// strings between 0 B and 2 GB, stored in lexicographical key order. Writes
/// are only guaranteed durable after calling flush().
///
/// Only supports single-threaded use since all methods (including reads) take a
/// mutable reference -- serialized access can't be avoided anyway, since both
/// Raft execution and file access is serial.
pub trait Engine: std::fmt::Display + Send + Sync {
    /// The iterator returned by scan(). Traits can't return "impl Trait", and
    /// we don't want to use trait objects, so the type must be specified.
    type ScanIterator<'a>: DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a
    where
        Self: 'a;

    /// Deletes a key, or does nothing if it does not exist.
    fn delete(&mut self, key: &[u8]) -> Result<()>;

    /// Flushes any buffered data to the underlying storage medium.
    fn flush(&mut self) -> Result<()>;

    /// Gets a value for a key, if it exists.
    fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;

    /// Iterates over an ordered range of key/value pairs.
    fn scan<R: std::ops::RangeBounds<Vec<u8>>>(&mut self, range: R) -> Self::ScanIterator<'_>;

    /// Sets a value for a key, replacing the existing value if any.
    fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()>;
}
```
The `get`, `set`, and `delete` methods simply read and write key/value pairs, and `flush` ensures any buffered data is written out to storage (e.g. via the `fsync` system call). `scan` iterates over a key/value range in order, a property that is crucial to higher-level functionality (e.g. SQL table scans) and has a couple of important implications:
- Implementations should store data ordered by key, for performance.
- Keys should use an order-preserving byte encoding, to allow range scans.
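To make the interface concrete, here is a short usage sketch against the trait above; the function, keys, and values are purely illustrative, and any `Engine` implementation (such as the BitCask engine described below) could be passed in:

```rust
// A minimal usage sketch of the Engine trait: keys are stored in
// lexicographical order, so a range scan returns them sorted.
fn example(engine: &mut impl Engine) -> Result<()> {
    engine.set(b"b", vec![2])?;
    engine.set(b"a", vec![1])?;
    engine.flush()?; // ensure the writes are durable

    // Scans the range ["a", "c") in key order: "a", then "b".
    for item in engine.scan(b"a".to_vec()..b"c".to_vec()) {
        let (key, value) = item?;
        println!("{:?} = {:?}", key, value);
    }
    Ok(())
}
```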
The engine itself does not care what keys contain, but the storage module offers an order-preserving key encoding called KeyCode for use by higher layers. These storage layers often use composite keys made up of several possibly variable-length values (e.g. an index key consists of table, column, and value), and the natural ordering of each segment must be preserved, a property satisfied by this encoding:
- `bool`: `0x00` for `false`, `0x01` for `true`.
- `u64`: big-endian binary representation.
- `i64`: big-endian binary representation, with the sign bit flipped.
- `f64`: big-endian binary representation, with the sign bit flipped, and all other bits flipped as well if negative.
- `Vec<u8>`: `0x00` bytes are escaped as `0x00ff`, and the value is terminated with `0x0000`.
- `String`: like `Vec<u8>`.
Additionally, several container types are supported:
- Tuple: concatenation of elements, with no surrounding structure.
- Array: like tuple.
- Vec: like tuple.
- Enum: the variant's enum index as a single `u8` byte, then the contents.
- Value: like enum.
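To illustrate why the integer encodings above preserve ordering, here is a small sketch of the `u64` and `i64` cases. This is illustrative code, not toyDB's actual KeyCode implementation:

```rust
// u64: big-endian bytes compare the same way as the numbers themselves.
fn encode_u64(n: u64) -> [u8; 8] {
    n.to_be_bytes()
}

// i64: flipping the sign bit maps negative values below positive ones in
// unsigned byte order, so byte-wise comparison matches signed comparison.
fn encode_i64(n: i64) -> [u8; 8] {
    ((n as u64) ^ (1 << 63)).to_be_bytes()
}

fn main() {
    assert!(encode_i64(-1) < encode_i64(1));
    // Fixed-width big-endian avoids the "10" < "2" problem of ASCII numbers.
    assert!(encode_u64(2) < encode_u64(10));
}
```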
The default key/value engine is `storage::BitCask`, a very simple variant of Bitcask, an append-only log-structured storage engine. All writes are appended to a log file, with an index mapping live keys to file positions maintained in memory. When the amount of garbage (replaced or deleted keys) in the file exceeds 20%, a new log file is written containing only live keys, replacing the old log file.
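Conceptually, the in-memory index (the "keydir" in Bitcask terminology) maps each live key to the position of its latest value in the log file, roughly along these lines; the type and field names are illustrative, not toyDB's actual code:

```rust
use std::collections::BTreeMap;

// A simplified sketch of BitCask's in-memory index: each live key maps to the
// file offset and length of its most recent value in the append-only log.
struct KeyDir {
    index: BTreeMap<Vec<u8>, (u64, u32)>, // key -> (value offset, value length)
}

impl KeyDir {
    // Records the latest location of a key's value after its entry has been
    // appended to the log file; a read then seeks straight to that position.
    fn update(&mut self, key: Vec<u8>, offset: u64, len: u32) {
        self.index.insert(key, (offset, len));
    }
}
```

Using an ordered map for the index also provides the sorted key scans required by the Engine trait, even though the log file itself is written in insertion order.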
- Keyset in memory: BitCask requires the entire key set to fit in memory, and must also scan the log file on startup to construct the key index.
- Compaction volume: unlike an LSM tree, this single-file BitCask implementation requires rewriting the entire dataset during compactions, which can produce significant write amplification over time.
- Key encoding: does not make use of any compression, e.g. variable-length integers, preferring simplicity and correctness.
MVCC (Multi-Version Concurrency Control) is a relatively simple concurrency control mechanism that provides ACID transactions with snapshot isolation without taking out locks or having writes block reads. It also versions all data, allowing querying of historical data.
toyDB implements MVCC at the storage layer as `storage::mvcc::MVCC`, using any `storage::Engine` implementation for underlying storage. `begin` returns a new transaction, which provides the usual key/value operations such as `get`, `set`, and `scan`. Additionally, it has a `commit` method which persists the changes and makes them visible to other transactions, and a `rollback` method which discards them.
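A hedged usage sketch, assuming `MVCC::new` takes ownership of an engine; the exact constructor and method signatures may differ from toyDB's actual code:

```rust
fn example(engine: impl Engine) -> Result<()> {
    let mvcc = MVCC::new(engine);

    let mut txn = mvcc.begin()?;          // allocates a new version
    txn.set(b"key", b"value".to_vec())?;  // stored as Key::Version(key, version)
    let _value = txn.get(b"key")?;        // sees its own uncommitted write
    txn.commit()?;                        // now visible to later transactions
    Ok(())
}
```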
When a transaction begins, it fetches the next available version from `Key::NextVersion` and increments it, then records itself as an active transaction via `Key::TxnActive(version)`. It also takes a snapshot of the active set, containing the versions of all other active transactions as of the transaction start, and saves it as `Key::TxnActiveSnapshot(version)`.
Key/value pairs are saved as `Key::Version(key, version)`, where `key` is the user-provided key and `version` is the transaction's version. The visibility of key/value pairs for a transaction is given as follows:
- For a given user key, do a reverse scan of `Key::Version(key, version)` starting at the current transaction's version.
- Skip any records whose version is in the transaction's snapshot of the active set.
- Return the first matching record, if any. This record may be either a `Some(value)` or a `None` if the key was deleted.
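The following self-contained sketch mirrors these rules over a plain `BTreeMap` keyed by `(key, version)`; the real implementation instead does a reverse engine scan over the encoded `Key::Version` keys:

```rust
use std::collections::{BTreeMap, HashSet};

// A sketch of MVCC visibility: a stored None is a deletion tombstone.
fn read_visible(
    versions: &BTreeMap<(Vec<u8>, u64), Option<Vec<u8>>>,
    key: &[u8],
    txn_version: u64,
    active_set: &HashSet<u64>, // versions of transactions active at our start
) -> Option<Vec<u8>> {
    // Reverse scan from the current transaction's version downwards.
    let range = (key.to_vec(), 0)..=(key.to_vec(), txn_version);
    for ((_, version), value) in versions.range(range).rev() {
        // Skip records written by transactions in our active-set snapshot.
        if active_set.contains(version) {
            continue;
        }
        // The first match wins; it may be a deletion (None).
        return value.clone();
    }
    None
}
```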
When writing a key/value pair, the transaction first checks for any conflicts by scanning for a `Key::Version(key, version)` which is not visible to it. If one is found, a serialization error is returned and the client must retry the transaction. Otherwise, the transaction writes the new record and keeps track of the change as `Key::TxnWrite(version, key)` in case it must roll back.
When the transaction commits, it simply deletes its `Key::TxnActive(version)` record, thus making its changes visible to any subsequent transactions. If the transaction instead rolls back, it iterates over all `Key::TxnWrite(version, key)` entries and removes the written key/value records before removing its `Key::TxnActive(version)` entry.
This simple scheme is sufficient to provide ACID transaction guarantees with snapshot isolation: commits are atomic, a transaction sees a consistent snapshot of the key/value store as of the start of the transaction, and any write conflicts result in serialization errors which must be retried.
To satisfy time travel queries, a read-only transaction simply loads the `Key::TxnActiveSnapshot` entry of a past transaction and applies the same visibility rules as for normal transactions.
- Serializability: snapshot isolation is not fully serializable, since it exhibits write skew anomalies. This would require serializable snapshot isolation, which was considered unnecessary for a first version - it may be implemented later.
- Garbage collection: old MVCC versions are never removed, leading to unbounded disk usage. However, this also allows for complete data history, and simplifies the implementation.
- Transaction ID overflow: transaction IDs will overflow after 64 bits, but this is never going to happen with toyDB.
The Raft consensus protocol is explained well in the original Raft paper, and will not be repeated here - refer to it for details. toyDB's implementation follows the paper fairly closely.
The Raft node `raft::Node` is the core of the implementation, a finite state machine with enum variants for the node roles: leader, follower, and candidate. This enum wraps the `RawNode` struct, which contains common node functionality and is generic over the specific roles `Leader`, `Follower`, and `Candidate` that implement the Raft protocol.
Nodes are initialized with an ID and a list of peer IDs, and communicate by passing `raft::Message` messages. Inbound messages are received via `Node.step()` calls, and outbound messages are sent via an `mpsc` channel. Nodes also use a logical clock to keep track of e.g. election timeouts and heartbeats, and the clock is ticked at regular intervals via `Node.tick()` calls. These methods are synchronous and may cause state transitions, e.g. changing a candidate into a leader when it receives the winning vote.
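As an illustration, a server loop driving a node might look roughly like this; the `Node` and `Message` types here are empty stand-ins, and the real method signatures and tick interval differ:

```rust
use std::sync::mpsc;
use std::time::Duration;

// Empty stand-ins so the sketch is self-contained; the real raft::Node and
// raft::Message carry role state and message payloads.
struct Node;
struct Message;

impl Node {
    fn step(&mut self, _msg: Message) { /* process one inbound message */ }
    fn tick(&mut self) { /* advance the logical clock by one tick */ }
}

// Inbound messages arrive on an mpsc channel; the clock is ticked roughly
// every interval when no messages arrive.
fn drive(mut node: Node, inbound: mpsc::Receiver<Message>) {
    let tick_interval = Duration::from_millis(100); // assumed interval
    loop {
        match inbound.recv_timeout(tick_interval) {
            Ok(msg) => node.step(msg), // may trigger a role transition
            Err(mpsc::RecvTimeoutError::Timeout) => node.tick(), // elections, heartbeats
            Err(mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
}
```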
Nodes have a command log `raft::Log`, using a `storage::Engine` for storage, and a `raft::State` state machine (the SQL engine). When the leader receives a write request, it appends the command to its local log and replicates it to followers. Once a quorum have replicated it, the command is committed and applied to the state machine, and the result returned to the client. When the leader receives a read request, it needs to ensure it is still the leader in order to satisfy linearizability (a new leader could exist elsewhere, resulting in a stale read). It increments a read sequence number and broadcasts it via a Raft heartbeat. Once a quorum have confirmed the leader at this sequence number, the read command is executed against the state machine and the result returned to the client.
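The quorum arithmetic behind both commits and confirmed reads is simple majority counting, illustrated below (this is illustrative, not toyDB's actual code):

```rust
// A majority of the cluster must agree, counting the leader itself.
fn quorum(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

// A read at a given sequence number can be served once a quorum has
// confirmed the leader at that sequence number.
fn read_confirmed(confirmations: usize, cluster_size: usize) -> bool {
    confirmations >= quorum(cluster_size)
}
```

For example, in a five-node cluster a command or read requires confirmation from three nodes, so the cluster tolerates the loss of any two.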
The actual network communication is handled by the server process, which will be described in a separate section.
- Single-threaded state: all state operations run in a single thread on the leader, preventing horizontal scalability. Improvements here would require running multiple sharded Raft clusters, which is out of scope for the project.
- Synchronous application: state machine application happens synchronously in the main Raft thread. This is significantly simpler than asynchronous application, but may cause delays in Raft processing.
- Log replication: only the simplest form of Raft log replication is implemented, without state snapshots or rapid log replay. Lagging nodes will be very slow to catch up.
- Cluster resizing: the Raft cluster consists of a static set of nodes given at startup; resizing it requires a complete cluster restart.
The SQL engine builds on Raft and MVCC to provide a SQL interface to clients. Logically, the life of a SQL query is as follows:
Query → Lexer → Parser → Planner → Optimizer → Executor → Storage Engine
We'll begin by looking at the basic SQL type and schema systems, as well as the SQL storage engine and its session interface. Then we'll switch sides and look at how a query is executed, starting at the front with the parser and following it until it's executed against the SQL storage engine, completing the chain.
toyDB has a very simple type system, with the `sql::DataType` enum specifying the available data types: `Boolean`, `Integer`, `Float`, and `String`.

The `sql::Value` enum represents a specific value using Rust's native type system, e.g. an integer value is `Value::Integer(i64)`. This enum also specifies comparison, ordering, and formatting of values. The special value `Value::Null` represents an unknown value of unknown type, following the rules of three-valued logic.
Values can be grouped into a `Row`, which is an alias for `Vec<Value>`. The type `Rows` is an alias for a fallible row iterator, and `Column` is a result column containing a name.
Expressions `sql::Expression` represent operations on values. For example, `(1 + 2) * 3` is represented as:
```rust
Expression::Multiply(
    Expression::Add(
        Expression::Constant(Value::Integer(1)),
        Expression::Constant(Value::Integer(2)),
    ),
    Expression::Constant(Value::Integer(3)),
)
```
Calling `evaluate()` on the expression will recursively evaluate it, returning `Value::Integer(9)`.
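A self-contained sketch of this kind of recursive evaluation, reduced to integer constants, addition, and multiplication (toyDB's `sql::Expression` has many more variants, boxes its children, and evaluates to `sql::Value`):

```rust
enum Expr {
    Constant(i64),
    Add(Box<Expr>, Box<Expr>),
    Multiply(Box<Expr>, Box<Expr>),
}

impl Expr {
    // Recursively evaluates the expression tree bottom-up.
    fn evaluate(&self) -> i64 {
        match self {
            Expr::Constant(n) => *n,
            Expr::Add(lhs, rhs) => lhs.evaluate() + rhs.evaluate(),
            Expr::Multiply(lhs, rhs) => lhs.evaluate() * rhs.evaluate(),
        }
    }
}

fn main() {
    // (1 + 2) * 3
    let expr = Expr::Multiply(
        Box::new(Expr::Add(
            Box::new(Expr::Constant(1)),
            Box::new(Expr::Constant(2)),
        )),
        Box::new(Expr::Constant(3)),
    );
    assert_eq!(expr.evaluate(), 9);
}
```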
The schema defines the tables `sql::Table` and columns `sql::Column` in a toyDB database. Tables have a name and a list of columns, while a column has several attributes such as name, data type, and various constraints. They also have methods to validate rows and values, e.g. to make sure a value is of the correct type for a column or to enforce referential integrity.

The schema is stored and managed with `sql::Catalog`, a trait implemented by the SQL storage engine:
```rust
pub trait Catalog {
    /// Creates a new table.
    fn create_table(&mut self, table: &Table) -> Result<()>;

    /// Deletes a table, or errors if it does not exist.
    fn delete_table(&mut self, table: &str) -> Result<()>;

    /// Reads a table, if it exists.
    fn read_table(&self, table: &str) -> Result<Option<Table>>;

    /// Iterates over all tables.
    fn scan_tables(&self) -> Result<Tables>;
}
```
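A small usage sketch built only from the methods above; the helper itself is illustrative rather than toyDB code, and it assumes `Table` exposes its name as a field:

```rust
// Drops the table if it already exists, then creates it from scratch.
fn recreate_table(catalog: &mut impl Catalog, table: &Table) -> Result<()> {
    if catalog.read_table(&table.name)?.is_some() {
        catalog.delete_table(&table.name)?;
    }
    catalog.create_table(table)
}
```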
- Single database: only a single, unnamed database is supported per toyDB cluster. This is sufficient for toyDB's use cases, and simplifies the implementation.
- Schema changes: schema changes other than creating or dropping tables are not supported. This avoids complicated data migration logic, and allows using table/column names as storage identifiers (since they can never change) without any additional indirection.