Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gracefullly shutdown runners #47

Merged
merged 2 commits into from
Jul 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions foyer-storage-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ async fn main() {
&metrics_dump_end,
);
println!("\nTotal:\n{}", analysis);

store.shutdown_runners().await.unwrap();
}

async fn bench(args: Args, store: Arc<TStore>, metrics: Metrics, stop: oneshot::Receiver<()>) {
Expand Down
141 changes: 79 additions & 62 deletions foyer-storage/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use std::sync::Arc;
use foyer_common::queue::AsyncQueue;
use foyer_intrusive::{core::adapter::Link, eviction::EvictionPolicy};
use itertools::Itertools;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
use tokio::{
sync::{broadcast, mpsc, Mutex},
task::JoinHandle,
};

use crate::{
Expand All @@ -38,7 +38,7 @@ pub struct FlushTask {
struct FlusherInner {
sequence: usize,

task_txs: Vec<UnboundedSender<FlushTask>>,
task_txs: Vec<mpsc::UnboundedSender<FlushTask>>,
}

pub struct Flusher {
Expand All @@ -63,7 +63,9 @@ impl Flusher {
&self,
buffers: Arc<AsyncQueue<Vec<u8, A>>>,
region_manager: Arc<RegionManager<A, D, E, EL>>,
) where
stop_rxs: Vec<broadcast::Receiver<()>>,
) -> Vec<JoinHandle<()>>
where
A: BufferAllocator,
D: Device<IoBufferAllocator = A>,
E: EvictionPolicy<RegionEpItemAdapter<EL>, Link = EL>,
Expand All @@ -73,25 +75,30 @@ impl Flusher {

#[allow(clippy::type_complexity)]
let (mut txs, rxs): (
Vec<UnboundedSender<FlushTask>>,
Vec<UnboundedReceiver<FlushTask>>,
) = (0..self.runners).map(|_| unbounded_channel()).unzip();
Vec<mpsc::UnboundedSender<FlushTask>>,
Vec<mpsc::UnboundedReceiver<FlushTask>>,
) = (0..self.runners).map(|_| mpsc::unbounded_channel()).unzip();
inner.task_txs.append(&mut txs);

let runners = rxs
.into_iter()
.map(|rx| Runner {
task_rx: rx,
.zip_eq(stop_rxs.into_iter())
.map(|(task_rx, stop_rx)| Runner {
task_rx,
buffers: buffers.clone(),
region_manager: region_manager.clone(),
stop_rx,
})
.collect_vec();

let mut handles = vec![];
for runner in runners {
tokio::spawn(async move {
let handle = tokio::spawn(async move {
runner.run().await.unwrap();
});
handles.push(handle);
}
handles
}

pub fn runners(&self) -> usize {
Expand All @@ -113,10 +120,12 @@ where
E: EvictionPolicy<RegionEpItemAdapter<EL>, Link = EL>,
EL: Link,
{
task_rx: UnboundedReceiver<FlushTask>,
task_rx: mpsc::UnboundedReceiver<FlushTask>,
buffers: Arc<AsyncQueue<Vec<u8, A>>>,

region_manager: Arc<RegionManager<A, D, E, EL>>,

stop_rx: broadcast::Receiver<()>,
}

impl<A, D, E, EL> Runner<A, D, E, EL>
Expand All @@ -128,71 +137,79 @@ where
{
async fn run(mut self) -> Result<()> {
loop {
if let Some(task) = self.task_rx.recv().await {
// TODO(MrCroxx): seal buffer
tokio::select! {
Some(task) = self.task_rx.recv() => {
self.handle(task).await?;
}
_ = self.stop_rx.recv() => {
tracing::info!("[flusher] exit");
return Ok(())
}
}
}
}

tracing::info!("[flusher] receive flush task, region: {}", task.region_id);
async fn handle(&self, task: FlushTask) -> Result<()> {
tracing::info!("[flusher] receive flush task, region: {}", task.region_id);

let region = self.region_manager.region(&task.region_id);
let region = self.region_manager.region(&task.region_id);

tracing::trace!("[flusher] step 1");
tracing::trace!("[flusher] step 1");

{
// step 1: write buffer back to device
let slice = region.load(.., 0).await?.unwrap();
{
// step 1: write buffer back to device
let slice = region.load(.., 0).await?.unwrap();

// wait all physical readers (from previous version) and writers done
let guard = region.exclusive(false, true, false).await;
// wait all physical readers (from previous version) and writers done
let guard = region.exclusive(false, true, false).await;

tracing::trace!("[flusher] write region {} back to device", task.region_id);
tracing::trace!("[flusher] write region {} back to device", task.region_id);

let mut offset = 0;
let len = region.device().io_size();
while offset < region.device().region_size() {
let start = offset;
let end = std::cmp::min(offset + len, region.device().region_size());
let mut offset = 0;
let len = region.device().io_size();
while offset < region.device().region_size() {
let start = offset;
let end = std::cmp::min(offset + len, region.device().region_size());

let s = unsafe { Slice::new(&slice.as_ref()[start..end]) };
region
.device()
.write(s, region.id(), offset as u64, len)
.await?;
offset += len;
}
drop(guard);
slice.destroy().await;
}
let s = unsafe { Slice::new(&slice.as_ref()[start..end]) };
region
.device()
.write(s, region.id(), offset as u64, len)
.await?;
offset += len;
}
drop(guard);
slice.destroy().await;
}

tracing::trace!("[flusher] step 2");

tracing::trace!("[flusher] step 2");
let buffer = {
// step 2: detach buffer
let mut guard = region.exclusive(false, false, true).await;

let buffer = {
// step 2: detach buffer
let mut guard = region.exclusive(false, false, true).await;
let buffer = guard.detach_buffer();

let buffer = guard.detach_buffer();
tracing::trace!(
"[flusher] region {}, writers: {}, buffered readers: {}, physical readers: {}",
region.id(),
guard.writers(),
guard.buffered_readers(),
guard.physical_readers()
);

tracing::trace!(
"[flusher] region {}, writers: {}, buffered readers: {}, physical readers: {}",
region.id(),
guard.writers(),
guard.buffered_readers(),
guard.physical_readers()
);
drop(guard);
buffer
};

drop(guard);
buffer
};
tracing::trace!("[flusher] step 3");

tracing::trace!("[flusher] step 3");
// step 3: release buffer
self.buffers.release(buffer);
self.region_manager.set_region_evictable(&region.id()).await;

// step 3: release buffer
self.buffers.release(buffer);
self.region_manager.set_region_evictable(&region.id()).await;
tracing::info!("[flusher] finish flush task, region: {}", task.region_id);

tracing::info!("[flusher] finish flush task, region: {}", task.region_id);
} else {
return Ok(());
}
}
Ok(())
}
}
1 change: 1 addition & 0 deletions foyer-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#![feature(strict_provenance)]
#![feature(trait_alias)]
#![feature(get_mut_unchecked)]
#![feature(let_chains)]
#![allow(clippy::type_complexity)]

use device::io_buffer::AlignedAllocator;
Expand Down
Loading
Loading