Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce the interface of RemoteJobScheduler #4181

Open
wants to merge 67 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
0d5b790
refactor: add Compactor trait
zyy17 Jun 2, 2024
a91c5d6
chore: add compact() in Compactor trait and expose compaction module
zyy17 Jun 2, 2024
37b0464
refactor: add CompactionRequest and open_compaction_region
zyy17 Jun 2, 2024
014bc22
refactor: export the compaction api
zyy17 Jun 3, 2024
f9d1e66
refactor: add DefaultCompactor::new_from_request
zyy17 Jun 3, 2024
3eb0dab
refactor: no need to pass mito_config in open_compaction_region()
zyy17 Jun 3, 2024
8cd55a2
refactor: CompactionRequest -> &CompactionRequest
zyy17 Jun 3, 2024
891426d
fix: typo
zyy17 Jun 3, 2024
8ff9c6f
docs: add docs for public apis
zyy17 Jun 4, 2024
acabc07
refactor: remove 'Picker' from Compactor
zyy17 Jun 4, 2024
de8496c
chore: add logs
zyy17 Jun 4, 2024
6bee93e
chore: change pub attribute for Picker
zyy17 Jun 4, 2024
a63c9f5
refactor: remove do_merge_ssts()
zyy17 Jun 4, 2024
3a360c0
refactor: update comments
zyy17 Jun 4, 2024
35feb02
refactor: use CompactionRegion argument in Picker
zyy17 Jun 5, 2024
55033f9
chore: make compaction module public and remove unnessary clone
zyy17 Jun 6, 2024
15c102c
refactor: move build_compaction_task() in CompactionScheduler{}
zyy17 Jun 6, 2024
8ad6fff
chore: use in open_compaction_region() and add some comments for pub…
zyy17 Jun 6, 2024
26ae2a9
refactor: add 'manifest_dir()' in store-api
zyy17 Jun 6, 2024
9a05b6a
refactor: move the default implementation to DefaultCompactor
zyy17 Jun 6, 2024
130fb90
refactor: remove Options from MergeOutput
zyy17 Jun 6, 2024
e141e91
chore: minor modification
zyy17 Jun 6, 2024
e3e6f12
fix: clippy errors
zyy17 Jun 6, 2024
93f5a74
fix: unit test errors
zyy17 Jun 6, 2024
17bf852
refactor: remove 'manifest_dir()' from store-api crate(already have o…
zyy17 Jun 6, 2024
48f9398
refactor: use 'region_dir' in CompactionRequest
zyy17 Jun 6, 2024
2f6d9ef
refactor: refine naming
zyy17 Jun 6, 2024
861ea41
chore: sync main branch
zyy17 Jun 7, 2024
d26c64a
refactor: refine naming
zyy17 Jun 7, 2024
4968186
refactor: remove clone()
zyy17 Jun 7, 2024
574fd9f
chore: add comments
zyy17 Jun 7, 2024
77c085c
refactor: add PickerOutput field in CompactorRequest
zyy17 Jun 7, 2024
f8d88a2
chore: sync main branch
zyy17 Jun 16, 2024
f92c729
feat: introduce RemoteJobScheduler
zyy17 Jun 7, 2024
c0c7b6d
feat: add RemoteJobScheudler in schedule_compaction_request()
zyy17 Jun 7, 2024
84a8ff7
refactor: use Option type for senders field of CompactionFinished
zyy17 Jun 8, 2024
f421d08
refactor: modify CompactionJob
zyy17 Jun 8, 2024
6f24668
refactor: schedule remote compaction job by options
zyy17 Jun 9, 2024
cbe2115
refactor: remove unused Options
zyy17 Jun 9, 2024
66ee079
build: remove unused log
zyy17 Jun 9, 2024
3c183ff
refactor: fallback to local compaction if the remote compaction failed
zyy17 Jun 9, 2024
90a4794
fix: clippy errors
zyy17 Jun 17, 2024
c67b9ad
chore: sync main branch
zyy17 Jun 17, 2024
0f5c4d3
refactor: add plugins in mito2
zyy17 Jun 17, 2024
6ffdae2
refactor: add from_u64() for JobId
zyy17 Jun 18, 2024
06fc1e4
refactor: make schedule module public
zyy17 Jun 18, 2024
01b20b2
refactor: add error for RemoteJobScheduler
zyy17 Jun 18, 2024
b740c41
refactor: add Notifier
zyy17 Jun 18, 2024
4404973
refactor: use Arc for Notifier
zyy17 Jun 18, 2024
25bfdf9
chore: sync main branch
zyy17 Jun 19, 2024
1d6efa4
refactor: add 'remote_compaction' in compaction options
zyy17 Jun 19, 2024
07459d3
fix: clippy errors
zyy17 Jun 19, 2024
d08b56e
fix: unrecognized table option
zyy17 Jun 19, 2024
840e642
refactor: add 'start_time' in CompactionJob
zyy17 Jun 22, 2024
fc0e927
refactor: modify error type of RemoteJobScheduler
zyy17 Jun 22, 2024
2731e76
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jun 24, 2024
1a8a742
chore: revert changes for request
zyy17 Jun 26, 2024
bf55576
refactor: code refactor by review comment
zyy17 Jun 26, 2024
617f8f4
refactor: use string type for JobId
zyy17 Jun 26, 2024
f220543
Merge branch 'main' into feat/add-experimental-remote-job-scheduler
zyy17 Jun 26, 2024
2f92206
refactor: add 'waiters' field in DefaultNotifier
zyy17 Jun 26, 2024
fbef517
fix: build error
zyy17 Jun 26, 2024
613edc8
refactor: take coderabbit's review comment
zyy17 Jun 27, 2024
4e3ccdf
refactor: use uuid::Uuid as JobId
zyy17 Jun 27, 2024
d299592
refactor: return waiters when schedule failed and add on_failure for …
zyy17 Jun 27, 2024
ca1c8fa
refactor: move waiters from notifier to Job
zyy17 Jun 27, 2024
5246a22
refactor: use ObjectStoreManagerRef in open_compaction_region()
zyy17 Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: use uuid::Uuid as JobId
  • Loading branch information
zyy17 committed Jun 27, 2024
commit 4e3ccdf4895dd5c1e44bb9f63b2c805359345138
4 changes: 2 additions & 2 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ impl CompactionScheduler {
let result = remote_job_scheduler
.schedule(
RemoteJob::CompactionJob(remote_compaction_job),
Arc::new(DefaultNotifier {
Box::new(DefaultNotifier {
request_sender: request_sender.clone(),
waiters: std::mem::take(&mut waiters),
}),
Expand All @@ -305,7 +305,7 @@ impl CompactionScheduler {

if let Ok(job_id) = result {
info!(
"Scheduled remote compaction job {} for region {}",
"Scheduled remote compaction job {:?} for region {}",
job_id, region_id
);

Expand Down
11 changes: 10 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,14 @@ pub enum Error {
reason: String,
},

#[snafu(display("Failed to parse job id"))]
ParseJobId {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: uuid::Error,
},

#[snafu(display("Operation is not supported: {}", err_msg))]
UnsupportedOperation {
err_msg: String,
Expand Down Expand Up @@ -819,7 +827,8 @@ impl ErrorExt for Error {
| InvalidMetadata { .. }
| InvalidRegionOptions { .. }
| InvalidWalReadRequest { .. }
| PartitionOutOfRange { .. } => StatusCode::InvalidArguments,
| PartitionOutOfRange { .. }
| ParseJobId { .. } => StatusCode::InvalidArguments,

InvalidRegionRequestSchemaVersion { .. } => StatusCode::RequestOutdated,

Expand Down
25 changes: 22 additions & 3 deletions src/mito2/src/schedule/remote_job_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ use std::sync::Arc;
use std::time::Instant;

use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;

use crate::compaction::compactor::CompactionRegion;
use crate::compaction::picker::PickerOutput;
use crate::error::Result;
use crate::error::{ParseJobIdSnafu, Result};
use crate::manifest::action::RegionEdit;
use crate::request::{
BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest,
Expand Down Expand Up @@ -50,7 +53,7 @@ pub type RemoteJobSchedulerRef = Arc<dyn RemoteJobScheduler>;
#[async_trait::async_trait]
pub trait RemoteJobScheduler: Send + Sync + 'static {
/// Sends a job to the scheduler and returns a UUID for the job.
async fn schedule(&self, job: RemoteJob, notifier: Arc<dyn Notifier>) -> Result<String>;
async fn schedule(&self, job: RemoteJob, notifier: Box<dyn Notifier>) -> Result<JobId>;
}

/// Notifier is used to notify the mito engine when a remote job is completed.
Expand All @@ -60,6 +63,22 @@ pub trait Notifier: Send + Sync + 'static {
async fn notify(&mut self, result: RemoteJobResult);
}

/// Unique id for a remote job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct JobId(Uuid);

impl JobId {
/// Parses job id from string.
pub fn parse_str(input: &str) -> Result<JobId> {
Uuid::parse_str(input).map(JobId).context(ParseJobIdSnafu)
}

/// Covert job id to string.
pub fn to_string(&self) -> String {
self.0.to_string()
}
}

/// RemoteJob is a job that can be executed remotely. For example, a remote compaction job.
#[derive(Clone)]
#[allow(dead_code)]
Expand All @@ -85,7 +104,7 @@ pub enum RemoteJobResult {
/// CompactionJobResult is the result of a compaction job.
#[allow(dead_code)]
pub struct CompactionJobResult {
pub job_id: String,
pub job_id: JobId,
pub region_id: RegionId,
pub start_time: Instant,
pub region_edit: Result<RegionEdit>,
Expand Down