Skip to content

Commit

Permalink
refactor wal: fix some bugs and remove min/max sequence in wal reader…
Browse files Browse the repository at this point in the history
…/writer
  • Loading branch information
bartliu827 authored and roseboy-liu committed Jun 24, 2024
1 parent 7ee9816 commit d1b96b1
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 414 deletions.
2 changes: 2 additions & 0 deletions tskv/src/record_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl Writer {
Ok(written_size)
}

#[allow(dead_code)]
pub async fn write_footer(&mut self, footer: &mut [u8; FILE_FOOTER_LEN]) -> TskvResult<usize> {
self.sync().await?;

Expand Down Expand Up @@ -196,6 +197,7 @@ impl Writer {
self.path.clone()
}

#[allow(dead_code)]
pub fn footer(&self) -> Option<[u8; FILE_FOOTER_LEN]> {
self.footer
}
Expand Down
109 changes: 13 additions & 96 deletions tskv/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ use snafu::{IntoError, OptionExt, ResultExt};
use self::reader::WalReader;
use self::writer::WalWriter;
use crate::error::{CommonSnafu, DecodeSnafu, EncodeSnafu};
use crate::file_system::async_filesystem::LocalFileSystem;
use crate::file_system::FileSystem;
use crate::kv_option::WalOptions;
use crate::tsm::codec::{get_str_codec, StringCodec};
pub use crate::wal::reader::print_wal_statistics;
Expand All @@ -63,7 +61,6 @@ const WAL_HEADER_LEN: usize = 9;

/// raft log has no cache
const WAL_BUFFER_SIZE: usize = 0;
const WAL_FOOTER_MAGIC_NUMBER: u32 = u32::from_be_bytes([b'w', b'a', b'l', b'o']);

#[repr(u8)]
#[derive(Eq, PartialEq, Clone, Copy, Debug)]
Expand Down Expand Up @@ -122,78 +119,44 @@ impl VnodeWal {
}

pub async fn open_writer(config: Arc<WalOptions>, wal_dir: &Path) -> TskvResult<WalWriter> {
// Create a new wal file every time it starts.
let (pre_max_seq, next_file_id) =
let next_file_id =
match file_utils::get_max_sequence_file_name(wal_dir, file_utils::get_wal_file_id) {
Some((_, id)) => {
let path = file_utils::make_wal_file(wal_dir, id);
let (_, max_seq) = reader::read_footer(&path, config.clone())
.await?
.unwrap_or((1_u64, 1_u64));
(max_seq + 1, id)
}
None => (1_u64, 1_u64),
Some((_, id)) => id,
None => 1_u64,
};

let new_wal = file_utils::make_wal_file(wal_dir, next_file_id);
let writer_file =
writer::WalWriter::open(config, next_file_id, new_wal, pre_max_seq).await?;
trace::info!("WAL '{}' starts write", writer_file.id());
let writer_file = WalWriter::open(config, next_file_id, new_wal).await?;
trace::info!("WAL '{:?}' starts write", writer_file.path());

Ok(writer_file)
}

async fn roll_wal_file(&mut self, max_file_size: u64) -> TskvResult<()> {
if self.current_wal.size() > max_file_size {
trace::info!(
"WAL '{}' is full at seq '{}', begin rolling.",
self.current_wal.id(),
self.current_wal.max_sequence()
);
trace::info!("WAL '{:?}' is full", self.current_wal.path());

let new_file_id = self.current_wal.id() + 1;
let new_file_name = file_utils::make_wal_file(&self.wal_dir, new_file_id);

let new_file = WalWriter::open(
self.config.clone(),
new_file_id,
new_file_name,
self.current_wal.max_sequence(),
)
.await?;
let new_file = WalWriter::open(self.config.clone(), new_file_id, new_file_name).await?;

let mut old_file = std::mem::replace(&mut self.current_wal, new_file);
if old_file.max_sequence() <= old_file.min_sequence() {
old_file.set_max_sequence(old_file.min_sequence());
} else {
old_file.set_max_sequence(old_file.max_sequence() - 1);
}
old_file.close().await?;

trace::info!(
"WAL '{}' starts write at seq {}",
self.current_wal.id(),
self.current_wal.max_sequence()
);
}
Ok(())
}

pub async fn truncate_wal_file(
&mut self,
file_id: u64,
pos: u64,
seq_no: u64,
) -> TskvResult<()> {
pub async fn truncate_wal_file(&mut self, file_id: u64, pos: u64) -> TskvResult<()> {
if self.current_wal_id() == file_id {
self.current_wal.truncate(pos, seq_no).await;
self.current_wal.truncate(pos).await;
self.current_wal.sync().await?;
return Ok(());
}

let file_name = file_utils::make_wal_file(&self.wal_dir, file_id);
let mut new_file = WalWriter::open(self.config.clone(), file_id, file_name, seq_no).await?;
new_file.truncate(pos, seq_no).await;
let mut new_file = WalWriter::open(self.config.clone(), file_id, file_name).await?;
new_file.truncate(pos).await;
new_file.sync().await?;

Ok(())
Expand All @@ -214,48 +177,6 @@ impl VnodeWal {
Ok(())
}

pub async fn delete_wal_files(&mut self, wal_ids: &[u64]) {
for wal_id in wal_ids {
if *wal_id == self.current_wal.id() {
continue;
}

let file_path = file_utils::make_wal_file(self.wal_dir(), *wal_id);
trace::info!("Removing wal file '{}'", file_path.display());
if let Err(e) = tokio::fs::remove_file(&file_path).await {
trace::error!("Failed to remove file '{}': {:?}", file_path.display(), e);
}
}
}

// delete wal files < seq
async fn delete_wal_before_seq(&mut self, seq: u64) -> TskvResult<()> {
let mut delete_ids = vec![];
let wal_files = LocalFileSystem::list_file_names(self.wal_dir());
for file_name in wal_files {
// If file name cannot be parsed to wal id, skip that file.
let wal_id = match file_utils::get_wal_file_id(&file_name) {
Ok(id) => id,
Err(_) => continue,
};

if self.current_wal_id() == wal_id {
continue;
}

let path = self.wal_dir().join(&file_name);
if let Ok(reader) = WalReader::open(&path, self.config.clone()).await {
if reader.max_sequence() < seq {
delete_ids.push(wal_id);
}
}
}

self.delete_wal_files(&delete_ids).await;

Ok(())
}

async fn write_raft_entry(
&mut self,
raft_entry: &wal_store::RaftEntry,
Expand All @@ -279,7 +200,7 @@ impl VnodeWal {
} else {
let wal_dir = self.config.wal_dir(&self.tenant_database, self.vnode_id);
let wal_path = file_utils::make_wal_file(wal_dir, wal_id);
let reader = WalReader::open(wal_path, self.config.clone()).await?;
let reader = WalReader::open(wal_path).await?;
Ok(reader)
}
}
Expand All @@ -289,7 +210,7 @@ impl VnodeWal {
}

/// Close current record file, return count of bytes appended as footer.
pub async fn close(&mut self) -> TskvResult<usize> {
pub async fn close(&mut self) -> TskvResult<()> {
self.current_wal.close().await
}

Expand All @@ -301,10 +222,6 @@ impl VnodeWal {
&self.wal_dir
}

pub fn current_seq_no(&self) -> u64 {
self.current_wal.max_sequence()
}

pub fn current_wal_id(&self) -> u64 {
self.current_wal.id()
}
Expand Down
Loading

0 comments on commit d1b96b1

Please sign in to comment.