From 994a70644f0c7e099474cd426f58b8580f373e74 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 10:58:52 +0000 Subject: [PATCH 01/13] feat: implement the `CacheContainer` --- Cargo.lock | 1 + src/common/meta/Cargo.toml | 1 + src/common/meta/src/cache.rs | 189 +++++++++++++++++++++++++++++++++++ src/common/meta/src/error.rs | 11 +- src/common/meta/src/lib.rs | 3 + 5 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 src/common/meta/src/cache.rs diff --git a/Cargo.lock b/Cargo.lock index 2560897f6186..c0f4257dfea6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1934,6 +1934,7 @@ dependencies = [ "hyper", "itertools 0.10.5", "lazy_static", + "moka", "prometheus", "prost 0.12.4", "rand", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 40df9d6180c0..ece51ea6ab73 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -38,6 +38,7 @@ hex = { version = "0.4" } humantime-serde.workspace = true itertools.workspace = true lazy_static.workspace = true +moka.workspace = true prometheus.workspace = true prost.workspace = true rand.workspace = true diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs new file mode 100644 index 000000000000..6b5f5bbccd18 --- /dev/null +++ b/src/common/meta/src/cache.rs @@ -0,0 +1,189 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Borrow; +use std::hash::Hash; +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::{OptionExt, ResultExt}; + +use crate::cache_invalidator::{CacheInvalidator, Context}; +use crate::error::{self, Error, Result}; +use crate::instruction::CacheIdent; + +/// Filters out unused [CacheToken]s +pub type TokenFilter = Box) -> Vec + Send + Sync>; + +/// Invalidates cached values by [CacheToken]s. +pub type Invalidator = + Box>, Vec) -> BoxFuture<'static, Result<()>> + Send + Sync>; + +/// Initializes value (i.e., fetches from remote). +pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; + +/// [CacheContainer] provides ability to: +/// - Cache value loaded by [Init]. +/// - Invalidate caches by [Invalidator]. +pub struct CacheContainer { + cache: Arc>, + invalidator: Invalidator, + initializer: Initializer, + token_filter: TokenFilter, +} + +impl CacheContainer +where + K: Send + Sync, + V: Send + Sync, + CacheToken: Send + Sync, +{ + /// Constructs an [AdvancedCache]. + pub fn new( + cache: Cache, + invalidator: Invalidator, + init: Initializer, + filter: TokenFilter, + ) -> Self { + Self { + cache: Arc::new(cache), + invalidator, + initializer: init, + token_filter: filter, + } + } +} + +#[async_trait::async_trait] +impl CacheInvalidator for CacheContainer +where + K: Send + Sync, + V: Send + Sync, +{ + async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { + let caches = (self.token_filter)(caches); + if !caches.is_empty() { + (self.invalidator)(self.cache.clone(), caches).await?; + } + Ok(()) + } +} + +impl CacheContainer +where + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Invalidates cache by [CacheToken]. + pub async fn invalidate(&self, caches: Vec) -> Result<()> { + let caches = (self.token_filter)(caches); + if !caches.is_empty() { + (self.invalidator)(self.cache.clone(), caches).await?; + } + Ok(()) + } + + /// Returns a _clone_ of the value corresponding to the key. + pub async fn get(&self, key: &Q) -> Result> + where + K: Borrow, + Q: ToOwned + Hash + Eq + ?Sized, + { + let moved_init = self.initializer.clone(); + let moved_key = key.to_owned(); + let init = async move { + moved_init(&moved_key) + .await + .transpose() + .context(error::ValueNotExistSnafu)? + }; + + match self.cache.try_get_with_by_ref(key, init).await { + Ok(value) => Ok(Some(value)), + Err(err) => match err.as_ref() { + Error::ValueNotExist { .. } => Ok(None), + _ => Err(err).context(error::GetCacheSnafu), + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicI32, Ordering}; + use std::sync::Arc; + + use moka::future::{Cache, CacheBuilder}; + + use super::*; + + #[tokio::test] + async fn test_get() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter = Box::new(|caches| caches); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let value = adv_cache.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + let value = adv_cache.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); + let value = adv_cache.get("bar").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 2); + } + + #[tokio::test] + async fn test_invalidate() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter = Box::new(|caches| caches); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = Box::new(|cache, keys| { + Box::pin(async move { + for key in keys { + cache.invalidate(&key).await; + } + Ok(()) + }) + }); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let value = adv_cache.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + let value = adv_cache.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); + adv_cache + .invalidate(vec!["foo".to_string(), "bar".to_string()]) + .await + .unwrap(); + let value = adv_cache.get("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 2); + } +} diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index cd7583967188..f24d95522ee8 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::str::Utf8Error; +use std::sync::Arc; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; @@ -447,6 +448,12 @@ pub enum Error { error: std::string::FromUtf8Error, location: Location, }, + + #[snafu(display("Value not exists"))] + ValueNotExist { location: Location }, + + #[snafu(display("Failed to get cache"))] + GetCache { source: Arc }, } pub type Result = std::result::Result; @@ -460,7 +467,9 @@ impl ErrorExt for Error { | EtcdFailed { .. } | EtcdTxnFailed { .. } | ConnectEtcd { .. } - | MoveValues { .. } => StatusCode::Internal, + | MoveValues { .. } + | ValueNotExist { .. } + | GetCache { .. } => StatusCode::Internal, SerdeJson { .. } | ParseOption { .. } diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 5236a09767de..8c75799efe75 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -18,7 +18,10 @@ #![feature(let_chains)] #![feature(extract_if)] #![feature(hash_extract_if)] +#![feature(type_alias_impl_trait)] +#![feature(impl_trait_in_fn_trait_return)] +pub mod cache; pub mod cache_invalidator; pub mod cluster; pub mod ddl; From df76d8a1a6bf7e0c34e64b4208125729ff8fc5fe Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 12:20:41 +0000 Subject: [PATCH 02/13] feat: implement the `TableFlownodeSetCache` --- src/common/meta/src/cache.rs | 4 +- src/common/meta/src/cache/flow.rs | 16 ++ .../meta/src/cache/flow/table_flownode.rs | 209 ++++++++++++++++++ src/common/meta/src/cache_invalidator.rs | 4 + src/common/meta/src/instruction.rs | 18 +- 5 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 src/common/meta/src/cache/flow.rs create mode 100644 src/common/meta/src/cache/flow/table_flownode.rs diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 6b5f5bbccd18..635a343862df 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod flow; + use std::borrow::Borrow; use std::hash::Hash; use std::sync::Arc; @@ -50,7 +52,7 @@ where V: Send + Sync, CacheToken: Send + Sync, { - /// Constructs an [AdvancedCache]. + /// Constructs an [CacheContainer]. pub fn new( cache: Cache, invalidator: Invalidator, diff --git a/src/common/meta/src/cache/flow.rs b/src/common/meta/src/cache/flow.rs new file mode 100644 index 000000000000..51b16d8999e6 --- /dev/null +++ b/src/common/meta/src/cache/flow.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod table_flownode; +pub use table_flownode::{new_table_flownode_set_cache, TableFlownodeSetCache}; diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs new file mode 100644 index 000000000000..ff9ca5b9ca1c --- /dev/null +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -0,0 +1,209 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::TryStreamExt; +use moka::future::Cache; +use moka::ops::compute::Op; +use table::metadata::TableId; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error::Result; +use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; +use crate::key::flow::{TableFlowManager, TableFlowManagerRef}; +use crate::kv_backend::KvBackendRef; +use crate::FlownodeId; + +type FlownodeSet = HashSet; + +/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping. +pub type TableFlownodeSetCache = CacheContainer; + +/// Constructs a [TableFlownodeSetCache]. +pub fn new_table_flownode_set_cache( + cache: Cache, + kv_backend: KvBackendRef, +) -> TableFlownodeSetCache { + let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend)); + let init = init_factory(table_flow_manager); + + CacheContainer::new(cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer { + Arc::new(move |&table_id| { + let table_flow_manager = table_flow_manager.clone(); + Box::pin(async move { + table_flow_manager + .flows(table_id) + .map_ok(|key| key.flownode_id()) + .try_collect::>() + .await + .map(Some) + }) + }) +} + +async fn invalidate_create_flow( + cache: &Cache, + CreateFlow { + source_table_ids, + flownode_ids, + }: CreateFlow, +) { + for table_id in source_table_ids { + let entry = cache.entry(table_id); + entry + .and_compute_with( + async |entry: Option>>| match entry { + Some(entry) => { + let mut set = entry.into_value(); + set.extend(flownode_ids.clone()); + + Op::Put(set) + } + None => Op::Put(HashSet::from_iter(flownode_ids.clone())), + }, + ) + .await; + } +} + +async fn invalidate_drop_flow( + cache: &Cache, + DropFlow { + source_table_ids, + flownode_ids, + }: DropFlow, +) { + for table_id in source_table_ids { + let entry = cache.entry(table_id); + entry + .and_compute_with( + async |entry: Option>>| match entry { + Some(entry) => { + let mut set = entry.into_value(); + for flownode_id in &flownode_ids { + set.remove(flownode_id); + } + + Op::Put(set) + } + None => { + // Do nothing + Op::Nop + } + }, + ) + .await; + } +} + +fn invalidator( + cache: Arc>, + caches: Vec, +) -> BoxFuture<'static, Result<()>> { + Box::pin(async move { + for ident in caches { + match ident { + CacheIdent::CreateFlow(create_flow) => { + invalidate_create_flow(&cache, create_flow).await + } + CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(&cache, drop_flow).await, + _ => {} + } + } + + Ok(()) + }) +} + +fn filter(caches: Vec) -> Vec { + caches + .into_iter() + .filter(|cache| matches!(cache, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))) + .collect() +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + use std::sync::Arc; + + use moka::future::CacheBuilder; + + use crate::cache::flow::table_flownode::new_table_flownode_set_cache; + use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; + use crate::kv_backend::memory::MemoryKvBackend; + + #[tokio::test] + async fn test_cache_empty_set() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_flownode_set_cache(cache, mem_kv); + let set = cache.get(&1024).await.unwrap().unwrap(); + assert!(set.is_empty()); + } + + #[tokio::test] + async fn test_create_flow() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_flownode_set_cache(cache, mem_kv); + let ident = vec![CacheIdent::CreateFlow(CreateFlow { + source_table_ids: vec![1024, 1025], + flownode_ids: vec![1, 2, 3, 4, 5], + })]; + cache.invalidate(ident).await.unwrap(); + let set = cache.get(&1024).await.unwrap().unwrap(); + assert_eq!(set.len(), 5); + let set = cache.get(&1025).await.unwrap().unwrap(); + assert_eq!(set.len(), 5); + } + + #[tokio::test] + async fn test_drop_flow() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_flownode_set_cache(cache, mem_kv); + let ident = vec![ + CacheIdent::CreateFlow(CreateFlow { + source_table_ids: vec![1024, 1025], + flownode_ids: vec![1, 2, 3, 4, 5], + }), + CacheIdent::CreateFlow(CreateFlow { + source_table_ids: vec![1024, 1025], + flownode_ids: vec![11, 12], + }), + ]; + cache.invalidate(ident).await.unwrap(); + let set = cache.get(&1024).await.unwrap().unwrap(); + assert_eq!(set.len(), 7); + let set = cache.get(&1025).await.unwrap().unwrap(); + assert_eq!(set.len(), 7); + + let ident = vec![CacheIdent::DropFlow(DropFlow { + source_table_ids: vec![1024, 1025], + flownode_ids: vec![1, 2, 3, 4, 5], + })]; + cache.invalidate(ident).await.unwrap(); + let set = cache.get(&1024).await.unwrap().unwrap(); + assert_eq!(set, HashSet::from([11, 12])); + let set = cache.get(&1025).await.unwrap().unwrap(); + assert_eq!(set, HashSet::from([11, 12])); + } +} diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index fb62e7a61a25..f547040b9a5c 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -112,6 +112,10 @@ where let key: SchemaNameKey = (&schema_name).into(); self.invalidate_key(&key.to_bytes()).await; } + CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => { + // TODO(weny): implements it + unimplemented!() + } } } Ok(()) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 7981f2449d35..79a641edc2c6 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -23,7 +23,7 @@ use table::metadata::TableId; use crate::key::schema_name::SchemaName; use crate::table_name::TableName; -use crate::{ClusterId, DatanodeId}; +use crate::{ClusterId, DatanodeId, FlownodeId}; #[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct RegionIdent { @@ -152,12 +152,26 @@ pub struct UpgradeRegion { pub wait_for_replay_timeout: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] /// The identifier of cache. pub enum CacheIdent { TableId(TableId), TableName(TableName), SchemaName(SchemaName), + CreateFlow(CreateFlow), + DropFlow(DropFlow), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CreateFlow { + pub source_table_ids: Vec, + pub flownode_ids: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DropFlow { + pub source_table_ids: Vec, + pub flownode_ids: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] From 286ddcd56f200b302d77acab9743630e0f1df5dc Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 13:20:57 +0000 Subject: [PATCH 03/13] chore: remove unused feature --- src/common/meta/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 8c75799efe75..971e92ad79a9 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -18,8 +18,6 @@ #![feature(let_chains)] #![feature(extract_if)] #![feature(hash_extract_if)] -#![feature(type_alias_impl_trait)] -#![feature(impl_trait_in_fn_trait_return)] pub mod cache; pub mod cache_invalidator; From b245cef77a054a6fb8a286c235cf13f8193072f5 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 14:22:28 +0000 Subject: [PATCH 04/13] chore: remove unused `Arc` --- src/common/meta/src/cache.rs | 20 +++++++++---------- .../meta/src/cache/flow/table_flownode.rs | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 635a343862df..bce55c2a1c56 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -31,16 +31,16 @@ pub type TokenFilter = Box) -> Vec = - Box>, Vec) -> BoxFuture<'static, Result<()>> + Send + Sync>; + Box, Vec) -> BoxFuture<'_, Result<()>> + Send + Sync>; /// Initializes value (i.e., fetches from remote). -pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; +pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; /// [CacheContainer] provides ability to: /// - Cache value loaded by [Init]. /// - Invalidate caches by [Invalidator]. pub struct CacheContainer { - cache: Arc>, + cache: Cache, invalidator: Invalidator, initializer: Initializer, token_filter: TokenFilter, @@ -56,14 +56,14 @@ where pub fn new( cache: Cache, invalidator: Invalidator, - init: Initializer, - filter: TokenFilter, + initializer: Initializer, + token_filter: TokenFilter, ) -> Self { Self { - cache: Arc::new(cache), + cache, invalidator, - initializer: init, - token_filter: filter, + initializer, + token_filter, } } } @@ -77,7 +77,7 @@ where async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { let caches = (self.token_filter)(caches); if !caches.is_empty() { - (self.invalidator)(self.cache.clone(), caches).await?; + (self.invalidator)(&self.cache, caches).await?; } Ok(()) } @@ -92,7 +92,7 @@ where pub async fn invalidate(&self, caches: Vec) -> Result<()> { let caches = (self.token_filter)(caches); if !caches.is_empty() { - (self.invalidator)(self.cache.clone(), caches).await?; + (self.invalidator)(&self.cache, caches).await?; } Ok(()) } diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index ff9ca5b9ca1c..7a6e76696e38 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -114,16 +114,16 @@ async fn invalidate_drop_flow( } fn invalidator( - cache: Arc>, + cache: &'_ Cache, caches: Vec, -) -> BoxFuture<'static, Result<()>> { +) -> BoxFuture<'_, Result<()>> { Box::pin(async move { for ident in caches { match ident { CacheIdent::CreateFlow(create_flow) => { - invalidate_create_flow(&cache, create_flow).await + invalidate_create_flow(cache, create_flow).await } - CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(&cache, drop_flow).await, + CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(cache, drop_flow).await, _ => {} } } From 630da814f818a7673016ca11cfe7d818ac1f135a Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 15:47:03 +0000 Subject: [PATCH 05/13] refactor: refactor origin `get` to `get_by_ref` --- src/common/meta/src/cache.rs | 64 +++++++++++++++++-- .../meta/src/cache/flow/table_flownode.rs | 14 ++-- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index bce55c2a1c56..34086fe54adc 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -83,6 +83,32 @@ where } } +impl CacheContainer +where + K: Copy + Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Returns a _clone_ of the value corresponding to the key. + pub async fn get(&self, key: K) -> Result> { + let moved_init = self.initializer.clone(); + let moved_key = key; + let init = async move { + moved_init(&moved_key) + .await + .transpose() + .context(error::ValueNotExistSnafu)? + }; + + match self.cache.try_get_with(key, init).await { + Ok(value) => Ok(Some(value)), + Err(err) => match err.as_ref() { + Error::ValueNotExist { .. } => Ok(None), + _ => Err(err).context(error::GetCacheSnafu), + }, + } + } +} + impl CacheContainer where K: Hash + Eq + Send + Sync + 'static, @@ -98,7 +124,7 @@ where } /// Returns a _clone_ of the value corresponding to the key. - pub async fn get(&self, key: &Q) -> Result> + pub async fn get_by_ref(&self, key: &Q) -> Result> where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, @@ -131,8 +157,32 @@ mod tests { use super::*; + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] + struct NameKey<'a> { + name: &'a str, + } + #[tokio::test] async fn test_get() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter = Box::new(|caches| caches); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let key = NameKey { name: "key" }; + let value = adv_cache.get(key).await.unwrap().unwrap(); + assert_eq!(value, "hi"); + } + + #[tokio::test] + async fn test_get_by_ref() { let cache: Cache = CacheBuilder::new(128).build(); let filter = Box::new(|caches| caches); let counter = Arc::new(AtomicI32::new(0)); @@ -145,12 +195,12 @@ mod tests { Box::new(|_, _| Box::pin(async { Ok(()) })); let adv_cache = CacheContainer::new(cache, invalidator, init, filter); - let value = adv_cache.get("foo").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); - let value = adv_cache.get("foo").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); assert_eq!(counter.load(Ordering::Relaxed), 1); - let value = adv_cache.get("bar").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("bar").await.unwrap().unwrap(); assert_eq!(value, "hi"); assert_eq!(counter.load(Ordering::Relaxed), 2); } @@ -175,16 +225,16 @@ mod tests { }); let adv_cache = CacheContainer::new(cache, invalidator, init, filter); - let value = adv_cache.get("foo").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); - let value = adv_cache.get("foo").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); assert_eq!(counter.load(Ordering::Relaxed), 1); adv_cache .invalidate(vec!["foo".to_string(), "bar".to_string()]) .await .unwrap(); - let value = adv_cache.get("foo").await.unwrap().unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); assert_eq!(counter.load(Ordering::Relaxed), 2); } diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 7a6e76696e38..5e44923dc211 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -155,7 +155,7 @@ mod tests { let mem_kv = Arc::new(MemoryKvBackend::default()); let cache = CacheBuilder::new(128).build(); let cache = new_table_flownode_set_cache(cache, mem_kv); - let set = cache.get(&1024).await.unwrap().unwrap(); + let set = cache.get(1024).await.unwrap().unwrap(); assert!(set.is_empty()); } @@ -169,9 +169,9 @@ mod tests { flownode_ids: vec![1, 2, 3, 4, 5], })]; cache.invalidate(ident).await.unwrap(); - let set = cache.get(&1024).await.unwrap().unwrap(); + let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set.len(), 5); - let set = cache.get(&1025).await.unwrap().unwrap(); + let set = cache.get(1025).await.unwrap().unwrap(); assert_eq!(set.len(), 5); } @@ -191,9 +191,9 @@ mod tests { }), ]; cache.invalidate(ident).await.unwrap(); - let set = cache.get(&1024).await.unwrap().unwrap(); + let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set.len(), 7); - let set = cache.get(&1025).await.unwrap().unwrap(); + let set = cache.get(1025).await.unwrap().unwrap(); assert_eq!(set.len(), 7); let ident = vec![CacheIdent::DropFlow(DropFlow { @@ -201,9 +201,9 @@ mod tests { flownode_ids: vec![1, 2, 3, 4, 5], })]; cache.invalidate(ident).await.unwrap(); - let set = cache.get(&1024).await.unwrap().unwrap(); + let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set, HashSet::from([11, 12])); - let set = cache.get(&1025).await.unwrap().unwrap(); + let set = cache.get(1025).await.unwrap().unwrap(); assert_eq!(set, HashSet::from([11, 12])); } } From 11626e2ff86ed19afaefbf0ada4fc601d28c87a8 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 8 May 2024 15:49:21 +0000 Subject: [PATCH 06/13] chore: update comments --- src/common/meta/src/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 34086fe54adc..fb84f181c2a1 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -37,7 +37,7 @@ pub type Invalidator = pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; /// [CacheContainer] provides ability to: -/// - Cache value loaded by [Init]. +/// - Cache value loaded by [Initializer]. /// - Invalidate caches by [Invalidator]. pub struct CacheContainer { cache: Cache, From f578f4a5675fac91186ee470f9133bedfa336d74 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 02:56:29 +0000 Subject: [PATCH 07/13] refactor: refactor `CacheContainer` --- src/common/meta/src/cache.rs | 33 +++++++------- .../meta/src/cache/flow/table_flownode.rs | 44 ++++++++----------- 2 files changed, 34 insertions(+), 43 deletions(-) diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index fb84f181c2a1..c934bf9ff844 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -27,11 +27,11 @@ use crate::error::{self, Error, Result}; use crate::instruction::CacheIdent; /// Filters out unused [CacheToken]s -pub type TokenFilter = Box) -> Vec + Send + Sync>; +pub type TokenFilter = Box bool + Send + Sync>; /// Invalidates cached values by [CacheToken]s. pub type Invalidator = - Box, Vec) -> BoxFuture<'_, Result<()>> + Send + Sync>; + Box Fn(&'a Cache, &'a CacheToken) -> BoxFuture<'a, Result<()>> + Send + Sync>; /// Initializes value (i.e., fetches from remote). pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; @@ -75,9 +75,11 @@ where V: Send + Sync, { async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { - let caches = (self.token_filter)(caches); - if !caches.is_empty() { - (self.invalidator)(&self.cache, caches).await?; + for token in caches + .into_iter() + .filter(|token| (self.token_filter)(token)) + { + (self.invalidator)(&self.cache, &token).await?; } Ok(()) } @@ -115,10 +117,9 @@ where V: Clone + Send + Sync + 'static, { /// Invalidates cache by [CacheToken]. - pub async fn invalidate(&self, caches: Vec) -> Result<()> { - let caches = (self.token_filter)(caches); - if !caches.is_empty() { - (self.invalidator)(&self.cache, caches).await?; + pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> { + for token in caches.iter().filter(|token| (self.token_filter)(token)) { + (self.invalidator)(&self.cache, token).await?; } Ok(()) } @@ -165,7 +166,7 @@ mod tests { #[tokio::test] async fn test_get() { let cache: Cache = CacheBuilder::new(128).build(); - let filter = Box::new(|caches| caches); + let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -184,7 +185,7 @@ mod tests { #[tokio::test] async fn test_get_by_ref() { let cache: Cache = CacheBuilder::new(128).build(); - let filter = Box::new(|caches| caches); + let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { @@ -208,18 +209,16 @@ mod tests { #[tokio::test] async fn test_invalidate() { let cache: Cache = CacheBuilder::new(128).build(); - let filter = Box::new(|caches| caches); + let filter: TokenFilter = Box::new(|_| true); let counter = Arc::new(AtomicI32::new(0)); let moved_counter = counter.clone(); let init: Initializer = Arc::new(move |_| { moved_counter.fetch_add(1, Ordering::Relaxed); Box::pin(async { Ok(Some("hi".to_string())) }) }); - let invalidator: Invalidator = Box::new(|cache, keys| { + let invalidator: Invalidator = Box::new(|cache, key| { Box::pin(async move { - for key in keys { - cache.invalidate(&key).await; - } + cache.invalidate(key).await; Ok(()) }) }); @@ -231,7 +230,7 @@ mod tests { assert_eq!(value, "hi"); assert_eq!(counter.load(Ordering::Relaxed), 1); adv_cache - .invalidate(vec!["foo".to_string(), "bar".to_string()]) + .invalidate(&["foo".to_string(), "bar".to_string()]) .await .unwrap(); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 5e44923dc211..82c94860e588 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -63,10 +63,10 @@ async fn invalidate_create_flow( CreateFlow { source_table_ids, flownode_ids, - }: CreateFlow, + }: &CreateFlow, ) { for table_id in source_table_ids { - let entry = cache.entry(table_id); + let entry = cache.entry(*table_id); entry .and_compute_with( async |entry: Option>>| match entry { @@ -88,16 +88,16 @@ async fn invalidate_drop_flow( DropFlow { source_table_ids, flownode_ids, - }: DropFlow, + }: &DropFlow, ) { for table_id in source_table_ids { - let entry = cache.entry(table_id); + let entry = cache.entry(*table_id); entry .and_compute_with( async |entry: Option>>| match entry { Some(entry) => { let mut set = entry.into_value(); - for flownode_id in &flownode_ids { + for flownode_id in flownode_ids { set.remove(flownode_id); } @@ -113,30 +113,22 @@ async fn invalidate_drop_flow( } } -fn invalidator( - cache: &'_ Cache, - caches: Vec, -) -> BoxFuture<'_, Result<()>> { +fn invalidator<'a>( + cache: &'a Cache, + ident: &'a CacheIdent, +) -> BoxFuture<'a, Result<()>> { Box::pin(async move { - for ident in caches { - match ident { - CacheIdent::CreateFlow(create_flow) => { - invalidate_create_flow(cache, create_flow).await - } - CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(cache, drop_flow).await, - _ => {} - } + match ident { + CacheIdent::CreateFlow(create_flow) => invalidate_create_flow(cache, create_flow).await, + CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(cache, drop_flow).await, + _ => {} } - Ok(()) }) } -fn filter(caches: Vec) -> Vec { - caches - .into_iter() - .filter(|cache| matches!(cache, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))) - .collect() +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_)) } #[cfg(test)] @@ -168,7 +160,7 @@ mod tests { source_table_ids: vec![1024, 1025], flownode_ids: vec![1, 2, 3, 4, 5], })]; - cache.invalidate(ident).await.unwrap(); + cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set.len(), 5); let set = cache.get(1025).await.unwrap().unwrap(); @@ -190,7 +182,7 @@ mod tests { flownode_ids: vec![11, 12], }), ]; - cache.invalidate(ident).await.unwrap(); + cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set.len(), 7); let set = cache.get(1025).await.unwrap().unwrap(); @@ -200,7 +192,7 @@ mod tests { source_table_ids: vec![1024, 1025], flownode_ids: vec![1, 2, 3, 4, 5], })]; - cache.invalidate(ident).await.unwrap(); + cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set, HashSet::from([11, 12])); let set = cache.get(1025).await.unwrap().unwrap(); From f05f2b71323cdbbabeab00808ca2207e94051d43 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 02:59:00 +0000 Subject: [PATCH 08/13] chore: move `CacheContainer` to container.rs --- src/common/meta/src/cache.rs | 229 +----------------------- src/common/meta/src/cache/container.rs | 238 +++++++++++++++++++++++++ 2 files changed, 242 insertions(+), 225 deletions(-) create mode 100644 src/common/meta/src/cache/container.rs diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index c934bf9ff844..f7de3027fdab 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -12,229 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod flow; +mod container; +mod flow; -use std::borrow::Borrow; -use std::hash::Hash; -use std::sync::Arc; - -use futures::future::BoxFuture; -use moka::future::Cache; -use snafu::{OptionExt, ResultExt}; - -use crate::cache_invalidator::{CacheInvalidator, Context}; -use crate::error::{self, Error, Result}; -use crate::instruction::CacheIdent; - -/// Filters out unused [CacheToken]s -pub type TokenFilter = Box bool + Send + Sync>; - -/// Invalidates cached values by [CacheToken]s. -pub type Invalidator = - Box Fn(&'a Cache, &'a CacheToken) -> BoxFuture<'a, Result<()>> + Send + Sync>; - -/// Initializes value (i.e., fetches from remote). -pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; - -/// [CacheContainer] provides ability to: -/// - Cache value loaded by [Initializer]. -/// - Invalidate caches by [Invalidator]. -pub struct CacheContainer { - cache: Cache, - invalidator: Invalidator, - initializer: Initializer, - token_filter: TokenFilter, -} - -impl CacheContainer -where - K: Send + Sync, - V: Send + Sync, - CacheToken: Send + Sync, -{ - /// Constructs an [CacheContainer]. - pub fn new( - cache: Cache, - invalidator: Invalidator, - initializer: Initializer, - token_filter: TokenFilter, - ) -> Self { - Self { - cache, - invalidator, - initializer, - token_filter, - } - } -} - -#[async_trait::async_trait] -impl CacheInvalidator for CacheContainer -where - K: Send + Sync, - V: Send + Sync, -{ - async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { - for token in caches - .into_iter() - .filter(|token| (self.token_filter)(token)) - { - (self.invalidator)(&self.cache, &token).await?; - } - Ok(()) - } -} - -impl CacheContainer -where - K: Copy + Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, -{ - /// Returns a _clone_ of the value corresponding to the key. - pub async fn get(&self, key: K) -> Result> { - let moved_init = self.initializer.clone(); - let moved_key = key; - let init = async move { - moved_init(&moved_key) - .await - .transpose() - .context(error::ValueNotExistSnafu)? - }; - - match self.cache.try_get_with(key, init).await { - Ok(value) => Ok(Some(value)), - Err(err) => match err.as_ref() { - Error::ValueNotExist { .. } => Ok(None), - _ => Err(err).context(error::GetCacheSnafu), - }, - } - } -} - -impl CacheContainer -where - K: Hash + Eq + Send + Sync + 'static, - V: Clone + Send + Sync + 'static, -{ - /// Invalidates cache by [CacheToken]. - pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> { - for token in caches.iter().filter(|token| (self.token_filter)(token)) { - (self.invalidator)(&self.cache, token).await?; - } - Ok(()) - } - - /// Returns a _clone_ of the value corresponding to the key. - pub async fn get_by_ref(&self, key: &Q) -> Result> - where - K: Borrow, - Q: ToOwned + Hash + Eq + ?Sized, - { - let moved_init = self.initializer.clone(); - let moved_key = key.to_owned(); - let init = async move { - moved_init(&moved_key) - .await - .transpose() - .context(error::ValueNotExistSnafu)? - }; - - match self.cache.try_get_with_by_ref(key, init).await { - Ok(value) => Ok(Some(value)), - Err(err) => match err.as_ref() { - Error::ValueNotExist { .. } => Ok(None), - _ => Err(err).context(error::GetCacheSnafu), - }, - } - } -} - -#[cfg(test)] -mod tests { - use std::sync::atomic::{AtomicI32, Ordering}; - use std::sync::Arc; - - use moka::future::{Cache, CacheBuilder}; - - use super::*; - - #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] - struct NameKey<'a> { - name: &'a str, - } - - #[tokio::test] - async fn test_get() { - let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); - let counter = Arc::new(AtomicI32::new(0)); - let moved_counter = counter.clone(); - let init: Initializer = Arc::new(move |_| { - moved_counter.fetch_add(1, Ordering::Relaxed); - Box::pin(async { Ok(Some("hi".to_string())) }) - }); - let invalidator: Invalidator = - Box::new(|_, _| Box::pin(async { Ok(()) })); - - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); - let key = NameKey { name: "key" }; - let value = adv_cache.get(key).await.unwrap().unwrap(); - assert_eq!(value, "hi"); - } - - #[tokio::test] - async fn test_get_by_ref() { - let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); - let counter = Arc::new(AtomicI32::new(0)); - let moved_counter = counter.clone(); - let init: Initializer = Arc::new(move |_| { - moved_counter.fetch_add(1, Ordering::Relaxed); - Box::pin(async { Ok(Some("hi".to_string())) }) - }); - let invalidator: Invalidator = - Box::new(|_, _| Box::pin(async { Ok(()) })); - - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); - let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - assert_eq!(counter.load(Ordering::Relaxed), 1); - let value = adv_cache.get_by_ref("bar").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - assert_eq!(counter.load(Ordering::Relaxed), 2); - } - - #[tokio::test] - async fn test_invalidate() { - let cache: Cache = CacheBuilder::new(128).build(); - let filter: TokenFilter = Box::new(|_| true); - let counter = Arc::new(AtomicI32::new(0)); - let moved_counter = counter.clone(); - let init: Initializer = Arc::new(move |_| { - moved_counter.fetch_add(1, Ordering::Relaxed); - Box::pin(async { Ok(Some("hi".to_string())) }) - }); - let invalidator: Invalidator = Box::new(|cache, key| { - Box::pin(async move { - cache.invalidate(key).await; - Ok(()) - }) - }); - - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); - let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - assert_eq!(counter.load(Ordering::Relaxed), 1); - adv_cache - .invalidate(&["foo".to_string(), "bar".to_string()]) - .await - .unwrap(); - let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); - assert_eq!(value, "hi"); - assert_eq!(counter.load(Ordering::Relaxed), 2); - } -} +pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; +pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache}; diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs new file mode 100644 index 000000000000..fb5cf087b586 --- /dev/null +++ b/src/common/meta/src/cache/container.rs @@ -0,0 +1,238 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Borrow; +use std::hash::Hash; +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::{OptionExt, ResultExt}; + +use crate::cache_invalidator::{CacheInvalidator, Context}; +use crate::error::{self, Error, Result}; +use crate::instruction::CacheIdent; + +/// Filters out unused [CacheToken]s +pub type TokenFilter = Box bool + Send + Sync>; + +/// Invalidates cached values by [CacheToken]s. +pub type Invalidator = + Box Fn(&'a Cache, &'a CacheToken) -> BoxFuture<'a, Result<()>> + Send + Sync>; + +/// Initializes value (i.e., fetches from remote). +pub type Initializer = Arc BoxFuture<'_, Result>> + Send + Sync>; + +/// [CacheContainer] provides ability to: +/// - Cache value loaded by [Initializer]. +/// - Invalidate caches by [Invalidator]. +pub struct CacheContainer { + cache: Cache, + invalidator: Invalidator, + initializer: Initializer, + token_filter: TokenFilter, +} + +impl CacheContainer +where + K: Send + Sync, + V: Send + Sync, + CacheToken: Send + Sync, +{ + /// Constructs an [CacheContainer]. + pub fn new( + cache: Cache, + invalidator: Invalidator, + initializer: Initializer, + token_filter: TokenFilter, + ) -> Self { + Self { + cache, + invalidator, + initializer, + token_filter, + } + } +} + +#[async_trait::async_trait] +impl CacheInvalidator for CacheContainer +where + K: Send + Sync, + V: Send + Sync, +{ + async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { + for token in caches + .into_iter() + .filter(|token| (self.token_filter)(token)) + { + (self.invalidator)(&self.cache, &token).await?; + } + Ok(()) + } +} + +impl CacheContainer +where + K: Copy + Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Returns a _clone_ of the value corresponding to the key. + pub async fn get(&self, key: K) -> Result> { + let moved_init = self.initializer.clone(); + let moved_key = key; + let init = async move { + moved_init(&moved_key) + .await + .transpose() + .context(error::ValueNotExistSnafu)? + }; + + match self.cache.try_get_with(key, init).await { + Ok(value) => Ok(Some(value)), + Err(err) => match err.as_ref() { + Error::ValueNotExist { .. } => Ok(None), + _ => Err(err).context(error::GetCacheSnafu), + }, + } + } +} + +impl CacheContainer +where + K: Hash + Eq + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + /// Invalidates cache by [CacheToken]. + pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> { + for token in caches.iter().filter(|token| (self.token_filter)(token)) { + (self.invalidator)(&self.cache, token).await?; + } + Ok(()) + } + + /// Returns a _clone_ of the value corresponding to the key. + pub async fn get_by_ref(&self, key: &Q) -> Result> + where + K: Borrow, + Q: ToOwned + Hash + Eq + ?Sized, + { + let moved_init = self.initializer.clone(); + let moved_key = key.to_owned(); + let init = async move { + moved_init(&moved_key) + .await + .transpose() + .context(error::ValueNotExistSnafu)? + }; + + match self.cache.try_get_with_by_ref(key, init).await { + Ok(value) => Ok(Some(value)), + Err(err) => match err.as_ref() { + Error::ValueNotExist { .. } => Ok(None), + _ => Err(err).context(error::GetCacheSnafu), + }, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicI32, Ordering}; + use std::sync::Arc; + + use moka::future::{Cache, CacheBuilder}; + + use super::*; + + #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] + struct NameKey<'a> { + name: &'a str, + } + + #[tokio::test] + async fn test_get() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let key = NameKey { name: "key" }; + let value = adv_cache.get(key).await.unwrap().unwrap(); + assert_eq!(value, "hi"); + } + + #[tokio::test] + async fn test_get_by_ref() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); + let value = adv_cache.get_by_ref("bar").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 2); + } + + #[tokio::test] + async fn test_invalidate() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let counter = Arc::new(AtomicI32::new(0)); + let moved_counter = counter.clone(); + let init: Initializer = Arc::new(move |_| { + moved_counter.fetch_add(1, Ordering::Relaxed); + Box::pin(async { Ok(Some("hi".to_string())) }) + }); + let invalidator: Invalidator = Box::new(|cache, key| { + Box::pin(async move { + cache.invalidate(key).await; + Ok(()) + }) + }); + + let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); + adv_cache + .invalidate(&["foo".to_string(), "bar".to_string()]) + .await + .unwrap(); + let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 2); + } +} From 933159f66fde3e8321655fc8b2b6d8af9e967f76 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 03:06:52 +0000 Subject: [PATCH 09/13] feat: add metrics --- src/common/meta/src/cache/container.rs | 18 +++++++++++++++--- .../meta/src/cache/flow/table_flownode.rs | 9 +++++---- src/common/meta/src/metrics.rs | 14 ++++++++++++++ 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index fb5cf087b586..528ff55898dc 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -23,6 +23,7 @@ use snafu::{OptionExt, ResultExt}; use crate::cache_invalidator::{CacheInvalidator, Context}; use crate::error::{self, Error, Result}; use crate::instruction::CacheIdent; +use crate::metrics; /// Filters out unused [CacheToken]s pub type TokenFilter = Box bool + Send + Sync>; @@ -38,6 +39,7 @@ pub type Initializer = Arc BoxFuture<'_, Result /// - Cache value loaded by [Initializer]. /// - Invalidate caches by [Invalidator]. pub struct CacheContainer { + name: String, cache: Cache, invalidator: Invalidator, initializer: Initializer, @@ -52,12 +54,14 @@ where { /// Constructs an [CacheContainer]. pub fn new( + name: String, cache: Cache, invalidator: Invalidator, initializer: Initializer, token_filter: TokenFilter, ) -> Self { Self { + name, cache, invalidator, initializer, @@ -128,9 +132,17 @@ where K: Borrow, Q: ToOwned + Hash + Eq + ?Sized, { + metrics::CACHE_CONTAINER_CACHE_GET + .with_label_values(&[&self.name]) + .inc(); let moved_init = self.initializer.clone(); let moved_key = key.to_owned(); + let init = async move { + metrics::CACHE_CONTAINER_CACHE_MISS + .with_label_values(&[&self.name]) + .inc(); + moved_init(&moved_key) .await .transpose() @@ -174,7 +186,7 @@ mod tests { let invalidator: Invalidator = Box::new(|_, _| Box::pin(async { Ok(()) })); - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); let key = NameKey { name: "key" }; let value = adv_cache.get(key).await.unwrap().unwrap(); assert_eq!(value, "hi"); @@ -193,7 +205,7 @@ mod tests { let invalidator: Invalidator = Box::new(|_, _| Box::pin(async { Ok(()) })); - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); @@ -221,7 +233,7 @@ mod tests { }) }); - let adv_cache = CacheContainer::new(cache, invalidator, init, filter); + let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); assert_eq!(value, "hi"); let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap(); diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 82c94860e588..2a7ab7515d2e 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -35,13 +35,14 @@ pub type TableFlownodeSetCache = CacheContainer, kv_backend: KvBackendRef, ) -> TableFlownodeSetCache { let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend)); let init = init_factory(table_flow_manager); - CacheContainer::new(cache, Box::new(invalidator), init, Box::new(filter)) + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) } fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer { @@ -146,7 +147,7 @@ mod tests { async fn test_cache_empty_set() { let mem_kv = Arc::new(MemoryKvBackend::default()); let cache = CacheBuilder::new(128).build(); - let cache = new_table_flownode_set_cache(cache, mem_kv); + let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let set = cache.get(1024).await.unwrap().unwrap(); assert!(set.is_empty()); } @@ -155,7 +156,7 @@ mod tests { async fn test_create_flow() { let mem_kv = Arc::new(MemoryKvBackend::default()); let cache = CacheBuilder::new(128).build(); - let cache = new_table_flownode_set_cache(cache, mem_kv); + let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let ident = vec![CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], flownode_ids: vec![1, 2, 3, 4, 5], @@ -171,7 +172,7 @@ mod tests { async fn test_drop_flow() { let mem_kv = Arc::new(MemoryKvBackend::default()); let cache = CacheBuilder::new(128).build(); - let cache = new_table_flownode_set_cache(cache, mem_kv); + let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let ident = vec![ CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 34bb95dc0cb7..8b123dc76493 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -69,4 +69,18 @@ lazy_static! { &["step"] ) .unwrap(); + /// Cache container cache get counter. + pub static ref CACHE_CONTAINER_CACHE_GET: IntCounterVec = register_int_counter_vec!( + "greptime_meta_cache_container_cache_get", + "cache container cache get", + &["name"] + ) + .unwrap(); + /// Cache container cache miss counter. + pub static ref CACHE_CONTAINER_CACHE_MISS: IntCounterVec = register_int_counter_vec!( + "greptime_meta_cache_container_cache_miss", + "cache container cache miss", + &["name"] + ) + .unwrap(); } From 35adebcec74568574897588abd8ac6d65025c78f Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 03:14:16 +0000 Subject: [PATCH 10/13] chore: update tests --- src/common/meta/src/cache/container.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index 528ff55898dc..11c46ed10af0 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -190,6 +190,11 @@ mod tests { let key = NameKey { name: "key" }; let value = adv_cache.get(key).await.unwrap().unwrap(); assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); + let key = NameKey { name: "key" }; + let value = adv_cache.get(key).await.unwrap().unwrap(); + assert_eq!(value, "hi"); + assert_eq!(counter.load(Ordering::Relaxed), 1); } #[tokio::test] From ed61e5a476c7cbb19d1477b169d61aed7ed0a8d7 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 07:49:56 +0000 Subject: [PATCH 11/13] test: add tests for value not exists --- src/common/meta/src/cache/container.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index 11c46ed10af0..2c0c94460a80 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -221,6 +221,20 @@ mod tests { assert_eq!(counter.load(Ordering::Relaxed), 2); } + #[tokio::test] + async fn test_get_value_not_exits() { + let cache: Cache = CacheBuilder::new(128).build(); + let filter: TokenFilter = Box::new(|_| true); + let init: Initializer = + Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() })); + let invalidator: Invalidator = + Box::new(|_, _| Box::pin(async { Ok(()) })); + + let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter); + let value = adv_cache.get_by_ref("foo").await.unwrap(); + assert!(value.is_none()); + } + #[tokio::test] async fn test_invalidate() { let cache: Cache = CacheBuilder::new(128).build(); From cd243e43cd8887bdad8b8060e4d2fdedf8370e66 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 09:08:09 +0000 Subject: [PATCH 12/13] test: add test for get --- .../meta/src/cache/flow/table_flownode.rs | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index 2a7ab7515d2e..d7a72d751df3 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -134,14 +134,18 @@ fn filter(ident: &CacheIdent) -> bool { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{BTreeMap, HashSet}; use std::sync::Arc; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use moka::future::CacheBuilder; use crate::cache::flow::table_flownode::new_table_flownode_set_cache; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; + use crate::key::flow::flow_info::FlowInfoValue; + use crate::key::flow::FlowMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; + use crate::table_name::TableName; #[tokio::test] async fn test_cache_empty_set() { @@ -152,6 +156,41 @@ mod tests { assert!(set.is_empty()); } + #[tokio::test] + async fn test_get() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); + flownode_metadata_manager + .create_flow_metadata( + 1024, + FlowInfoValue { + source_table_ids: vec![1024, 1025], + sink_table_name: TableName { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "sink_table".to_string(), + }, + flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]), + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + flow_name: "my_flow".to_string(), + raw_sql: "sql".to_string(), + expire_when: "expire".to_string(), + comment: "comment".to_string(), + options: Default::default(), + }, + ) + .await + .unwrap(); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); + let set = cache.get(1024).await.unwrap().unwrap(); + assert_eq!(set, HashSet::from([1, 2, 3])); + let set = cache.get(1025).await.unwrap().unwrap(); + assert_eq!(set, HashSet::from([1, 2, 3])); + let result = cache.get(1026).await.unwrap().unwrap(); + assert_eq!(result.len(), 0); + } + #[tokio::test] async fn test_create_flow() { let mem_kv = Arc::new(MemoryKvBackend::default()); From 8ad0b448136ad9845bbc03159d470c345dc267a1 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 9 May 2024 09:09:39 +0000 Subject: [PATCH 13/13] chore: apply suggestions from CR --- src/common/meta/src/cache/flow/table_flownode.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index d7a72d751df3..a339a4daa2de 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -59,7 +59,7 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer, CreateFlow { source_table_ids, @@ -73,18 +73,18 @@ async fn invalidate_create_flow( async |entry: Option>>| match entry { Some(entry) => { let mut set = entry.into_value(); - set.extend(flownode_ids.clone()); + set.extend(flownode_ids.iter().cloned()); Op::Put(set) } - None => Op::Put(HashSet::from_iter(flownode_ids.clone())), + None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())), }, ) .await; } } -async fn invalidate_drop_flow( +async fn handle_drop_flow( cache: &Cache, DropFlow { source_table_ids, @@ -120,8 +120,8 @@ fn invalidator<'a>( ) -> BoxFuture<'a, Result<()>> { Box::pin(async move { match ident { - CacheIdent::CreateFlow(create_flow) => invalidate_create_flow(cache, create_flow).await, - CacheIdent::DropFlow(drop_flow) => invalidate_drop_flow(cache, drop_flow).await, + CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await, + CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await, _ => {} } Ok(())