Skip to content

Commit

Permalink
storage: move Debug engine to engine::test::Emit
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jun 10, 2024
1 parent a637d77 commit e1e228b
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 110 deletions.
15 changes: 9 additions & 6 deletions src/raft/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ mod tests {
use super::*;
use crossbeam::channel::Receiver;
use std::{error::Error, result::Result};
use storage::engine::test as testengine;
use test_each_file::test_each_path;

// Run goldenscript tests in src/raft/testscripts/log.
Expand All @@ -355,7 +356,7 @@ mod tests {
/// Runs Raft log goldenscript tests. For available commands, see run().
struct TestRunner {
log: Log,
op_rx: Receiver<storage::debug::Operation>,
op_rx: Receiver<testengine::Operation>,
}

impl goldenscript::Runner for TestRunner {
Expand Down Expand Up @@ -535,8 +536,8 @@ mod tests {

impl TestRunner {
fn new() -> Self {
let engine = storage::Debug::new(storage::Memory::new());
let op_rx = engine.op_rx();
let (op_tx, op_rx) = crossbeam::channel::unbounded();
let engine = testengine::Emit::new(storage::Memory::new(), op_tx);
let log = Log::new(Box::new(engine)).expect("log init failed");
Self { log, op_rx }
}
Expand Down Expand Up @@ -566,11 +567,13 @@ mod tests {
return;
}
while let Ok(op) = self.op_rx.try_recv() {
use storage::debug::Operation;
use testengine::Operation;
let s = match op {
Operation::Delete(k) => format!("delete {}", Self::format_key(&k)),
Operation::Delete { key } => format!("delete {}", Self::format_key(&key)),
Operation::Flush => "flush".to_string(),
Operation::Set(k, v) => format!("set {}", Self::format_key_value(&k, &v)),
Operation::Set { key, value } => {
format!("set {}", Self::format_key_value(&key, &value))
}
};
output.push_str(&format!("engine: {s}\n"));
}
Expand Down
83 changes: 3 additions & 80 deletions src/storage/debug.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
//! Storage debug helpers, primarily formatting of raw engine data.

use std::collections::HashSet;

use crossbeam::channel::{Receiver, Sender};
// TODO: consider moving these elsewhere.

use super::engine::{self, ScanIterator, Status};
use super::mvcc::{self, TransactionState};
use crate::encoding::{bincode, Key as _};
use crate::error::Result;

use std::collections::HashSet;

/// Formats a raw byte string, either as a UTF-8 string (if valid and
/// printable), otherwise hex-encoded.
Expand Down Expand Up @@ -92,78 +90,3 @@ pub fn format_key_value(key: &[u8], value: &Option<Vec<u8>>) -> (String, Option<

(fkey, fvalue)
}

/// A debug storage engine, which wraps another engine and emits events.
pub struct Engine<E: engine::Engine> {
/// The wrapped engine.
inner: E,
/// Sends engine operations.
op_tx: Sender<Operation>,
/// Receives engine operations.
op_rx: Receiver<Operation>,
}

impl<E: engine::Engine> std::fmt::Display for Engine<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "debug:{}", self.inner)
}
}

impl<E: engine::Engine> Engine<E> {
pub fn new(inner: E) -> Self {
let (op_tx, op_rx) = crossbeam::channel::unbounded();
Self { inner, op_tx, op_rx }
}

pub fn op_rx(&self) -> Receiver<Operation> {
self.op_rx.clone()
}
}

impl<E: engine::Engine> engine::Engine for Engine<E> {
type ScanIterator<'a> = E::ScanIterator<'a> where E: 'a;

fn flush(&mut self) -> Result<()> {
self.inner.flush()?;
self.op_tx.send(Operation::Flush)?;
Ok(())
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
self.inner.delete(key)?;
self.op_tx.send(Operation::Delete(key.to_vec()))?;
Ok(())
}

fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.inner.get(key)
}

fn scan(&mut self, range: impl std::ops::RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_> {
self.inner.scan(range)
}

fn scan_dyn(
&mut self,
range: (std::ops::Bound<Vec<u8>>, std::ops::Bound<Vec<u8>>),
) -> Box<dyn ScanIterator + '_> {
Box::new(self.scan(range))
}

fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
self.inner.set(key, value.clone())?;
self.op_tx.send(Operation::Set(key.to_vec(), value))?;
Ok(())
}

fn status(&mut self) -> Result<Status> {
self.inner.status()
}
}

/// An engine operation, emitted by the debug engine.
pub enum Operation {
Delete(Vec<u8>),
Flush,
Set(Vec<u8>, Vec<u8>),
}
76 changes: 76 additions & 0 deletions src/storage/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,82 @@ pub struct Status {
pub garbage_disk_size: u64,
}

#[cfg(test)]
/// Test helper engines.
pub mod test {
use super::*;
use crossbeam::channel::Sender;

/// Wraps another engine and emits write events to the given channel.
pub struct Emit<E: Engine> {
/// The wrapped engine.
inner: E,
/// Sends operation events.
tx: Sender<Operation>,
}

/// An engine operation emitted by the Emit engine.
pub enum Operation {
Delete { key: Vec<u8> },
Flush,
Set { key: Vec<u8>, value: Vec<u8> },
}

impl<E: Engine> std::fmt::Display for Emit<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// Just dispatch to the inner engine.
self.inner.fmt(f)
}
}

impl<E: Engine> Emit<E> {
pub fn new(inner: E, tx: Sender<Operation>) -> Self {
Self { inner, tx }
}
}

impl<E: Engine> Engine for Emit<E> {
type ScanIterator<'a> = E::ScanIterator<'a> where E: 'a;

fn flush(&mut self) -> Result<()> {
self.inner.flush()?;
self.tx.send(Operation::Flush)?;
Ok(())
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
self.inner.delete(key)?;
self.tx.send(Operation::Delete { key: key.to_vec() })?;
Ok(())
}

fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.inner.get(key)
}

fn scan(&mut self, range: impl std::ops::RangeBounds<Vec<u8>>) -> Self::ScanIterator<'_> {
self.inner.scan(range)
}

fn scan_dyn(
&mut self,
range: (std::ops::Bound<Vec<u8>>, std::ops::Bound<Vec<u8>>),
) -> Box<dyn ScanIterator + '_> {
Box::new(self.scan(range))
}

fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
self.inner.set(key, value.clone())?;
self.tx.send(Operation::Set { key: key.to_vec(), value })?;
Ok(())
}

fn status(&mut self) -> Result<Status> {
self.inner.status()
}
}
}

#[cfg(test)]
pub(crate) mod tests {
/// Generates common tests for any Engine implementation.
Expand Down
2 changes: 0 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,5 @@ mod memory;
pub mod mvcc;

pub use bitcask::BitCask;
#[cfg(test)]
pub use debug::Engine as Debug;
pub use engine::{Engine, ScanIterator, Status};
pub use memory::Memory;
Loading

0 comments on commit e1e228b

Please sign in to comment.