Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement CacheContainer & TableFlownodeSetCache #3885

Merged
merged 13 commits into from
May 9, 2024
Prev Previous commit
Next Next commit
feat: add metrics
  • Loading branch information
WenyXu committed May 9, 2024
commit 933159f66fde3e8321655fc8b2b6d8af9e967f76
18 changes: 15 additions & 3 deletions src/common/meta/src/cache/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use snafu::{OptionExt, ResultExt};
use crate::cache_invalidator::{CacheInvalidator, Context};
use crate::error::{self, Error, Result};
use crate::instruction::CacheIdent;
use crate::metrics;

/// Filters out unused [CacheToken]s
pub type TokenFilter<CacheToken> = Box<dyn Fn(&CacheToken) -> bool + Send + Sync>;
Expand All @@ -38,6 +39,7 @@ pub type Initializer<K, V> = Arc<dyn Fn(&'_ K) -> BoxFuture<'_, Result<Option<V>
/// - Cache value loaded by [Initializer].
/// - Invalidate caches by [Invalidator].
pub struct CacheContainer<K, V, CacheToken> {
name: String,
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
Expand All @@ -52,12 +54,14 @@ where
{
/// Constructs an [CacheContainer].
pub fn new(
name: String,
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
token_filter: TokenFilter<CacheToken>,
) -> Self {
Self {
name,
cache,
invalidator,
initializer,
Expand Down Expand Up @@ -128,9 +132,17 @@ where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
metrics::CACHE_CONTAINER_CACHE_GET
.with_label_values(&[&self.name])
.inc();
let moved_init = self.initializer.clone();
let moved_key = key.to_owned();

let init = async move {
metrics::CACHE_CONTAINER_CACHE_MISS
.with_label_values(&[&self.name])
.inc();

moved_init(&moved_key)
.await
.transpose()
Expand Down Expand Up @@ -174,7 +186,7 @@ mod tests {
let invalidator: Invalidator<NameKey, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));

let adv_cache = CacheContainer::new(cache, invalidator, init, filter);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let key = NameKey { name: "key" };
let value = adv_cache.get(key).await.unwrap().unwrap();
assert_eq!(value, "hi");
Expand All @@ -193,7 +205,7 @@ mod tests {
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));

let adv_cache = CacheContainer::new(cache, invalidator, init, filter);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
Expand Down Expand Up @@ -221,7 +233,7 @@ mod tests {
})
});

let adv_cache = CacheContainer::new(cache, invalidator, init, filter);
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
Expand Down
9 changes: 5 additions & 4 deletions src/common/meta/src/cache/flow/table_flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@ pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent

/// Constructs a [TableFlownodeSetCache].
pub fn new_table_flownode_set_cache(
name: String,
cache: Cache<TableId, FlownodeSet>,
kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
let init = init_factory(table_flow_manager);

CacheContainer::new(cache, Box::new(invalidator), init, Box::new(filter))
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}

fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
Expand Down Expand Up @@ -146,7 +147,7 @@ mod tests {
async fn test_cache_empty_set() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache(cache, mem_kv);
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let set = cache.get(1024).await.unwrap().unwrap();
assert!(set.is_empty());
}
Expand All @@ -155,7 +156,7 @@ mod tests {
async fn test_create_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache(cache, mem_kv);
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
Expand All @@ -171,7 +172,7 @@ mod tests {
async fn test_drop_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache(cache, mem_kv);
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
Expand Down
14 changes: 14 additions & 0 deletions src/common/meta/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,18 @@ lazy_static! {
&["step"]
)
.unwrap();
/// Cache container cache get counter.
pub static ref CACHE_CONTAINER_CACHE_GET: IntCounterVec = register_int_counter_vec!(
"greptime_meta_cache_container_cache_get",
"cache container cache get",
&["name"]
)
.unwrap();
/// Cache container cache miss counter.
pub static ref CACHE_CONTAINER_CACHE_MISS: IntCounterVec = register_int_counter_vec!(
"greptime_meta_cache_container_cache_miss",
"cache container cache miss",
&["name"]
)
.unwrap();
}
Loading