Skip to content

Commit

Permalink
feat: add "file_buffer_size" config item
Browse files Browse the repository at this point in the history
  • Loading branch information
Kree0 authored and roseboy-liu committed Jan 24, 2024
1 parent 07e229d commit eca1b24
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ edition.workspace = true
serde = { workspace = true }
toml = { workspace = true }
async-backtrace = { workspace = true, optional = true }
sys-info = {workspace = true}
num_cpus = {workspace = true}
sys-info = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }

[features]
default = []
backtrace = ["async-backtrace"]
backtrace = ["async-backtrace"]
1 change: 1 addition & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#reporting_disabled = false
#node_id = 100
file_buffer_size = '1M'

[deployment]
#mode = 'singleton'
Expand Down
1 change: 1 addition & 0 deletions config/config_8902.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#reporting_disabled = false
host = "localhost"
file_buffer_size = '1M'

[deployment]
#mode = 'singleton'
Expand Down
1 change: 1 addition & 0 deletions config/config_8912.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#reporting_disabled = false
host = "localhost"
file_buffer_size = '1M'

[deployment]
#mode = 'singleton'
Expand Down
15 changes: 15 additions & 0 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::io::Read;
use std::path::{Path, PathBuf};

use check::{CheckConfig, CheckConfigResult};
use codec::bytes_num;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};

pub use crate::cache_config::*;
Expand Down Expand Up @@ -48,6 +50,10 @@ pub struct Config {
#[serde(default = "Config::default_host")]
pub host: String,

///
#[serde(with = "bytes_num", default = "Config::default_file_buffer_size")]
pub file_buffer_size: u64,

///
#[serde(default = "Default::default")]
pub deployment: DeploymentConfig,
Expand Down Expand Up @@ -96,11 +102,15 @@ pub struct Config {
pub trace: TraceConfig,
}

// TODO: All configuration items should use global variables
pub static FILE_BUFFER_SIZE: OnceCell<usize> = OnceCell::new();

impl Default for Config {
fn default() -> Self {
Self {
reporting_disabled: Self::default_reporting_disabled(),
host: Self::default_host(),
file_buffer_size: Self::default_file_buffer_size(),
deployment: Default::default(),
query: Default::default(),
storage: Default::default(),
Expand All @@ -126,6 +136,10 @@ impl Config {
"localhost".to_string()
}

fn default_file_buffer_size() -> u64 {
1024 * 1024
}

pub fn override_by_env(&mut self) {
self.cluster.override_by_env();
self.storage.override_by_env();
Expand Down Expand Up @@ -183,6 +197,7 @@ pub fn get_config(path: impl AsRef<Path>) -> Result<Config, std::io::Error> {
}
};
config.wal.introspect();
FILE_BUFFER_SIZE.get_or_init(|| config.file_buffer_size as usize);
Ok(config)
}

Expand Down
12 changes: 8 additions & 4 deletions tskv/src/file_system/file/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::io::{Error, ErrorKind, IoSlice, Result, SeekFrom};

use config::FILE_BUFFER_SIZE;

use crate::file_system::file::async_file::AsyncFile;
use crate::file_system::file::IFile;

const BUFFER_SIZE: usize = 1024 * 1024;
const DEFAULT_FILE_BUFFER_SIZE: usize = 1024 * 1024;

pub struct FileCursor {
file: AsyncFile,
Expand Down Expand Up @@ -33,15 +35,17 @@ impl FileCursor {

pub async fn write(&mut self, buf: &[u8]) -> Result<usize> {
self.buf.extend_from_slice(buf);
self.try_flush(BUFFER_SIZE).await?;
self.try_flush(*FILE_BUFFER_SIZE.get_or_init(|| DEFAULT_FILE_BUFFER_SIZE))
.await?;
Ok(buf.len())
}

pub async fn write_vec<'a>(&mut self, bufs: &'a [IoSlice<'a>]) -> Result<usize> {
let sum = bufs.iter().fold(0, |acc, buf| acc + buf.len());
self.buf.reserve(sum);
bufs.iter().for_each(|buf| self.buf.extend_from_slice(buf));
self.try_flush(BUFFER_SIZE).await?;
self.try_flush(*FILE_BUFFER_SIZE.get_or_init(|| DEFAULT_FILE_BUFFER_SIZE))
.await?;
Ok(sum)
}

Expand Down Expand Up @@ -97,7 +101,7 @@ impl From<AsyncFile> for FileCursor {
FileCursor {
file,
pos: 0,
buf: Vec::with_capacity(BUFFER_SIZE),
buf: Vec::with_capacity(*FILE_BUFFER_SIZE.get_or_init(|| DEFAULT_FILE_BUFFER_SIZE)),
}
}
}

0 comments on commit eca1b24

Please sign in to comment.