Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add simple test for read only file store and fix bugs #10

Merged
merged 1 commit into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#![feature(trait_alias)]
#![feature(pattern)]

use std::fmt::Debug;

pub use policies::Policy;

pub mod collections;
Expand All @@ -23,7 +25,7 @@ pub mod policies;
pub mod store;

pub trait Index:
PartialOrd + Ord + PartialEq + Eq + Clone + std::hash::Hash + Send + Sync + 'static
PartialOrd + Ord + PartialEq + Eq + Clone + std::hash::Hash + Send + Sync + 'static + Debug
{
fn size() -> usize;

Expand Down
4 changes: 2 additions & 2 deletions src/store/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ impl Location {
}

pub fn read(mut buf: &[u8]) -> Self {
let offset = buf.get_u32();
let len = buf.get_u32();
let offset = buf.get_u32_le();
let len = buf.get_u32_le();
Self { offset, len }
}

Expand Down
160 changes: 150 additions & 10 deletions src/store/read_only_file_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fs::read_dir;
use std::fs::{create_dir_all, read_dir};
use std::mem::swap;
use std::path::{Path, PathBuf};

use std::str::pattern::Pattern;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{collections::HashMap, marker::PhantomData};

Expand Down Expand Up @@ -44,6 +45,21 @@ pub struct Config {

/// max cache file size
pub max_file_size: usize,

/// store capacity
pub capacity: usize,

/// ratio of garbage to trigger reclaim
pub trigger_reclaim_garbage_ratio: f64,

/// ratio of size to trigger reclaim
pub trigger_reclaim_capacity_ratio: f64,

/// ratio of size that triggers random dropping of entries
pub trigger_random_drop_ratio: f64,

/// ratio of randomly dropped entries
pub random_drop_ratio: f64,
}

struct Frozen {
Expand Down Expand Up @@ -79,8 +95,11 @@ where

indices: Arc<RwLock<HashMap<I, (FileId, SlotId, Location)>>>,

/// write lock is used when rotating active file or reclaiming frozen files
files: Arc<RwLock<ReadOnlyFileStoreFiles>>,

size: Arc<AtomicUsize>,

_marker: PhantomData<D>,
}

Expand All @@ -95,6 +114,7 @@ where
config: Arc::clone(&self.config),
indices: Arc::clone(&self.indices),
files: Arc::clone(&self.files),
size: Arc::clone(&self.size),
_marker: PhantomData,
}
}
Expand All @@ -118,6 +138,7 @@ where
let ids = asyncify({
let dir = config.dir.clone();
move || {
create_dir_all(&dir)?;
let ids: Vec<FileId> = read_dir(dir)?
.map(|entry| entry.unwrap())
.filter(|entry| {
Expand All @@ -137,10 +158,14 @@ where
.await?;

let mut frozens = HashMap::new();
let mut size = 0;
for id in &ids {
let frozen = Self::open_frozen_file(&config.dir, *id).await?;
// when restoring, `len` is filled with `fstat(2)`
size += frozen.cache_file.len();
frozens.insert(*id, frozen);
}
// create new active file on every `open`
let id = ids.into_iter().max().unwrap_or(0) + 1;
let active = Self::open_active_file(&config.dir, id).await?;

Expand All @@ -155,13 +180,15 @@ where
config: Arc::new(config),
indices: Arc::new(RwLock::new(indices)),
files: Arc::new(RwLock::new(files)),
size: Arc::new(AtomicUsize::new(size)),
_marker: PhantomData,
})
}

#[allow(clippy::uninit_vec)]
async fn store(&self, index: Self::I, data: Self::D) -> Result<()> {
let buf = data.into();
let len = buf.len();

// append cache file and meta file
let (fid, sid, location) = {
Expand Down Expand Up @@ -192,14 +219,18 @@ where
drop(indices);
}

self.size.fetch_add(len, Ordering::Relaxed);

if active_file_size >= self.config.max_file_size {
let inner = self.files.write().await;
let files = self.files.write().await;
// check the size again inside the critical section to prevent double rotation
if inner.active.cache_file.len() >= self.config.max_file_size {
self.rotate_active_file_locked(inner).await?;
if files.active.cache_file.len() >= self.config.max_file_size {
self.rotate_active_file_locked(files).await?;
}
}

self.maybe_trigger_reclaim().await?;

Ok(())
}

Expand Down Expand Up @@ -236,11 +267,13 @@ where
}
};

self.maybe_trigger_reclaim().await?;

Ok(Some(buf.into()))
}

async fn delete(&self, index: &Self::I) -> Result<()> {
let (fid, sid, _location) = {
let (fid, sid, location) = {
let indices = self.indices.read().await;
let (fid, sid, location) = match indices.get(index) {
Some((fid, sid, location)) => (*fid, *sid, *location),
Expand Down Expand Up @@ -269,6 +302,11 @@ where
}
}

self.size
.fetch_sub(location.len as usize, Ordering::Relaxed);

self.maybe_trigger_reclaim().await?;

Ok(())
}
}
Expand Down Expand Up @@ -296,7 +334,7 @@ where
Ok(())
}

