
Commit

Fix(2.3): Compaction cache offset overflow; Field blocks in delta files in order of field_id (#2174)

* refactor(compaction,2.3): make field blocks in delta files ordered by field_id

* refactor(compaction,2.3): reduce the memory cost in flush
zipper-meng committed Jun 14, 2024
1 parent 6532479 commit 35c541b
Showing 5 changed files with 333 additions and 63 deletions.
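
The ordering change in the title is easier to read with the FieldId layout in mind: a FieldId is a u64 that carries a ColumnId in its high 32 bits and a SeriesId in its low 32 bits (see the common/models changes below), so sorting field blocks by FieldId groups them by column first and then by series. A minimal sketch of that property, assuming the packing is a plain shift-and-OR (the body of unite_id is not part of this diff); pack_field_id is a hypothetical stand-in, not the real helper:

    // Hypothetical stand-in for models::utils::unite_id (shift-and-OR packing assumed).
    fn pack_field_id(column_id: u32, series_id: u32) -> u64 {
        ((column_id as u64) << 32) | series_id as u64
    }

    fn main() {
        let mut field_ids = vec![pack_field_id(2, 1), pack_field_id(1, 9), pack_field_id(1, 2)];
        field_ids.sort_unstable();
        // Sorting the packed u64 values is the same as sorting (ColumnId, SeriesId) pairs.
        let pairs: Vec<(u32, u32)> = field_ids
            .iter()
            .map(|&id| ((id >> 32) as u32, (id & 0xFFFF_FFFF) as u32))
            .collect();
        assert_eq!(pairs, vec![(1, 2), (1, 9), (2, 1)]);
    }
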
1 change: 1 addition & 0 deletions common/models/src/lib.rs
@@ -45,6 +45,7 @@ pub type ColumnId = u32;
 pub type TagKey = Vec<u8>;
 pub type TagValue = Vec<u8>;
 
+/// Union of ColumnId (u32, high 32 bits) and SeriesId (u32, low 32 bits).
 pub type FieldId = u64;
 pub type FieldName = Vec<u8>;
 
15 changes: 11 additions & 4 deletions common/models/src/utils.rs
@@ -19,11 +19,18 @@ pub const MINUTES_MILLS: i64 = 60 * SECOND_MILLS;
 pub const HOUR_MILLS: i64 = 60 * MINUTES_MILLS;
 pub const DAY_MILLS: i64 = 24 * HOUR_MILLS;
 
+#[inline(always)]
+pub fn high(id: u64) -> u32 {
+    ((id & HIGH_32BIT_MASK) >> 32) as u32
+}
+
+#[inline(always)]
+pub fn low(id: u64) -> u32 {
+    (id & LOW_32BIT_MASK) as u32
+}
+
 pub fn split_id(id: u64) -> (u32, u32) {
-    (
-        ((id & HIGH_32BIT_MASK) >> 32) as u32,
-        (id & LOW_32BIT_MASK) as u32,
-    )
+    (high(id), low(id))
 }
 
 pub fn unite_id(hash_id: u32, incr_id: u32) -> u64 {
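
A quick usage sketch of the helpers added above. The HIGH_32BIT_MASK/LOW_32BIT_MASK constants and the body of unite_id sit outside this hunk; the values and the shift-and-OR packing below are assumptions that match the high/low definitions:

    // Assumed definitions; only high/low/split_id above are shown in the diff.
    const HIGH_32BIT_MASK: u64 = 0xFFFF_FFFF_0000_0000;
    const LOW_32BIT_MASK: u64 = 0x0000_0000_FFFF_FFFF;

    fn unite_id(hash_id: u32, incr_id: u32) -> u64 {
        ((hash_id as u64) << 32) | incr_id as u64
    }

    fn main() {
        let id = unite_id(42, 7); // ColumnId 42 in the high half, SeriesId 7 in the low half
        assert_eq!(((id & HIGH_32BIT_MASK) >> 32) as u32, 42); // what high(id) returns
        assert_eq!((id & LOW_32BIT_MASK) as u32, 7); // what low(id) returns
    }
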
44 changes: 35 additions & 9 deletions tskv/src/compaction/compact.rs
@@ -982,7 +982,7 @@ impl TsmCache {
         let block_meta_len = block_meta.size();
 
         let mut load_off_start = 0_u64;
-        let mut load_len = 0_usize;
+        let mut load_len = 0_u64;
         let mut found_curr_field = false;
         'idx_iter: while let Some(idx) = self.index_iter.peek() {
            let blk_meta_iter = match out_time_ranges {
@@ -1004,34 +1004,49 @@ impl TsmCache {
                     continue;
                 }
                 let blk_len = blk.size();
-                if blk_len > (self.capacity - load_len) as u64 {
+                if load_len + blk_len > self.capacity as u64 {
                     // Cache is full, stop loading the next BlockMeta and the next IndexMeta.
                     trace::trace!(
                         "Cache(file:{}) is full ({load_len} + {blk_len} > {}) at offset: {load_off_start}",
                         tsm_reader.file_id(), self.capacity,
                     );
                     break 'idx_iter;
                 }
 
                 if load_off_start == 0 {
                     load_off_start = blk_off;
                 }
-                load_len = (blk_off + blk_len - load_off_start) as usize;
+                load_len = blk_off + blk_len - load_off_start;
             }
 
             self.index_iter.next();
         }
 
+        if load_len == 0 {
+            trace::debug!("Cache not loaded, block_meta not found, file:{},field:{},off:{},len:{}, cache: off:{},len:{}",
+                tsm_reader.file_id(),
+                block_meta.field_id(),
+                block_meta.offset(),
+                block_meta.size(),
+                self.tsm_off,
+                self.data.len(),
+            );
+            return Ok(None);
+        }
         self.tsm_off = load_off_start;
         trace::trace!(
-            "Filling cache from file:{} for block_meta: field:{},off:{},len:{}, cache_scale: off:{},len:{}",
+            "Filling cache from file:{} for block_meta: file:{},field:{},off:{},len:{}, cache_scale: off:{},len:{}",
             tsm_reader.file_id(),
+            block_meta.file_id(),
             block_meta.field_id(),
             block_meta.offset(),
             block_meta.size(),
             load_off_start,
             load_len
         );
-        self.data = tsm_reader.get_raw_data(load_off_start, load_len).await?;
+        self.data = tsm_reader
+            .get_raw_data(load_off_start, load_len as usize)
+            .await?;
 
         let cache_off = (block_meta_off - self.tsm_off) as usize;
         Ok(Some(
@@ -1042,12 +1057,23 @@ impl TsmCache {
     /// Read cached data of block_meta, if data of block_meta is not loaded, return None.
     fn read(&self, block_meta: &BlockMeta) -> Option<&[u8]> {
         let blk_off = block_meta.offset();
+        // The block_meta is not in the cache.
+        if blk_off < self.tsm_off {
+            trace::debug!("Trying to load previous data, block_meta: file:{},field:{},off:{},len:{}, cache: off:{},len:{}",
+                block_meta.file_id(),
+                block_meta.field_id(),
+                block_meta.offset(),
+                block_meta.size(),
+                self.tsm_off,
+                self.data.len(),
+            );
+            return None;
+        }
+
         let blk_len = block_meta.size() as usize;
         let cache_off = (blk_off - self.tsm_off) as usize;
-        // The block_meta is not in the cache.
-        if blk_off < self.tsm_off
-            // Cached data is exceeded, need to fill the cache from the block_meta.
-            || cache_off >= self.data.len()
+        // Cached data is exceeded, need to fill the cache from the block_meta.
+        if cache_off >= self.data.len()
             // This means the block_meta is not for the file.
             || cache_off + blk_len > self.data.len()
         {
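
The capacity check above is the heart of the overflow fix: the old form subtracted the running length from the capacity, and an unsigned subtraction like that underflows as soon as the loaded span grows past the capacity, while the new form keeps the comparison as a pure u64 addition. A standalone sketch of the difference, using made-up values rather than the real TsmCache fields:

    fn main() {
        let capacity: usize = 1024;
        let load_len: u64 = 1500; // span already covered by the cache, larger than capacity
        let blk_len: u64 = 100;

        // New-style check: additions only, so nothing can underflow.
        assert!(load_len + blk_len > capacity as u64);

        // The old-style check relied on `capacity - load_len`; with load_len > capacity the
        // unsigned subtraction has no valid result (it would panic in debug builds, or wrap
        // around in release builds and silently skip the "cache is full" branch).
        assert!(capacity.checked_sub(load_len as usize).is_none());
    }
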
101 changes: 56 additions & 45 deletions tskv/src/compaction/flush.rs
@@ -1,10 +1,10 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::Duration;
 
 use models::codec::Encoding;
-use models::{utils as model_utils, ColumnId, FieldId, SeriesId, Timestamp, ValueType};
+use models::{FieldId, SeriesId, Timestamp, ValueType};
 use parking_lot::RwLock;
 use snafu::ResultExt;
 use tokio::sync::mpsc::Sender;
@@ -16,7 +16,9 @@ use utils::BloomFilter;
 use crate::compaction::{CompactTask, FlushReq};
 use crate::context::{GlobalContext, GlobalSequenceContext};
 use crate::error::{self, Result};
-use crate::memcache::{FieldVal, MemCache, SeriesData};
+use crate::memcache::{
+    FieldVal, MemCache, SeriesData, SeriesDataFieldValues, SeriesDataFieldValuesBuilder,
+};
 use crate::summary::{CompactMeta, CompactMetaBuilder, SummaryTask, VersionEdit};
 use crate::tsm::codec::DataBlockEncoding;
 use crate::tsm::{self, DataBlock, TsmWriter};
@@ -65,8 +67,8 @@ impl FlushTask {
         }
         // TODO(zipper, delta_compaction): this changed the order of fields in a tsm file.
         // For old tsm files, field_ids are not ordered, so cannot use delta-compaction cache.
-        let mut flushing_mems_data: BTreeMap<SeriesId, Vec<Arc<RwLock<SeriesData>>>> =
-            BTreeMap::new();
+        let mut flushing_mems_data: HashMap<SeriesId, Vec<Arc<RwLock<SeriesData>>>> =
+            HashMap::new();
         let flushing_mems_len = flushing_mems.len();
         for mem in flushing_mems.into_iter() {
             let seq_no = mem.seq_no();
@@ -108,56 +110,65 @@ impl FlushTask {
     /// (Sometimes only one of the two file types.), returns `CompactMeta`s of the written files.
     async fn flush_mem_caches(
         &self,
-        mut caches_data: BTreeMap<SeriesId, Vec<Arc<RwLock<SeriesData>>>>,
+        caches_data: HashMap<SeriesId, Vec<Arc<RwLock<SeriesData>>>>,
         max_data_block_size: usize,
     ) -> Result<Option<(CompactMeta, Arc<BloomFilter>)>> {
         let mut writer = WriterWrapper::new(self.ts_family_id, max_data_block_size);
 
-        let mut column_encoding_map: HashMap<ColumnId, Encoding> = HashMap::new();
-        let mut column_values_map: BTreeMap<ColumnId, (ValueType, Vec<(Timestamp, FieldVal)>)> =
-            BTreeMap::new();
-        for (sid, series_datas) in caches_data.iter_mut() {
-            column_encoding_map.clear();
-            column_values_map.clear();
-
+        let mut field_values: Vec<SeriesDataFieldValues> = Vec::with_capacity(caches_data.len());
+        for series_datas in caches_data.values() {
             // Iterates [ MemCache ] -> next_series_id -> [ SeriesData ]
-            for series_data in series_datas.iter_mut() {
-                // Iterates SeriesData -> [ RowGroups{ schema_id, schema, [ RowData ] } ]
-                for (_sch_id, sch_cols, rows) in series_data.read().flat_groups() {
-                    for i in sch_cols.columns().iter() {
-                        column_encoding_map.insert(i.id, i.encoding);
-                    }
-                    // Iterates [ RowData ]
-                    for row in rows.iter() {
-                        // Iterates RowData -> [ Option<FieldVal>, column_id ]
-                        for (val, col) in row.fields.iter().zip(sch_cols.fields().iter()) {
-                            if let Some(v) = val {
-                                let (_, col_vals) = column_values_map
-                                    .entry(col.id)
-                                    .or_insert_with(|| (v.value_type(), Vec::with_capacity(64)));
-                                col_vals.push((row.ts, v.clone()));
-                            }
-                        }
-                    }
-                }
-            }
+            for series_data in series_datas {
+                // Build lazy field value extractor.
+                if let Some(builder) = SeriesDataFieldValuesBuilder::try_new(series_data.read()) {
+                    field_values.extend(builder.build());
+                }
+            }
+        }
+        field_values.sort_unstable();
 
-            for (col_id, (value_type, values)) in column_values_map.iter_mut() {
-                // Sort by timestamp.
-                values.sort_by_key(|a| a.0);
-                // Dedup by timestamp.
-                utils::dedup_front_by_key(values, |a| a.0);
-
-                let field_id = model_utils::unite_id(*col_id, *sid);
-                let encoding = DataBlockEncoding::new(
-                    Encoding::Default,
-                    column_encoding_map.get(col_id).copied().unwrap_or_default(),
-                );
-                writer
-                    .write_field(field_id, values, value_type, encoding, self)
-                    .await?;
-            }
-        }
+        let mut values: Vec<(Timestamp, FieldVal)> = Vec::new();
+        let mut last_field_id = None::<FieldId>;
+        let mut value_type = ValueType::Unknown;
+        let mut value_encoding = Encoding::Unknown;
+        for fv in field_values.iter() {
+            if let Some(last_fid) = last_field_id {
+                if fv.field_id() != last_fid {
+                    if !values.is_empty() {
+                        // Flush the values of last field.
+                        values.sort_by_key(|(ts, _)| *ts);
+                        utils::dedup_front_by_key(&mut values, |(ts, _)| *ts);
+                        let encoding = DataBlockEncoding::new(Encoding::Default, value_encoding);
+                        writer
+                            .write_field(last_fid, &values, value_type, encoding, self)
+                            .await?;
+                        values.clear();
+                    }
+                    // Set current field_id and append the values for new field.
+                    last_field_id = Some(fv.field_id());
+                    (value_type, value_encoding) = fv.get(&mut values);
+                } else {
+                    // Append the value of the same field.
+                    let _ = fv.get(&mut values);
+                }
+            } else {
+                // First loop, set current field_id and append the values.
+                last_field_id = Some(fv.field_id());
+                (value_type, value_encoding) = fv.get(&mut values);
+            }
+        }
+        if !values.is_empty() {
+            // The values of last field not flushed.
+            if let Some(last_fid) = last_field_id {
+                values.sort_by_key(|(ts, _)| *ts);
+                utils::dedup_front_by_key(&mut values, |(ts, _)| *ts);
+                let encoding = DataBlockEncoding::new(Encoding::Default, value_encoding);
+                writer
+                    .write_field(last_fid, &values, value_type, encoding, self)
+                    .await?;
+            }
+        }
+        drop(values);
 
         // Flush the written files.
         writer.finish().await
@@ -299,7 +310,7 @@ impl WriterWrapper {
         &mut self,
         field_id: FieldId,
        values: &[(Timestamp, FieldVal)],
-        value_type: &ValueType,
+        value_type: ValueType,
         encoding: DataBlockEncoding,
         flush_task: &FlushTask,
     ) -> Result<()> {
@@ -725,7 +736,7 @@ pub mod flush_tests {
         expected_delta_data.push((field_id, vec![delta_data_block]));
 
         writer
-            .write_field(field_id, &values, &value_type, encoding, flush_task)
+            .write_field(field_id, &values, value_type, encoding, flush_task)
             .await
             .unwrap();
     }
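
The reworked flush path in this file boils down to one pattern: collect (field_id, timestamp, value) entries, sort them by field_id, then walk the sorted stream and cut a block whenever the field_id changes, deduplicating each block by timestamp before writing it. A simplified, self-contained sketch of that grouping; the types and write_block below are stand-ins, not the real SeriesDataFieldValues/WriterWrapper API:

    type FieldId = u64;
    type Timestamp = i64;

    /// Stand-in for writing one data block; the real code goes through WriterWrapper::write_field.
    fn write_block(field_id: FieldId, values: &[(Timestamp, f64)]) {
        println!("field {field_id}: {} rows", values.len());
    }

    fn flush_sorted(mut entries: Vec<(FieldId, Timestamp, f64)>) {
        // Sorting by field_id first is what makes field blocks come out in field_id order.
        entries.sort_unstable_by_key(|&(fid, ts, _)| (fid, ts));

        let mut values: Vec<(Timestamp, f64)> = Vec::new();
        let mut current: Option<FieldId> = None;
        for (fid, ts, val) in entries {
            if current.is_some() && current != Some(fid) {
                // Field changed: drop rows with duplicate timestamps and flush the block.
                values.dedup_by_key(|(ts, _)| *ts);
                write_block(current.unwrap(), &values);
                values.clear();
            }
            current = Some(fid);
            values.push((ts, val));
        }
        if let Some(fid) = current {
            // Flush the values of the last field.
            values.dedup_by_key(|(ts, _)| *ts);
            write_block(fid, &values);
        }
    }

    fn main() {
        flush_sorted(vec![(2, 10, 1.0), (1, 5, 2.0), (1, 5, 3.0), (1, 7, 4.0)]);
        // Prints a block for field 1 (2 rows after dedup) and one for field 2 (1 row), in field_id order.
    }
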
(The diff of the fifth changed file was not loaded.)
