-
Notifications
You must be signed in to change notification settings - Fork 314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: introduce FlowRouteValue
#4263
Conversation
WalkthroughThis update removes the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant FlowRouteManager
participant KvBackendRef
participant Peer
User->>FlowRouteManager: Retrieve route
FlowRouteManager->>KvBackendRef: Fetch route from backend
KvBackendRef-->>FlowRouteManager: Return route data
FlowRouteManager-->>User: Return flow route
User->>FlowRouteManager: Create transaction
FlowRouteManager->>KvBackendRef: Update route in backend
KvBackendRef-->>FlowRouteManager: Confirm update
FlowRouteManager-->>User: Transaction successful
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Outside diff range and nitpick comments (14)
src/common/meta/src/key/flow/flow_route.rs (10)
40-43
: Correct the documentation comment.The layout mentioned in the comment is incorrect. It should be
__flow/route/{flow_id}/{partition_id}
instead of__flow/info/{flow_id}/{partition_id}
.-/// The layout: `__flow/info/{flow_id}/{partition_id}`. +/// The layout: `__flow/route/{flow_id}/{partition_id}`.
53-58
: Improve method documentation.Add documentation comments for the
range_start_key
method to clarify its purpose and usage./// The prefix used to retrieve all [FlowRouteKey]s with the specified `flow_id`. pub fn range_start_key(flow_id: FlowId) -> Vec<u8> {
83-88
: Enhance struct documentation.Add documentation comments for the
FlowRouteKeyInner
struct to explain its purpose and fields./// The key of flow route metadata. #[derive(Debug, Clone, Copy, PartialEq)] struct FlowRouteKeyInner { /// The ID of the flow. flow_id: FlowId, /// The ID of the partition. partition_id: FlowPartitionId, }
99-101
: Fix the prefix format.The prefix format in the
prefix
method should includeFLOW_ROUTE_KEY_PREFIX
as a constant.fn prefix(flow_id: FlowId) -> String { format!("{FLOW_ROUTE_KEY_PREFIX}/{flow_id}/") }
140-144
: Enhance struct documentation.Add documentation comments for the
FlowRouteValue
struct to explain its purpose and fields./// The route info of flow. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FlowRouteValue { /// The peer associated with the flow. pub(crate) peer: Peer, }
153-157
: Improve error handling.Add context to the error handling in the
flow_route_decoder
function to provide more detailed error messages.pub fn flow_route_decoder(kv: KeyValue) -> Result<(FlowRouteKey, FlowRouteValue)> { let key = FlowRouteKey::from_bytes(&kv.key).context(error::InvalidFlowRouteKeySnafu)?; let value = FlowRouteValue::try_from_raw_value(&kv.value).context(error::InvalidFlowRouteValueSnafu)?; Ok((key, value)) }
171-186
: Improve method documentation.Add documentation comments for the
routes
method to explain its purpose, parameters, and return value./// Retrieves all [FlowRouteValue]s of the specified `flow_id`. /// /// # Arguments /// /// * `flow_id` - The ID of the flow. /// /// # Returns /// /// A stream of results containing tuples of [FlowRouteKey] and [FlowRouteValue]. pub fn routes( &self, flow_id: FlowId, ) -> BoxStream<'static, Result<(FlowRouteKey, FlowRouteValue)>> {
188-206
: Improve method documentation.Add documentation comments for the
build_create_txn
method to explain its purpose, parameters, and return value./// Builds a create flow routes transaction. /// /// Puts `__flow/route/{flownode_id}/{partitions}` keys. /// /// # Arguments /// /// * `flow_id` - The ID of the flow. /// * `flow_routes` - An iterator of tuples containing `FlowPartitionId` and `FlowRouteValue`. /// /// # Returns /// /// A result containing the transaction or an error. pub(crate) fn build_create_txn<I: IntoIterator<Item = (FlowPartitionId, FlowRouteValue)>>(
214-218
: Enhance test coverage.Add more test cases to cover different scenarios, such as invalid inputs and edge cases.
#[test] fn test_key_serialization() { let flow_route_key = FlowRouteKey::new(1, 2); assert_eq!(b"__flow/route/1/2".to_vec(), flow_route_key.to_bytes()); // Add more test cases let flow_route_key = FlowRouteKey::new(0, 0); assert_eq!(b"__flow/route/0/0".to_vec(), flow_route_key.to_bytes()); }
220-226
: Enhance test coverage.Add more test cases to cover different scenarios, such as invalid inputs and edge cases.
#[test] fn test_key_deserialization() { let bytes = b"__flow/route/1/2".to_vec(); let key = FlowRouteKey::from_bytes(&bytes).unwrap(); assert_eq!(key.flow_id(), 1); assert_eq!(key.partition_id(), 2); // Add more test cases let bytes = b"__flow/route/0/0".to_vec(); let key = FlowRouteKey::from_bytes(&bytes).unwrap(); assert_eq!(key.flow_id(), 0); assert_eq!(key.partition_id(), 0); }src/common/meta/src/cache/flow/table_flownode.rs (4)
Line range hint
79-81
: Improve error handling.Add error handling for the
and_compute_with
method to handle potential errors during the cache update.entry .and_compute_with( async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry { Some(entry) => { let mut set = entry.into_value(); set.extend(flownode_ids.iter().cloned()); Op::Put(set) } None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())), }, ) .await .unwrap_or_else(|e| { common_telemetry::error!("Failed to update cache: {}", e); });
Line range hint
98-100
: Improve error handling.Add error handling for the
and_compute_with
method to handle potential errors during the cache update.entry .and_compute_with( async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry { Some(entry) => { let mut set = entry.into_value(); for flownode_id in flownode_ids { set.remove(flownode_id); } Op::Put(set) } None => { // Do nothing Op::Nop } }, ) .await .unwrap_or_else(|e| { common_telemetry::error!("Failed to update cache: {}", e); });
Line range hint
119-187
: Enhance test coverage.Add more test cases to cover different scenarios, such as invalid inputs and edge cases.
#[tokio::test] async fn test_get() { let mem_kv = Arc::new(MemoryKvBackend::default()); let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone()); flownode_metadata_manager .create_flow_metadata( 1024, FlowInfoValue { source_table_ids: vec![1024, 1025], sink_table_name: TableName { catalog_name: DEFAULT_CATALOG_NAME.to_string(), schema_name: DEFAULT_SCHEMA_NAME.to_string(), table_name: "sink_table".to_string(), }, flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]), catalog_name: DEFAULT_CATALOG_NAME.to_string(), flow_name: "my_flow".to_string(), raw_sql: "sql".to_string(), expire_after: Some(300), comment: "comment".to_string(), options: Default::default(), }, vec![], ) .await .unwrap(); let cache = CacheBuilder::new(128).build(); let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set, HashSet::from([1, 2, 3])); let set = cache.get(1025).await.unwrap().unwrap(); assert_eq!(set, HashSet::from([1, 2, 3])); let result = cache.get(1026).await.unwrap().unwrap(); assert_eq!(result.len(), 0); // Add more test cases let result = cache.get(9999).await.unwrap(); assert!(result.is_none()); }
Line range hint
189-200
: Enhance test coverage.Add more test cases to cover different scenarios, such as invalid inputs and edge cases.
#[tokio::test] async fn test_create_flow() { let mem_kv = Arc::new(MemoryKvBackend::default()); let cache = CacheBuilder::new(128).build(); let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv); let ident = vec![CacheIdent::CreateFlow(CreateFlow { source_table_ids: vec![1024, 1025], flownode_ids: vec![1, 2, 3, 4, 5], })]; cache.invalidate(&ident).await.unwrap(); let set = cache.get(1024).await.unwrap().unwrap(); assert_eq!(set.len(), 5); let set = cache.get(1025).await.unwrap().unwrap(); assert_eq!(set.len(), 5); // Add more test cases let result = cache.get(9999).await.unwrap(); assert!(result.is_none()); }
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- src/common/meta/src/cache/flow/table_flownode.rs (1 hunks)
- src/common/meta/src/ddl/create_flow.rs (4 hunks)
- src/common/meta/src/key.rs (4 hunks)
- src/common/meta/src/key/flow.rs (14 hunks)
- src/common/meta/src/key/flow/flow_route.rs (1 hunks)
Files not reviewed due to errors (1)
- src/common/meta/src/key/flow.rs (no review received)
Additional comments not posted (2)
src/common/meta/src/key.rs (2)
114-114
: Update import statement.The
FlowRouteValue
import is added, which is necessary for the new functionality.
1195-1196
: Approve new table meta values.The
FlowNameValue
andFlowRouteValue
are correctly added to theimpl_table_meta_value
macro.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (3)
- src/common/meta/src/ddl/drop_flow.rs (5 hunks)
- src/common/meta/src/ddl/drop_flow/metadata.rs (2 hunks)
- src/common/meta/src/error.rs (2 hunks)
Additional comments not posted (8)
src/common/meta/src/ddl/drop_flow/metadata.rs (1)
Line range hint
16-52
: LGTM! Ensure tests cover new flow route logic.The function
fill_flow_metadata
is correctly implemented with proper error handling and validation. Ensure that the new logic for flow routes is covered in tests.src/common/meta/src/ddl/drop_flow.rs (5)
38-38
: LGTM! Ensure theflow_route_values
field is utilized correctly.The addition of the
flow_route_values
field in theDropFlowProcedure
struct is consistent with the new flow route functionality. Ensure that this field is utilized correctly throughout the procedure.Also applies to: 62-62, 224-224
34-34
: LGTM! Verify that flow existence check handles edge cases.The function
on_prepare
is correctly implemented with proper error handling and state updates. Ensure that the flow existence check handles all edge cases.
Line range hint
107-126
: LGTM! Ensure proper handling of async tasks and errors.The function
on_flownode_drop_flows
is correctly implemented with proper error handling and async task management. Ensure that the handling of async tasks and errors is robust.
Line range hint
128-138
: LGTM! Ensure metadata deletion is thoroughly tested.The function
on_delete_metadata
is correctly implemented with proper error handling and state updates. Ensure that metadata deletion is thoroughly tested.
Line range hint
140-164
: LGTM! Ensure cache invalidation logic is thoroughly tested.The function
on_broadcast
is correctly implemented with proper error handling and state updates. Ensure that the cache invalidation logic is thoroughly tested.src/common/meta/src/error.rs (2)
374-380
: LGTM! EnsureFlowRouteNotFound
is used consistently.The addition of the
FlowRouteNotFound
error variant is consistent with the new flow route functionality. Ensure that this error variant is used consistently throughout the codebase.
718-718
: LGTM! Ensure status codes are correctly mapped.The update to the
status_code
method to include the newFlowRouteNotFound
error variant is consistent with the existing error handling logic. Ensure that status codes are correctly mapped for all error variants.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4263 +/- ##
==========================================
- Coverage 84.97% 84.73% -0.24%
==========================================
Files 1054 1055 +1
Lines 187160 187318 +158
==========================================
- Hits 159031 158720 -311
- Misses 28129 28598 +469 |
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (9)
- src/cmd/src/standalone.rs (2 hunks)
- src/common/meta/src/ddl.rs (2 hunks)
- src/common/meta/src/ddl_manager.rs (2 hunks)
- src/common/meta/src/key.rs (4 hunks)
- src/common/meta/src/peer.rs (1 hunks)
- src/common/meta/src/test_util.rs (2 hunks)
- src/meta-srv/src/metasrv/builder.rs (1 hunks)
- src/meta-srv/src/procedure/utils.rs (2 hunks)
- tests-integration/src/standalone.rs (2 hunks)
Files skipped from review due to trivial changes (6)
- src/cmd/src/standalone.rs
- src/common/meta/src/ddl.rs
- src/common/meta/src/ddl_manager.rs
- src/common/meta/src/peer.rs
- src/meta-srv/src/metasrv/builder.rs
- tests-integration/src/standalone.rs
Files skipped from review as they are similar to previous changes (1)
- src/common/meta/src/key.rs
Additional comments not posted (2)
src/common/meta/src/test_util.rs (1)
36-36
: Ensure the necessity of the imported modules.The added import statement for
Peer
andPeerLookupService
should be verified for necessity within this file.Verification successful
The imported modules are necessary.
The
Peer
andPeerLookupService
are used multiple times within the filesrc/common/meta/src/test_util.rs
, confirming the necessity of the import statement.
- Lines where
Peer
is used: 38, 39, 40, 41, 42, 43, 44, 45, 46, 47- Lines where
PeerLookupService
is used: 48, 49, 50, 51, 52Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify if `Peer` and `PeerLookupService` are used in the file. # Test: Search for usage of `Peer` and `PeerLookupService`. Expect: At least one match for each. rg --type rust $'Peer|PeerLookupService' src/common/meta/src/test_util.rsLength of output: 939
src/meta-srv/src/procedure/utils.rs (1)
122-122
: Ensure the necessity of the imported module.The added import statement for
Peer
should be verified for necessity within this file.Verification successful
The import statement for
Peer
is necessary and correctly used in the file.
Peer
is used in thenew_client
function.Peer
is instantiated multiple times withPeer::new
.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify if `Peer` is used in the file. # Test: Search for usage of `Peer`. Expect: At least one match. rg --type rust $'Peer' src/meta-srv/src/procedure/utils.rsLength of output: 317
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- src/common/meta/src/key/flow/flow_route.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
- src/common/meta/src/key/flow/flow_route.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
Introduce
FlowRouteValue
to store flow routesChecklist
Summary by CodeRabbit
New Features
Refactor
StandalonePeerLookupService
and associated code from multiple modules, simplifying the codebase.