pub async fn reclaim_frozen_file(&self, id: FileId) -> Result<()> {
async fn reclaim_frozen_file(&self, id: FileId) -> Result<()> {
let (fid, meta_file, cache_file) = {
let mut files = self.files.write().await;

Expand All @@ -312,19 +350,59 @@ where
let mut indices_to_delete = HashMap::new();
Self::restore_meta(fid, &meta_file, &mut indices_to_delete).await?;

{
let size = {
let mut size = 0;
let mut indices = self.indices.write().await;
for index in indices_to_delete.keys() {
indices.remove(index);
for (index, (_fid, _sid, Location { offset: _, len })) in indices_to_delete {
indices.remove(&index);
size += len;
}
}
size as usize
};

self.size.fetch_sub(size, Ordering::Relaxed);

meta_file.reclaim().await?;
cache_file.reclaim().await?;

Ok(())
}

/// Runs one reclaim pass if the store has grown large enough.
///
/// The tracked `size` counter is compared against
/// `capacity * trigger_reclaim_capacity_ratio`; once the counter reaches that
/// threshold, `reclaim` is invoked a single time and any error it returns is
/// propagated to the caller.
async fn maybe_trigger_reclaim(&self) -> Result<()> {
    // trigger by size ratio
    let used = self.size.load(Ordering::Relaxed) as f64;
    let threshold = self.config.capacity as f64 * self.config.trigger_reclaim_capacity_ratio;
    if used >= threshold {
        self.reclaim().await?;
    }

    // TODO(MrCroxx): trigger reclaim based on garbage ratio

    Ok(())
}

/// Reclaim garbage to make room.
///
/// Policy:
///
/// [WIP] For now, the frozen file with the smallest id is reclaimed (it is
/// treated as the oldest one). When there is no frozen file, this is a no-op.
///
/// TODO(MrCroxx): better reclaim policy
async fn reclaim(&self) -> Result<()> {
    // Keep the read guard in its own scope so it is released before
    // `reclaim_frozen_file` takes the write lock on `files`.
    let oldest = {
        let files = self.files.read().await;
        files.frozens.keys().min().copied()
    };

    match oldest {
        Some(id) => self.reclaim_frozen_file(id).await,
        None => Ok(()),
    }
}

async fn open_active_file(dir: impl AsRef<Path>, id: FileId) -> Result<Active> {
let meta_file = WritableFile::open(Self::meta_file_path(&dir, id)).await?;
let cache_file = AppendableFile::open(Self::cache_file_path(&dir, id)).await?;
Expand Down Expand Up @@ -375,3 +453,65 @@ where
I::size() + Location::size()
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// One kibibyte; keeps the size arithmetic below readable.
    const KIB: usize = 1024;

    /// Returns a `len`-byte payload in which every byte equals `i`.
    fn data(i: u8, len: usize) -> Vec<u8> {
        vec![i; len]
    }

    /// Smoke test covering store/load, active file rotation and size-triggered
    /// reclaim of the oldest frozen file.
    #[tokio::test]
    async fn test_read_only_file_store_simple() {
        let dir = tempdir().unwrap();

        let config = Config {
            dir: dir.path().to_owned(),
            max_file_size: 4 * KIB,
            capacity: 16 * KIB,
            trigger_reclaim_garbage_ratio: 0.0, // disabled
            trigger_reclaim_capacity_ratio: 0.75,
            trigger_random_drop_ratio: 0.0, // disabled
            random_drop_ratio: 0.0,         // disabled
        };

        let store: ReadOnlyFileStore<u64, Vec<u8>> =
            ReadOnlyFileStore::open(0, config).await.unwrap();

        // Four 1 KiB entries, each readable right after it is written.
        for i in 1..=4u8 {
            let key = u64::from(i);
            store.store(key, data(i, KIB)).await.unwrap();
            assert_eq!(store.load(&key).await.unwrap(), Some(data(i, KIB)));
        }

        // assert rotate: 4 KiB written reaches `max_file_size`
        assert_eq!(store.files.read().await.frozens.len(), 1);

        // Entries stay readable after the rotation.
        for i in 1..=4u8 {
            let key = u64::from(i);
            assert_eq!(store.load(&key).await.unwrap(), Some(data(i, KIB)));
        }

        // A single 4 KiB entry fills a whole file and rotates again;
        // 8 KiB is still below the 12 KiB reclaim threshold (16 KiB * 0.75).
        store.store(5, data(5, 4 * KIB)).await.unwrap();
        assert_eq!(store.size.load(Ordering::Relaxed), 8 * KIB);
        assert_eq!(store.files.read().await.frozens.len(), 2);

        // assert reclaim: 12 KiB reaches the threshold, so the oldest frozen
        // file (entries 1-4) is expected to be reclaimed.
        store.store(6, data(6, 4 * KIB)).await.unwrap();
        assert_eq!(store.files.read().await.frozens.len(), 2);
        assert_eq!(store.size.load(Ordering::Relaxed), 8 * KIB);

        for key in 1..=4u64 {
            assert_eq!(store.load(&key).await.unwrap(), None);
        }

        drop(dir);
    }
}
Loading