feat: introduce the interface of RemoteJobScheduler #4181

Open
wants to merge 67 commits into base: main
Conversation

zyy17
Collaborator

@zyy17 zyy17 commented Jun 20, 2024

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

RemoteJobScheduler

In a storage-disaggregated system, CPU-intensive and IO-intensive tasks (for example, compaction and indexing) can be offloaded to remote workers. This PR introduces an abstraction for that scenario.

RemoteJobScheduler is a trait that defines the API for scheduling remote jobs. Its implementation is in GreptimeDB Enterprise.

/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
    /// Sends a job to the scheduler and returns a unique identifier for the job.
    async fn schedule(&self, job: RemoteJob, notifier: Arc<dyn Notifier>) -> Result<JobId>;
}

/// Notifier is used to notify the mito engine when a remote job is completed.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
    /// Notify the mito engine that a remote job is completed.
    async fn notify(&self, result: RemoteJobResult);
}

The PR modifies schedule_compaction_request() to support remote compaction:

  • If the compaction request sets remote_compaction in region_options and the RemoteJobScheduler is initialized, Mito executes the compaction remotely;
  • If the remote compaction fails, Mito falls back to local compaction.
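The decision described in the two bullets above can be sketched roughly as follows. This is a simplified, synchronous illustration that uses only the standard library, not the code in this PR: the real trait is async and takes a notifier, and `Placement`, `AlwaysOk`, `AlwaysFail`, the `u64` region id, and the `String` error type are hypothetical stand-ins introduced only for this sketch.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the job payload; the real `RemoteJob` carries compaction details.
#[derive(Debug)]
pub struct RemoteJob {
    pub region_id: u64,
}

/// Identifier returned by the scheduler once a job is accepted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JobId(pub u64);

/// Synchronous simplification of the trait in the PR (the real one is async).
pub trait RemoteJobScheduler: Send + Sync {
    fn schedule(&self, job: RemoteJob) -> Result<JobId, String>;
}

/// Where a compaction ended up running (hypothetical helper type).
#[derive(Debug, PartialEq, Eq)]
pub enum Placement {
    Remote(JobId),
    Local,
}

/// Chooses remote compaction only when the option is set and a scheduler exists;
/// a scheduling error falls through to local compaction.
pub fn schedule_compaction_request(
    remote_compaction: bool,
    scheduler: Option<Arc<dyn RemoteJobScheduler>>,
    region_id: u64,
) -> Placement {
    if remote_compaction {
        if let Some(scheduler) = scheduler {
            match scheduler.schedule(RemoteJob { region_id }) {
                Ok(job_id) => return Placement::Remote(job_id),
                Err(_err) => {
                    // Remote scheduling failed: fall through to the local path.
                }
            }
        }
    }
    Placement::Local
}

/// Hypothetical scheduler that accepts every job.
pub struct AlwaysOk;

impl RemoteJobScheduler for AlwaysOk {
    fn schedule(&self, job: RemoteJob) -> Result<JobId, String> {
        Ok(JobId(job.region_id))
    }
}

/// Hypothetical scheduler that rejects every job.
pub struct AlwaysFail;

impl RemoteJobScheduler for AlwaysFail {
    fn schedule(&self, _job: RemoteJob) -> Result<JobId, String> {
        Err("remote scheduler unavailable".to_string())
    }
}
```

With `AlwaysFail`, or with no scheduler at all, the function returns `Placement::Local`, which mirrors the fallback behaviour the description promises.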

Other changes

  • Make all compaction-related functions async, because schedule_compaction_request needs to be async;

  • Use an Option type for senders in CompactionFinished, because it is not needed in the remote compaction scenario;

  • Add remote_compaction to the compaction options.

TODOs

  • Inject RemoteJobScheduler from the plugin system;
  • Design an API to fetch jobs from the scheduler, so that a restarted datanode can rebuild the context of its remote jobs;
  • Add unit tests for the RemoteJobScheduler.

Checklist

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.

Summary by CodeRabbit

  • New Features

    • Introduced remote compaction scheduling with enhanced asynchronous functionality.
    • Added remote compaction option to TwcsOptions.
  • Bug Fixes

    • Improved error handling with new RemoteJobScheduler variant for better debugging.
  • Refactor

    • Updated CompactionScheduler and related functions to support new asynchronous and plugin features.
  • Improvements

    • Enhanced CompactionOptions and TwcsOptions to include remote compaction checks.
    • Added comprehensive remote job scheduling and notification traits for improved job management.

zyy17 added 30 commits June 6, 2024 18:27
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between fc0e927 and 2731e76.

Files selected for processing (4)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/engine.rs (3 hunks)
  • src/mito2/src/error.rs (2 hunks)
  • src/mito2/src/region/options.rs (4 hunks)
Files skipped from review as they are similar to previous changes (3)
  • src/mito2/src/engine.rs
  • src/mito2/src/error.rs
  • src/mito2/src/region/options.rs
Additional comments not posted (7)
src/mito2/src/compaction.rs (7)

107-108: Approve the addition of plugins field to CompactionScheduler struct.

The addition of the plugins field is consistent with the PR's goal to enable remote job scheduling via plugins. This field should facilitate the integration of different plugins needed for compaction tasks.


Line range hint 118-127: Review the updated constructor of CompactionScheduler.

The constructor now properly initializes the plugins field with the provided plugins argument. This change is necessary for supporting the functionality described in the PR.


Line range hint 133-161: Review the asynchronous transformation of schedule_compaction.

The method has been correctly converted to an asynchronous function, which is essential for handling remote compaction tasks that may involve I/O operations or other asynchronous activities. The implementation appears to handle different compaction statuses and schedules compaction requests appropriately.


Line range hint 165-190: Review the updated on_compaction_finished method.

The method has been updated to async and includes logic to handle the scenario where the next compaction task needs to be scheduled. This change aligns with the asynchronous nature of the operations involved and the need for continuous compaction scheduling.


Line range hint 231-350: Review the comprehensive changes in schedule_compaction_request.

This method now handles the decision-making between remote and local compaction based on the compaction options. The handling of remote job scheduling and the fallback mechanism to local compaction if remote scheduling fails are crucial for the system's robustness and flexibility.


352-359: Review the remove_region_on_failure method.

The method correctly handles the scenario where a region needs to be removed due to a failure. This is an important part of maintaining system stability and ensuring that failed tasks do not linger in the system.


29-30: Validate the necessity of new imports.

Ensure that the Plugins from common_base and info from common_telemetry are used appropriately in the file. This helps maintain clarity and avoid unnecessary dependencies.

Verification successful

New imports are used appropriately.

The Plugins and info imports are utilized within the file, ensuring they are necessary and relevant.

  • Plugins is used in the context of the compaction scheduler.
  • info is used for logging purposes.
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of newly imported modules.

# Test: Search for usage of `Plugins` and `info`.
rg --type rust $'Plugins|info' src/mito2/src/compaction.rs

Length of output: 273

src/mito2/src/request.rs (outdated, resolved)
src/mito2/src/compaction.rs (outdated, resolved)
src/mito2/src/compaction.rs (outdated, resolved)
src/mito2/src/region/options.rs (resolved)
src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
@zyy17 zyy17 force-pushed the feat/add-experimental-remote-job-scheduler branch from 4bf1673 to 1a8a742 Compare June 26, 2024 05:52
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Outside diff range and nitpick comments (1)
src/mito2/src/compaction.rs (1)

Line range hint 107-118: Check the addition of the plugins field in CompactionScheduler.

The addition of the plugins field to CompactionScheduler is a significant architectural change. It integrates plugin management directly into the compaction process, potentially enhancing flexibility and functionality. Ensure that all plugin interactions are thread-safe and well-managed.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 2731e76 and 51db158.

Files selected for processing (2)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Additional comments not posted (6)
src/mito2/src/schedule/remote_job_scheduler.rs (5)

30-35: Ensure correct async trait usage with async_trait macro.

The async_trait macro is correctly applied to the RemoteJobScheduler trait, allowing for asynchronous method definitions. This is crucial for non-blocking operations in the system.


46-58: Review the JobId struct implementation.

The JobId struct is simple and efficient, providing necessary functionality to work with job IDs as u64. Methods as_u64 and from_u64 are correctly implemented.
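For readers without the diff at hand, a `JobId` of the shape this comment describes might look like the sketch below. Only the names `JobId`, `as_u64`, and `from_u64` come from the review text; the derive list and the exact signatures are assumptions.

```rust
/// Sketch of a job identifier as a newtype over `u64`.
/// The derives are assumed for illustration and may differ from the PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId(u64);

impl JobId {
    /// Returns the raw numeric value of the identifier.
    pub fn as_u64(&self) -> u64 {
        self.0
    }

    /// Wraps a raw numeric value in a `JobId`.
    pub fn from_u64(id: u64) -> JobId {
        JobId(id)
    }
}
```

Keeping the inner field private means callers can only build or read an id through these two methods.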


60-74: Consider removing #[allow(dead_code)] if the enum is used.

The RemoteJob enum is marked with #[allow(dead_code)], which is typically used to suppress warnings for unused code. If this enum is actively used in the system, consider removing the attribute to enforce code cleanliness.

- #[allow(dead_code)]

91-131: Check DefaultNotifier implementation for potential improvements.

The DefaultNotifier handles the notification logic robustly, encapsulating it within a match statement to distinguish between compaction success and failure. However, consider enhancing the logging details for better traceability of errors and operations.

+                        "Compaction job completed with status: {:?}", notify,

37-42: Validate the Notifier trait definition.

The Notifier trait is well-defined for asynchronous notifications. Ensure that the implementation of this trait in various components handles errors properly during notification delivery.

#!/bin/bash
# Description: Search for implementations of the `Notifier` trait to verify error handling.

# Test: Search for the trait implementations. Expect: Proper error handling in all implementations.
ast-grep --lang rust --pattern $'impl Notifier for $_ { $$$ }'
src/mito2/src/compaction.rs (1)

Line range hint 133-161: Review asynchronous compaction scheduling logic.

The method schedule_compaction has been correctly updated to be asynchronous. This is essential for non-blocking I/O operations in compaction tasks. Ensure that the error handling and asynchronous logic are robust and that failures in scheduling do not lead to unhandled exceptions or resource leaks.

#!/bin/bash
# Description: Verify error handling in asynchronous compaction scheduling.

# Test: Search for error handling in the async compaction scheduling method. Expect: Robust error handling.
ast-grep --lang rust --pattern $'async fn schedule_compaction($$$) { $$$ }'

src/mito2/src/compaction.rs (outdated, resolved)
@zyy17 zyy17 force-pushed the feat/add-experimental-remote-job-scheduler branch from 51db158 to bf55576 Compare June 26, 2024 13:13
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Outside diff range and nitpick comments (1)
src/mito2/src/compaction.rs (1)

Line range hint 107-127: Review plugin integration in CompactionScheduler.

The integration of Plugins into CompactionScheduler is crucial for extensibility. Ensure that the plugin system is robustly tested and well-documented to prevent future issues.

Would you like me to help draft the unit tests or documentation for this integration?

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 51db158 and bf55576.

Files selected for processing (3)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/region/options.rs (5 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Additional comments not posted (4)
src/mito2/src/schedule/remote_job_scheduler.rs (2)

30-35: Consider using more descriptive names for job and notifier parameters.

The parameters job and notifier in the schedule method could be more descriptive to enhance code readability and maintainability.

- async fn schedule(&self, job: RemoteJob, notifier: Arc<dyn Notifier>) -> Result<JobId>;
+ async fn schedule(&self, remote_job: RemoteJob, job_notifier: Arc<dyn Notifier>) -> Result<JobId>;

46-57: Simplify JobId struct with newtype pattern.

The JobId struct is currently a tuple struct. Consider using the newtype pattern for better type safety and encapsulation.

- pub struct JobId(u64);
+ pub struct JobId(pub u64);

Likely invalid or redundant comment.

src/mito2/src/region/options.rs (2)

148-150: Clarify serialization details for remote_compaction field.

The remote_compaction field uses serde_as for serialization. Ensure the serialization format is clearly documented and understood, especially if it deviates from typical boolean serialization.


121-125: Ensure consistency in remote compaction feature check.

The method remote_compaction checks if remote compaction is enabled. Ensure that this setting is consistently used wherever relevant throughout the codebase.

src/mito2/src/compaction.rs (outdated, resolved)
@zyy17 zyy17 requested review from evenyag and sunng87 June 26, 2024 14:36
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Outside diff range and nitpick comments (1)
src/mito2/src/schedule/remote_job_scheduler.rs (1)

44-49: Validate the usage of RemoteJob enum.

The RemoteJob enum currently only has one variant, CompactionJob. Consider future scalability by potentially adding more job types or providing a more generic mechanism for job handling to accommodate different types of remote tasks beyond compaction.

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between bf55576 and 617f8f4.

Files selected for processing (2)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Files not reviewed due to errors (1)
  • src/mito2/src/compaction.rs (no review received)
Additional comments not posted (1)
src/mito2/src/schedule/remote_job_scheduler.rs (1)

51-58: Review the fields of CompactionJob for potential optimization.

The CompactionJob struct includes compaction_region, picker_output, and start_time. Ensure these fields are necessary and optimally used. For instance, start_time might be redundant if timestamps are handled elsewhere in the job lifecycle.

src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 617f8f4 and f220543.

Files selected for processing (2)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/error.rs (2 hunks)
Additional comments not posted (3)
src/mito2/src/error.rs (1)

758-763: Well-integrated new error type for remote job scheduler issues.

The RemoteJobScheduler error type is well-defined, with appropriate use of the Location type for context and a clear display message. It's correctly integrated into the ErrorExt implementation to return an Internal status code, which is suitable for server-side issues.

src/mito2/src/compaction.rs (2)

108-109: Proper integration of plugins for remote job scheduling.

The addition of the plugins field in CompactionScheduler is well-implemented. It is properly initialized in the constructor and is crucial for enabling the integration of remote job scheduling. This change supports the extensibility and modularity of the compaction scheduling system.

Also applies to: 119-128


286-327: Robust handling of remote compaction scheduling with appropriate fallback.

The implementation robustly handles the possibility of remote compaction by checking for the availability of a remote job scheduler and scheduling a remote job accordingly. It appropriately falls back to local compaction if remote scheduling is not possible, ensuring that compaction tasks are reliably handled.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between f220543 and fbef517.

Files selected for processing (3)
  • src/mito2/src/compaction.rs (15 hunks)
  • src/mito2/src/error.rs (2 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/mito2/src/error.rs
Additional comments not posted (6)
src/mito2/src/schedule/remote_job_scheduler.rs (3)

32-37: Consider enhancing the documentation for RemoteJobScheduler.

The trait RemoteJobScheduler is crucial as it defines the API for scheduling remote jobs. However, it lacks comprehensive documentation that explains its purpose, usage, and the parameters of its methods. This is especially important for asynchronous methods in a distributed system context.


39-44: Ensure proper error handling in the Notifier trait.

The notify method currently does not return a Result, which might imply that all errors are handled internally. However, it's crucial to allow error propagation to the caller to handle them appropriately, especially in a distributed environment where network issues and other errors are common.


77-119: Refactor error handling in DefaultNotifier.notify.

The error handling in the notify method should be improved to provide better visibility and handle errors more gracefully. The current implementation logs the error but does not propagate it, which might lead to silent failures in the system.

-                    error!(
-                        "Failed to notify compaction job status for region {}, request: {:?}",
-                        result.region_id, e.0
-                    );
+                    error!("Failed to notify compaction job status for region {}, error: {:?}", result.region_id, e);
src/mito2/src/compaction.rs (3)

108-109: Review the integration of plugins within CompactionScheduler.

The addition of plugins to CompactionScheduler is a significant change. Ensure that the integration is seamless and that the plugins are being utilized correctly, particularly in the context of remote compaction.

+    /// Plugins for the compaction scheduler.
+    plugins: Plugins,

Also applies to: 119-119, 128-128


286-317: Assess the fallback mechanism for remote compaction.

The logic to fallback to local compaction if remote scheduling fails is crucial for reliability. Review the implementation to ensure that the fallback mechanism is seamless and does not introduce delays or inconsistencies in the compaction process. Consider adding more detailed logging at each step of the fallback to improve observability.

+                debug!("Fallback initiated for region {}", region_id);

Line range hint 134-162: Evaluate the asynchronous compaction scheduling logic.

The changes to make schedule_compaction and related functions asynchronous are significant. It's crucial to ensure that these changes do not introduce any race conditions or logic errors, especially given the complexity of handling compaction tasks asynchronously.

+    pub(crate) async fn schedule_compaction(
+        &mut self,
+        region_id: RegionId,
+        compact_options: compact_request::Options,
+        version_control: &VersionControlRef,
+        access_layer: &AccessLayerRef,
+        waiter: OptionOutputTx,
+        manifest_ctx: &ManifestContextRef,
+    ) -> Result<()>

Also applies to: 166-191, 232-347

src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between fbef517 and 613edc8.

Files selected for processing (1)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Additional comments not posted (2)
src/mito2/src/schedule/remote_job_scheduler.rs (2)

63-68: Confirm the usage and future plans for RemoteJob enum.

The #[allow(dead_code)] attribute suggests that not all variants are currently used. It's important to confirm if this is for future extensibility or if there's unnecessary code that could be cleaned up.


70-77: CompactionJob struct is well-defined.

The fields are appropriately chosen to represent a compaction job, and the use of Instant for start_time is suitable for performance tracking.

Comment on lines 33 to 53
/// RemoteJobScheduler is a trait that defines the API to schedule remote jobs.
/// For example, a compaction job can be scheduled remotely as the following workflow:
/// ```mermaid
/// participant User
/// participant MitoEngine
/// participant CompactionScheduler
/// participant Plugins
/// participant RemoteJobScheduler
///
/// User->>MitoEngine: Initiates compaction
/// MitoEngine->>CompactionScheduler: schedule_compaction()
/// CompactionScheduler->>Plugins: Handle plugins
/// CompactionScheduler->>RemoteJobScheduler: schedule(CompactionJob)
/// RemoteJobScheduler-->>CompactionScheduler: Returns Job UUID
/// CompactionScheduler-->>MitoEngine: Task scheduled with Job UUID
/// MitoEngine-->>User: Compaction task scheduled
/// ```
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
    /// Sends a job to the scheduler and returns a UUID for the job.
    async fn schedule(&self, job: RemoteJob, notifier: Arc<dyn Notifier>) -> Result<String>;
Contributor

Enhance documentation for RemoteJobScheduler.schedule.

While the trait and the schedule method are documented, the parameters job and notifier could benefit from more detailed explanations, especially since this method is crucial for the system's operation.

Would you like me to draft additional documentation detailing these parameters?

Comment on lines 56 to 60
/// Notifier is used to notify the mito engine when a remote job is completed.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
    /// Notify the mito engine that a remote job is completed.
    async fn notify(&mut self, result: RemoteJobResult);
Contributor

Consider modifying the Notifier.notify method to return a Result.

Returning a Result from notify would allow error propagation to the caller, which is crucial in a distributed system where failures are common.

-    async fn notify(&mut self, result: RemoteJobResult);
+    async fn notify(&mut self, result: RemoteJobResult) -> Result<()>;

Comment on lines 85 to 92
/// CompactionJobResult is the result of a compaction job.
#[allow(dead_code)]
pub struct CompactionJobResult {
    pub job_id: String,
    pub region_id: RegionId,
    pub start_time: Instant,
    pub region_edit: Result<RegionEdit>,
}
Contributor

Consider using Uuid for job_id in CompactionJobResult.

Using a Uuid type instead of String for job_id would enhance type safety and consistency with typical UUID usage patterns.

-    pub job_id: String,
+    pub job_id: Uuid,

Comment on lines 94 to 135
/// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
pub(crate) struct DefaultNotifier {
    pub(crate) request_sender: Sender<WorkerRequest>,
    pub(crate) waiters: Vec<OutputTx>,
}

#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
    async fn notify(&mut self, result: RemoteJobResult) {
        match result {
            RemoteJobResult::CompactionJobResult(result) => {
                let notify = {
                    match result.region_edit {
                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
                            region_id: result.region_id,
                            senders: std::mem::take(&mut self.waiters),
                            start_time: result.start_time,
                            edit,
                        }),
                        Err(err) => BackgroundNotify::CompactionFailed(CompactionFailed {
                            region_id: result.region_id,
                            err: Arc::new(err),
                        }),
                    }
                };

                if let Err(e) = self
                    .request_sender
                    .send(WorkerRequest::Background {
                        region_id: result.region_id,
                        notify,
                    })
                    .await
                {
                    error!(
                        "Failed to notify compaction job status for region {}, error: {:?}",
                        result.region_id, e
                    );
                }
            }
        }
    }
Contributor

Refine error handling and logging in DefaultNotifier.notify.

The current implementation logs errors but does not propagate them. Enhancing error propagation and making the error messages more specific could improve system robustness and debuggability.

-                    error!(
-                        "Failed to notify compaction job status for region {}, error: {:?}",
-                        result.region_id, e
-                    );
+                    return Err(Error::from(e));

Consider implementing a more robust error handling strategy that includes error propagation to improve system resilience.

Committable suggestion was skipped due to low confidence.


/// Notifier is used to notify the mito engine when a remote job is completed.
#[async_trait::async_trait]
pub trait Notifier: Send + Sync + 'static {
Contributor

Does the notifier need to be Sync?

Collaborator Author

Yes, it needs to be Sync because it will start a periodic task to check the remote compaction status and update the manifest.
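The general Rust rule behind this answer can be illustrated with a small standard-library sketch. It is not the PR's code: the trait here is a synchronous simplification, `CountingNotifier` and `spawn_checker` are hypothetical names, and a plain thread stands in for the periodic task mentioned above.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simplified, synchronous stand-in for the PR's `Notifier` trait.
pub trait Notifier: Send + Sync + 'static {
    fn notify(&self, job_id: u64);
}

/// Hypothetical notifier that only counts how often it was called.
#[derive(Default)]
pub struct CountingNotifier {
    pub calls: AtomicUsize,
}

impl Notifier for CountingNotifier {
    fn notify(&self, _job_id: u64) {
        self.calls.fetch_add(1, Ordering::SeqCst);
    }
}

/// Spawns a background "status checker" that shares the notifier with the caller.
/// `Arc<T>` is `Send` only when `T` is both `Send` and `Sync`, so removing `Sync`
/// from the trait bound would make this `thread::spawn` call fail to compile.
pub fn spawn_checker(notifier: Arc<dyn Notifier>, job_id: u64) -> thread::JoinHandle<()> {
    thread::spawn(move || notifier.notify(job_id))
}
```

Because the notifier is reachable from both the caller and the background task through the same `Arc`, shared access from two threads is possible, and that is what the `Sync` bound guards.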

src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
src/mito2/src/schedule/remote_job_scheduler.rs (outdated, resolved)
Comment on lines 297 to 304
    .schedule(
        RemoteJob::CompactionJob(remote_compaction_job),
        Arc::new(DefaultNotifier {
            request_sender: request_sender.clone(),
            waiters: std::mem::take(&mut waiters),
        }),
    )
    .await;
Contributor

@evenyag evenyag Jun 27, 2024

If the remote job scheduler returns an error, we will lose all waiters.

Maybe we can put the waiters in the Error so that we can take them back. We can define a dedicated error for the scheduler which wraps mito::error::Error and the waiters, like errors of channels.

Otherwise, we might need to track waiters by the job id. If the scheduler returns an error, we fall back to the local scheduler with the waiters. Once a remote job is finished, we can notify those waiters.
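The first option, an error that returns the waiters in the way `std::sync::mpsc::SendError<T>` returns the unsent value, could be sketched as below. Everything here is hypothetical and standard-library only: `Waiter` stands in for `OutputTx`, `ScheduleError` for the dedicated scheduler error, and `schedule_remote` is a stub that always fails.

```rust
use std::fmt;
use std::sync::mpsc::Sender;

/// Hypothetical stand-in for a waiter (`OutputTx` in the PR).
pub type Waiter = Sender<Result<(), String>>;

/// Error that hands the waiters back to the caller instead of dropping them.
pub struct ScheduleError {
    pub reason: String,
    pub waiters: Vec<Waiter>,
}

impl fmt::Display for ScheduleError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to schedule remote job: {}", self.reason)
    }
}

/// Stub scheduler call that always fails and returns the waiters it was given.
pub fn schedule_remote(waiters: Vec<Waiter>) -> Result<u64, ScheduleError> {
    Err(ScheduleError {
        reason: "remote scheduler unavailable".to_string(),
        waiters,
    })
}

/// Tries the remote path first; on failure, recovers the waiters from the error
/// and resolves them on the local path. Returns how many waiters were notified.
pub fn schedule_with_fallback(waiters: Vec<Waiter>) -> usize {
    match schedule_remote(waiters) {
        // The remote side now owns the waiters and will notify them later.
        Ok(_job_id) => 0,
        Err(err) => {
            // Local compaction would run here; this sketch reports success directly.
            let mut notified = 0;
            for waiter in err.waiters {
                if waiter.send(Ok(())).is_ok() {
                    notified += 1;
                }
            }
            notified
        }
    }
}
```

Moving the waiters into the call and getting them back only through the error keeps ownership explicit, so no waiter can be lost when scheduling fails.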

Collaborator Author

If we follow the original implementation, maybe I need to add the following code:

impl DefaultNotifier {
    fn on_failure(&mut self, err: Arc<Error>, region_id: RegionId) {
        COMPACTION_FAILURE_COUNT.inc();
        for waiter in self.waiters.drain(..) {
            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
                region_id,
            }));
        }
    }
}

#[async_trait::async_trait]
impl Notifier for DefaultNotifier {
    async fn notify(&mut self, result: RemoteJobResult) {
        match result {
            RemoteJobResult::CompactionJobResult(result) => {
                let notify = {
                    match result.region_edit {
                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
                            region_id: result.region_id,
                            senders: std::mem::take(&mut self.waiters),
                            start_time: result.start_time,
                            edit,
                        }),
                        Err(err) => {
                            error!("Compaction failed for region {}: {:?}", result.region_id, err);
                            let err = Arc::new(err);
                            self.on_failure(err.clone(), result.region_id);
                            BackgroundNotify::CompactionFailed(CompactionFailed {
                                region_id: result.region_id,
                                err,
                            })
                        }
                    }
                };

                if let Err(e) = self
                    .request_sender
                    .send(WorkerRequest::Background {
                        region_id: result.region_id,
                        notify,
                    })
                    .await
                {
                    error!(
                        "Failed to notify compaction job status for region {}, error: {:?}",
                        result.region_id, e
                    );
                }
            }
        }
    }
}

Should we add the new error type here, or can we put it in a new PR?

Collaborator Author

The new PR has the following refactoring:

  1. Add the waiters to the RemoteJobScheduler error;
  2. Move the waiters from DefaultNotifier to Job, which makes them easier to pass between multiple objects.

src/mito2/src/compaction.rs (outdated, resolved)
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 613edc8 and d299592.

Files selected for processing (4)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/error.rs (5 hunks)
  • src/mito2/src/request.rs (1 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/mito2/src/error.rs
Additional comments not posted (7)
src/mito2/src/schedule/remote_job_scheduler.rs (1)

60-64: Consider modifying the Notifier.notify method to return a Result.

Returning a Result from notify would allow error propagation to the caller, which is crucial in a distributed system where failures are common.
The previous comment on this issue is still valid.
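
As a rough illustration of the suggestion above, the sketch below shows a notifier whose `notify` returns a `Result` so that a failed delivery reaches the caller instead of only being logged. It is not the PR's code: `JobResult`, `NotifyError`, `ChannelNotifier`, and `channel_notifier` are hypothetical names, the trait is synchronous to keep the example standard-library only (the real trait is async), and a `std::sync::mpsc` channel stands in for `request_sender`.

```rust
use std::fmt;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the PR's `RemoteJobResult`.
#[derive(Debug, Clone, PartialEq)]
pub struct JobResult {
    pub region_id: u64,
}

/// Hypothetical error returned when the notification cannot be delivered.
#[derive(Debug, PartialEq)]
pub struct NotifyError {
    pub region_id: u64,
}

impl fmt::Display for NotifyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "failed to notify job status for region {}", self.region_id)
    }
}

impl std::error::Error for NotifyError {}

/// Synchronous simplification of the `Notifier` trait (assumption: the real one is async).
pub trait Notifier {
    fn notify(&self, result: JobResult) -> Result<(), NotifyError>;
}

/// Forwards results over a channel, standing in for `DefaultNotifier`.
pub struct ChannelNotifier {
    pub sender: Sender<JobResult>,
}

impl Notifier for ChannelNotifier {
    fn notify(&self, result: JobResult) -> Result<(), NotifyError> {
        let region_id = result.region_id;
        // Surface the send failure to the caller instead of only logging it.
        self.sender
            .send(result)
            .map_err(|_| NotifyError { region_id })
    }
}

/// Builds a notifier together with the receiving end of its channel.
pub fn channel_notifier() -> (ChannelNotifier, Receiver<JobResult>) {
    let (sender, receiver) = channel();
    (ChannelNotifier { sender }, receiver)
}
```

With this shape, a scheduler implementation that calls `notify` can decide for itself whether to retry, log, or give up when the receiving side has gone away.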

src/mito2/src/compaction.rs (1)

108-109: Approve addition of plugins field and async modifications.

The addition of the plugins field to CompactionScheduler and the conversion of methods to async are well-aligned with the system's evolution towards supporting remote compactions and asynchronous operations.

Also applies to: 119-128

src/mito2/src/request.rs (5)

390-390: Well-structured and clear implementation of OutputTx.

The OutputTx struct and its send method are well-implemented, providing a clear and encapsulated way to handle result transmission. The choice to ignore the send result is appropriate here, avoiding unnecessary complexity.

Also applies to: 397-398
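
To make the "ignore the send result" choice concrete, here is a minimal sketch under stated assumptions. The `OutputTx` below is a simplified, hypothetical version that wraps a `std::sync::mpsc::Sender` carrying `Result<usize, String>`; the type in the PR presumably wraps an async channel and the project's own result type. `output_channel` is a helper invented for the example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical simplification of `OutputTx`: wraps a sender for a job's outcome.
pub struct OutputTx {
    sender: Sender<Result<usize, String>>,
}

impl OutputTx {
    pub fn new(sender: Sender<Result<usize, String>>) -> Self {
        Self { sender }
    }

    /// Sends the result and ignores the error raised when the receiver is gone.
    pub fn send(self, result: Result<usize, String>) {
        // A dropped receiver means nobody is waiting, so there is nothing to report.
        let _ = self.sender.send(result);
    }
}

/// Builds an `OutputTx` together with the receiving end of its channel.
pub fn output_channel() -> (OutputTx, Receiver<Result<usize, String>>) {
    let (sender, receiver) = channel();
    (OutputTx::new(sender), receiver)
}
```

Because `send` consumes `self`, each waiter can be answered at most once, and a caller that has already hung up does not turn into an error on the worker side.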


Line range hint 408-408: Consider simplifying OptionOutputTx by removing the Option wrapper.

The implementation of OptionOutputTx is robust, but as per previous discussions in the PR, consider whether the Option wrapper is necessary. Using a direct OutputTx or an empty vector might simplify the code.

Also applies to: 410-410, 412-413, 415-416, 418-419, 421-425


Line range hint 427-429: Good use of the OnFailure trait for consistent error handling.

The introduction of the OnFailure trait helps standardize how errors are handled across different parts of the system, which is a good practice for maintainability.


Line range hint 431-434: Clear and effective struct definitions for request handling.

SenderWriteRequest and WorkerRequest are well-defined and align well with the system’s architecture for handling different types of requests.


Line range hint 436-493: Comprehensive implementation of background job notifications.

The BackgroundNotify enum and related structs like FlushFinished and CompactionFinished are well-implemented, providing detailed handling for various background job results. This enhances the system's responsiveness and reliability.

Comment on lines +54 to +58
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
    /// Sends a job to the scheduler and returns a UUID for the job.
    async fn schedule(&self, job: RemoteJob, notifier: Box<dyn Notifier>) -> Result<JobId>;
}
Contributor

Enhance documentation for RemoteJobScheduler.schedule.

The schedule method is a core part of the remote job scheduling functionality, but it lacks detailed documentation. Detailed explanations on the parameters job and notifier and the method's purpose would be beneficial.

Would you like me to draft additional documentation detailing these parameters?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Outside diff range and nitpick comments (1)
src/mito2/src/compaction.rs (1)

Line range hint 232-354: Evaluate the fallback mechanism for remote compaction.

The logic to fallback to local compaction if remote scheduling fails is crucial for reliability. Review the implementation to ensure that the fallback mechanism is seamless and does not introduce delays or inconsistencies in the compaction process. Consider adding more detailed logging at each step of the fallback to improve observability.

+                debug!("Fallback initiated for region {}", region_id);
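
A possible shape for a fallback path that logs each decision is sketched below. It is an illustration only, not the PR's `schedule_compaction_request`: the `Scheduler` trait is a synchronous, hypothetical stand-in for `RemoteJobScheduler`, `FixedScheduler` is a test double, `Executed` is an invented return type, and log lines are collected into a `Vec<String>` instead of going through the telemetry macros.

```rust
/// Where a compaction request ended up running.
#[derive(Debug, PartialEq)]
pub enum Executed {
    Remote(u64),
    Local,
}

/// Hypothetical, synchronous stand-in for `RemoteJobScheduler`.
pub trait Scheduler {
    fn schedule(&self, region_id: u64) -> Result<u64, String>;
}

/// Test double that always returns the configured outcome.
pub struct FixedScheduler(pub Result<u64, String>);

impl Scheduler for FixedScheduler {
    fn schedule(&self, _region_id: u64) -> Result<u64, String> {
        self.0.clone()
    }
}

/// Tries the remote path first and records why it fell back, if it did.
pub fn schedule_compaction(
    region_id: u64,
    remote_enabled: bool,
    scheduler: Option<&dyn Scheduler>,
    log: &mut Vec<String>,
) -> Executed {
    if !remote_enabled {
        log.push(format!("region {}: remote compaction not enabled, running locally", region_id));
        return Executed::Local;
    }
    match scheduler.map(|s| s.schedule(region_id)) {
        Some(Ok(job_id)) => {
            log.push(format!("region {}: scheduled remote job {}", region_id, job_id));
            Executed::Remote(job_id)
        }
        Some(Err(reason)) => {
            log.push(format!("region {}: remote scheduling failed ({}), falling back", region_id, reason));
            Executed::Local
        }
        None => {
            log.push(format!("region {}: no remote scheduler, falling back", region_id));
            Executed::Local
        }
    }
}
```

Each of the three ways to end up on the local path produces a distinct message, which is the observability property the comment asks for.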
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between d299592 and dfb24c3.

Files selected for processing (2)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Files skipped from review as they are similar to previous changes (1)
  • src/mito2/src/schedule/remote_job_scheduler.rs
Additional comments not posted (7)
src/mito2/src/compaction.rs (7)

29-30: Approved the enhanced logging capability.

The addition of info logging is consistent with the PR's objectives to enhance telemetry within the system.


108-109: Approved the addition of the plugins field to CompactionScheduler.

Integrating the Plugins struct is crucial for managing the remote job scheduler functionality, aligning with the PR's objectives to enhance compaction task handling.


119-119: Approved the changes to the CompactionScheduler constructor.

Accepting a Plugins parameter and initializing the plugins field are necessary steps to enable the use of the plugin system for remote job scheduling.

Also applies to: 128-128


134-134: Approved the asynchronous modification of schedule_compaction.

Converting schedule_compaction to an asynchronous method is essential for non-blocking operations, especially given the potentially long-running nature of compaction tasks.

Also applies to: 162-162


166-166: Approved the asynchronous modification of on_compaction_finished.

Making on_compaction_finished asynchronous ensures that the method can handle complex logic and external calls effectively without blocking the system.

Also applies to: 191-191


356-363: Approved the changes to remove_region_on_failure.

Enhancing the remove_region_on_failure method to handle region removal on failure improves the robustness and cleanliness of state management within the system.


Line range hint 640-762: Approved the addition of test cases for compaction scheduling.

The new test cases effectively verify the behavior of the compaction scheduling logic under various conditions, enhancing the reliability and test coverage of the system.

@zyy17 zyy17 requested a review from evenyag June 27, 2024 14:57
@zyy17 zyy17 force-pushed the feat/add-experimental-remote-job-scheduler branch from dfb24c3 to ca1c8fa Compare June 27, 2024 15:08
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Outside diff range and nitpick comments (5)
src/mito2/src/compaction.rs (5)

108-109: Review the integration of Plugins into CompactionScheduler.

The integration of Plugins to manage compaction-related plugins is a strategic enhancement. It aligns with the PR's objective to facilitate plugin-based remote job scheduling. Ensure that the Plugins object is properly managed across its lifecycle, particularly regarding ownership and mutability concerns.

Also applies to: 119-128


Line range hint 166-191: Assess the error handling in on_compaction_finished.

The method correctly handles the scenario where no compaction tasks are pending for a region. However, the error handling could be more robust by ensuring that all potential error paths are logged and handled appropriately to prevent silent failures.

+            error!("No pending compaction tasks for region {}", region_id);

Codebase Verification

Asynchronous Handling of schedule_compaction Calls Needs Adjustment

The calls to the schedule_compaction function in the following files are not properly handling its asynchronous nature. These calls should use the .await keyword to ensure correct execution:

  • src/mito2/src/worker/handle_compaction.rs

    .compaction_scheduler
    .schedule_compaction(
  • src/mito2/src/worker/handle_flush.rs

    .compaction_scheduler
    .schedule_compaction(

Please update these calls to handle the asynchronous nature correctly by using .await.
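
Whether the listed call sites really omit `.await` cannot be confirmed from the truncated snippets shown here, but the general point is easy to demonstrate: a Rust future does nothing until it is polled. The sketch below uses only the standard library; `block_on` is a deliberately tiny executor written for this example, and this `schedule_compaction` is a hypothetical stand-in that just bumps a counter.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for this sketch only; real code runs on its async runtime.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut future = pin!(future);
    loop {
        if let Poll::Ready(value) = future.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical stand-in for the async `schedule_compaction`.
pub async fn schedule_compaction(scheduled: &AtomicUsize) {
    scheduled.fetch_add(1, Ordering::SeqCst);
}

/// Returns the counter after creating the future and after driving it.
pub fn lazy_future_demo() -> (usize, usize) {
    let scheduled = AtomicUsize::new(0);
    // Calling the async fn only builds the future; the body has not run yet.
    let future = schedule_compaction(&scheduled);
    let before = scheduled.load(Ordering::SeqCst);
    // Driving the future (what `.await` does inside async code) runs the body.
    block_on(future);
    let after = scheduled.load(Ordering::SeqCst);
    (before, after)
}
```

In practice the compiler also flags a dropped future with an `unused_must_use` warning, so a missing `.await` at a real call site would normally show up at build time.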

Analysis chain

Line range hint 134-162: Evaluate the asynchronous implementation of schedule_compaction.

Transforming schedule_compaction to an async function supports the non-blocking scheduling of compaction tasks. However, ensure that all calling functions are also adapted to handle this asynchronous nature to prevent runtime issues.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify all function calls to `schedule_compaction` are properly handling its asynchronous nature.

# Test: Search for the function usage. Expect: Correct async handling in all occurrences.
rg --type rust $'schedule_compaction'

Length of output: 910



Script:

#!/bin/bash
# Description: Verify all function calls to `schedule_compaction` are properly handling its asynchronous nature.

# Check each call to `schedule_compaction` for proper async handling.
rg --type rust 'schedule_compaction' -A 2

Length of output: 2730



Script:

#!/bin/bash
# Description: Verify all function calls to `schedule_compaction` are properly handling its asynchronous nature.

# Check the context of calls to `schedule_compaction` in the specified files for proper async handling.
rg 'schedule_compaction' src/mito2/src/worker/handle_compaction.rs -A 5 -B 5
rg 'schedule_compaction' src/mito2/src/worker/handle_flush.rs -A 5 -B 5

Length of output: 883


Codebase Verification

Enhance logging for fallback to local compaction

The fallback to local compaction is currently logged with the statement:

debug!("Remote compaction is not enabled, fallback to local compaction for region {}", region_id);

However, the review comment suggests adding a more explicit log statement to improve traceability. Consider adding the following log statement to ensure comprehensive logging:

debug!("Initiating fallback to local compaction for region {}", region_id);
  • File: src/mito2/src/compaction.rs
  • Lines: Add the suggested log statement before the existing fallback log statement.
+            debug!("Initiating fallback to local compaction for region {}", region_id);
Analysis chain

Line range hint 232-354: Review the comprehensive implementation of schedule_compaction_request.

This function now handles both local and remote compaction scenarios, including fallback mechanisms. It's crucial to ensure that the fallback to local compaction is seamless and does not introduce additional latency or resource overhead. Also, consider enhancing the logging during the fallback process to improve traceability.

+            debug!("Initiating fallback to local compaction for region {}", region_id);
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the seamless operation of the fallback mechanism in `schedule_compaction_request`.

# Test: Search for the function usage. Expect: No reported issues or delays in fallback scenarios.
rg --type rust $'schedule_compaction_request'

Length of output: 279



Script:

#!/bin/bash
# Description: Verify the presence of the debug log statement indicating the fallback to local compaction.

# Search for the debug log statement within the `schedule_compaction_request` function.
rg 'debug!\("Initiating fallback to local compaction for region \{\}", region_id\);' --type rust

Length of output: 98



Script:

#!/bin/bash
# Description: Verify the presence of debug log statements related to the fallback mechanism in `schedule_compaction_request`.

# Search for all debug log statements within the `schedule_compaction_request` function.
rg 'debug!' --type rust -A 5 -B 5

Length of output: 121233


Codebase Verification

Ensure comprehensive testing of asynchronous compaction scheduling.

The current codebase lacks direct test functions for schedule_compaction. It is crucial to add comprehensive test cases covering various scenarios, including edge cases and error paths, to ensure robust and reliable compaction scheduling.

  • Add test cases for schedule_compaction in src/mito2/src/compaction.rs and related files.
  • Ensure coverage includes edge cases and fallback mechanisms in remote compaction.
Analysis chain

Line range hint 640-659: Ensure comprehensive testing of asynchronous compaction scheduling.

The tests provided seem to cover various scenarios, including empty and populated compaction tasks. However, ensure that these tests are comprehensive enough to cover all edge cases, especially those involving error paths and fallback mechanisms in remote compaction.

Would you like help in enhancing the test coverage or designing specific test cases for edge scenarios?

Also applies to: 699-728, 740-762

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the coverage and effectiveness of tests for `schedule_compaction`.

# Test: Search for additional test cases in the test suite. Expect: Comprehensive coverage including edge cases.
rg --type rust $'test_schedule_compaction'

Length of output: 41



Script:

#!/bin/bash
# Description: Verify the coverage and effectiveness of tests for `schedule_compaction`.

# Search for the `schedule_compaction` function definition to understand its context.
ast-grep --lang rust --pattern 'fn schedule_compaction($$_) { $$$ }'

# Search for test cases that call the `schedule_compaction` function.
rg --type rust 'schedule_compaction'

Length of output: 981



Script:

#!/bin/bash
# Description: Identify test functions in files where `schedule_compaction` is invoked.

# List the files where `schedule_compaction` is used.
files=$(rg --files-with-matches 'schedule_compaction')

# Search for test functions within these files.
for file in $files; do
  echo "Searching for test functions in $file"
  ast-grep --lang rust --pattern 'fn test_$$_() { $$$ }' $file
done

Length of output: 1276



Script:

#!/bin/bash
# Description: Identify all test functions across the codebase and check if any involve `schedule_compaction`.

# Search for all test functions across the codebase.
ast-grep --lang rust --pattern 'fn test_$$_() { $$$ }'

# Search for invocations of `schedule_compaction` within the identified test functions.
rg --type rust 'schedule_compaction'

Length of output: 967

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between dfb24c3 and ca1c8fa.

Files selected for processing (3)
  • src/mito2/src/compaction.rs (16 hunks)
  • src/mito2/src/error.rs (5 hunks)
  • src/mito2/src/schedule/remote_job_scheduler.rs (1 hunks)
Additional comments not posted (7)
src/mito2/src/schedule/remote_job_scheduler.rs (3)

62-65: Consider making Notifier.notify method return a Result.

The notify method in the Notifier trait currently does not return a Result, which might limit error handling capabilities. Consider modifying this to return a Result<(), Error> to allow propagation of errors to the caller.

-    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>);
+    async fn notify(&self, result: RemoteJobResult, waiters: Vec<OutputTx>) -> Result<(), Error>;

130-174: Review error handling strategy in DefaultNotifier.notify.

The method notify in the DefaultNotifier implementation logs errors but does not propagate them. Consider enhancing error propagation and making the error messages more specific to improve system robustness and debuggability.

-                    error!(
-                        "Failed to notify compaction job status for region {}, error: {:?}",
-                        result.region_id, e
-                    );
+                    return Err(Error::from(e));

54-58: Ensure Notifier is provided correctly in RemoteJobScheduler.schedule.

The method schedule in the RemoteJobScheduler trait takes a Box<dyn Notifier> as a parameter. Ensure that the lifecycle and ownership of this Notifier are managed properly, especially since it's being passed around in an async environment.

src/mito2/src/error.rs (3)

758-765: Review new RemoteJobScheduler error variant.

The new RemoteJobScheduler error variant includes waiters which is a good design for notifying about fallbacks. Ensure that this pattern is consistently used across other similar error handling scenarios.


767-773: Check the implementation of ParseJobId error variant.

The ParseJobId error variant is well-implemented, leveraging the underlying uuid::Error. This is a good practice as it uses existing error handling mechanisms provided by the uuid crate.


799-805: Ensure correct usage of waiters_in_remote_job_scheduler method.

The method waiters_in_remote_job_scheduler extracts waiters from the RemoteJobScheduler error variant. Verify that this method is used correctly throughout the codebase to ensure proper error recovery and notification.

Verification successful

Correct usage of waiters_in_remote_job_scheduler method confirmed.

The method is used within an error handling block to manage waiters properly when remote compaction scheduling fails, ensuring proper fallback to local compaction.

  • src/mito2/src/compaction.rs: Correctly handles errors and uses the method to return waiters for local compaction.
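
The pattern described here, an error variant that hands the waiters back so the caller can serve them locally, might look roughly like the following. This is a sketch under assumptions rather than the PR's code: `Waiter` is a `String` stand-in for `OutputTx`, `take_waiters` is a hypothetical counterpart of `waiters_in_remote_job_scheduler` (whose real signature is not shown in this thread), and `schedule_remote` and `waiters_for_local_fallback` are invented for the example.

```rust
/// Hypothetical stand-in for a waiter (`OutputTx` in the PR).
pub type Waiter = String;

#[derive(Debug)]
pub enum Error {
    /// Remote scheduling failed; the waiters are handed back to the caller.
    RemoteJobScheduler { reason: String, waiters: Vec<Waiter> },
    Other(String),
}

impl Error {
    /// Takes the waiters out of a `RemoteJobScheduler` error, if there are any.
    pub fn take_waiters(&mut self) -> Vec<Waiter> {
        match self {
            Error::RemoteJobScheduler { waiters, .. } => std::mem::take(waiters),
            Error::Other(_) => Vec::new(),
        }
    }
}

/// Pretends to schedule a remote job; `healthy` selects success or failure.
pub fn schedule_remote(waiters: Vec<Waiter>, healthy: bool) -> Result<u64, Error> {
    if healthy {
        // On success the remote job would own the waiters; this sketch drops them.
        Ok(42)
    } else {
        Err(Error::RemoteJobScheduler {
            reason: "scheduler unavailable".to_string(),
            waiters,
        })
    }
}

/// Returns the waiters that local compaction still has to answer.
pub fn waiters_for_local_fallback(waiters: Vec<Waiter>, healthy: bool) -> Vec<Waiter> {
    match schedule_remote(waiters, healthy) {
        Ok(_job_id) => Vec::new(),
        Err(mut err) => err.take_waiters(),
    }
}
```

Carrying the waiters inside the error avoids cloning them up front: ownership moves into the scheduling call and only comes back on the failure path that needs it.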
Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify the usage of `waiters_in_remote_job_scheduler` method across the codebase.

# Test: Search for usage of the method. Expect: Correct usage patterns.
rg --type rust $'waiters_in_remote_job_scheduler'

Length of output: 234



Script:

#!/bin/bash
# Description: Extract the context where `waiters_in_remote_job_scheduler` is used in `src/mito2/src/compaction.rs`.

# Search for the method usage and extract surrounding lines for context.
rg --type rust -C 5 'waiters_in_remote_job_scheduler' src/mito2/src/compaction.rs

Length of output: 602

src/mito2/src/compaction.rs (1)

29-30: Approval of new imports for telemetry and plugins.

The addition of common_base::Plugins and telemetry functions (debug, error, info) are appropriate for the new functionalities related to remote job scheduling and enhanced logging capabilities.

Labels
docs-not-required This change does not impact docs.

3 participants