feat(idpe 17789): provide job to CompactionJobDoneSink (formerly known as PartitionDoneSink) (#8368)

* rename PartitionDoneSink to CompactionJobDoneSink, and change the signature in the trait (a sketch of the renamed trait follows this list)
* update all trait implementations, including local variables and comments
* rename partition_done_sink in the components and driver to compaction_job_done_sink
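For orientation, here is a minimal sketch of the renamed trait and its new signature as implied by the diffs below. The stand-in types (the `DynError` alias, the reduced `CompactionJob`, and the trait bounds) are assumptions for illustration; the real definitions live in the `compactor` and `compactor_scheduler` crates.

```rust
use std::fmt::{Debug, Display};

use async_trait::async_trait;

// Stand-in for the real IOx error alias (assumption for illustration).
type DynError = Box<dyn std::error::Error + Send + Sync>;

// Heavily reduced stand-in: the real CompactionJob (compactor_scheduler)
// wraps data_types::PartitionId plus job metadata.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CompactionJob {
    pub partition_id: i64,
}

/// Sink for recording the terminal state of a compaction job.
/// Before this commit, `record` took only a `PartitionId`; it now receives
/// the whole `CompactionJob`, so implementations can report job-level
/// status back to the scheduler. (The trait bounds here are assumptions.)
#[async_trait]
pub trait CompactionJobDoneSink: Debug + Display + Send + Sync {
    async fn record(&self, job: CompactionJob, res: Result<(), DynError>) -> Result<(), DynError>;
}
```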
wiedld committed Aug 2, 2023
1 parent e45c9b6 commit 68ab2c9
Showing 11 changed files with 173 additions and 187 deletions.
32 changes: 16 additions & 16 deletions compactor/src/components/hardcoded.rs
```diff
@@ -44,9 +44,9 @@ use super::{
     },
     parquet_files_sink::{dispatch::DispatchParquetFilesSink, ParquetFilesSink},
     partition_done_sink::{
-        error_kind::ErrorKindPartitionDoneSinkWrapper, logging::LoggingPartitionDoneSinkWrapper,
-        metrics::MetricsPartitionDoneSinkWrapper, outcome::PartitionDoneSinkToScheduler,
-        PartitionDoneSink,
+        error_kind::ErrorKindCompactionJobDoneSinkWrapper,
+        logging::LoggingCompactionJobDoneSinkWrapper, metrics::MetricsCompactionJobDoneSinkWrapper,
+        outcome::CompactionJobDoneSinkToScheduler, CompactionJobDoneSink,
     },
     partition_files_source::{
         catalog::{CatalogPartitionFilesSource, QueryRateLimiter},
@@ -92,16 +92,16 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
         Arc::clone(&config.metric_registry),
         config.shadow_mode,
     );
-    let (compaction_jobs_source, commit, partition_done_sink) =
-        make_jobs_source_commit_partition_sink(config, Arc::clone(&scheduler));
+    let (compaction_jobs_source, commit, compaction_job_done_sink) =
+        make_jobs_source_commit_jobs_sink(config, Arc::clone(&scheduler));
 
     Arc::new(Components {
         compaction_job_stream: make_compaction_job_stream(config, compaction_jobs_source),
         partition_info_source: make_partition_info_source(config),
         partition_files_source: make_partition_files_source(config),
         round_info_source: make_round_info_source(config),
         partition_filter: make_partition_filter(config),
-        partition_done_sink,
+        compaction_job_done_sink,
         commit,
         ir_planner: make_ir_planner(config),
         df_planner: make_df_planner(config),
@@ -116,27 +116,27 @@
     })
 }
 
-fn make_jobs_source_commit_partition_sink(
+fn make_jobs_source_commit_jobs_sink(
     config: &Config,
     scheduler: Arc<dyn Scheduler>,
 ) -> (
     Arc<dyn CompactionJobsSource>,
     Arc<CommitToScheduler>,
-    Arc<dyn PartitionDoneSink>,
+    Arc<dyn CompactionJobDoneSink>,
 ) {
     let compaction_jobs_source = ScheduledCompactionJobsSource::new(Arc::clone(&scheduler));
 
     let commit = CommitToScheduler::new(Arc::clone(&scheduler));
 
-    let partition_done_sink = PartitionDoneSinkToScheduler::new(Arc::clone(&scheduler));
+    let compaction_job_done_sink = CompactionJobDoneSinkToScheduler::new(Arc::clone(&scheduler));
 
     // compactors are responsible for error classification
     // and any future decisions regarding graceful shutdown
-    let partition_done_sink: Arc<dyn PartitionDoneSink> = if config.all_errors_are_fatal {
-        Arc::new(partition_done_sink)
+    let compaction_job_done_sink: Arc<dyn CompactionJobDoneSink> = if config.all_errors_are_fatal {
+        Arc::new(compaction_job_done_sink)
     } else {
-        Arc::new(ErrorKindPartitionDoneSinkWrapper::new(
-            partition_done_sink,
+        Arc::new(ErrorKindCompactionJobDoneSinkWrapper::new(
+            compaction_job_done_sink,
             ErrorKind::variants()
                 .iter()
                 .filter(|kind| {
@@ -151,8 +151,8 @@ fn make_jobs_source_commit_partition_sink(
             scheduler,
         ))
     };
-    let partition_done_sink = Arc::new(LoggingPartitionDoneSinkWrapper::new(
-        MetricsPartitionDoneSinkWrapper::new(partition_done_sink, &config.metric_registry),
+    let compaction_job_done_sink = Arc::new(LoggingCompactionJobDoneSinkWrapper::new(
+        MetricsCompactionJobDoneSinkWrapper::new(compaction_job_done_sink, &config.metric_registry),
     ));
 
     // Note: Place "not empty" wrapper at the very last so that the logging and metric wrapper work
@@ -177,7 +177,7 @@ fn make_jobs_source_commit_partition_sink(
     (
         compaction_jobs_source,
        Arc::new(commit),
-        partition_done_sink,
+        compaction_job_done_sink,
    )
 }
```
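The factory above composes the done sink decorator-style: `CompactionJobDoneSinkToScheduler` sits innermost, `ErrorKindCompactionJobDoneSinkWrapper` wraps it unless `all_errors_are_fatal`, and the metrics and logging wrappers go on last so they observe every `record` call. A toy, self-contained restatement of that layering (all names here are hypothetical stand-ins, and the metrics layer is omitted for brevity):

```rust
// Toy decorator layering mirroring hardcoded.rs:
// Logging(Metrics(ErrorFilter(ToScheduler))), metrics omitted.
trait Sink {
    fn record(&self, job: u64, res: Result<(), String>);
}

struct ToScheduler; // innermost: reports outcomes to the scheduler
impl Sink for ToScheduler {
    fn record(&self, job: u64, res: Result<(), String>) {
        println!("scheduler notified: job={job} res={res:?}");
    }
}

struct ErrorFilter<T: Sink>(T); // decides which error kinds get reported
impl<T: Sink> Sink for ErrorFilter<T> {
    fn record(&self, job: u64, res: Result<(), String>) {
        // The real wrapper classifies the error and may rewrite it;
        // simplified here to a pass-through.
        self.0.record(job, res);
    }
}

struct Logging<T: Sink>(T); // outermost: sees every call unmodified
impl<T: Sink> Sink for Logging<T> {
    fn record(&self, job: u64, res: Result<(), String>) {
        println!("record called: job={job}");
        self.0.record(job, res);
    }
}

fn main() {
    let sink = Logging(ErrorFilter(ToScheduler));
    sink.record(1, Ok(()));
    sink.record(2, Err("out of memory".to_string()));
}
```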
6 changes: 3 additions & 3 deletions compactor/src/components/mod.rs
```diff
@@ -5,7 +5,7 @@ use self::{
     compaction_job_stream::CompactionJobStream, df_plan_exec::DataFusionPlanExec,
     df_planner::DataFusionPlanner, divide_initial::DivideInitial, file_classifier::FileClassifier,
     ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
-    partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
+    partition_done_sink::CompactionJobDoneSink, partition_files_source::PartitionFilesSource,
     partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource,
     post_classification_partition_filter::PostClassificationPartitionFilter,
     round_info_source::RoundInfoSource, round_split::RoundSplit, scratchpad::ScratchpadGen,
@@ -58,8 +58,8 @@ pub struct Components {
     pub partition_filter: Arc<dyn PartitionFilter>,
     /// condition to avoid running out of resources during compaction
     pub post_classification_partition_filter: Arc<dyn PostClassificationPartitionFilter>,
-    /// Records "partition is done" status for given partition.
-    pub partition_done_sink: Arc<dyn PartitionDoneSink>,
+    /// Records "compaction job is done" status for given partition.
+    pub compaction_job_done_sink: Arc<dyn CompactionJobDoneSink>,
     /// Commits changes (i.e. deletion and creation).
     pub commit: Arc<CommitToScheduler>,
     /// Creates `PlanIR` that describes what files should be compacted and updated
```
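A toy sketch of the renamed field in use, with a hypothetical, heavily reduced `Components` and an in-memory sink standing in for the real trait objects:

```rust
use std::{collections::HashMap, sync::Mutex};

// Hypothetical reduced versions of the real types, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CompactionJob(i64);

trait CompactionJobDoneSink: Send + Sync {
    fn record(&self, job: CompactionJob, ok: bool);
}

// In the spirit of the mock sink used by the tests below: remembers the
// outcome recorded for each job.
struct InMemorySink(Mutex<HashMap<CompactionJob, bool>>);
impl CompactionJobDoneSink for InMemorySink {
    fn record(&self, job: CompactionJob, ok: bool) {
        self.0.lock().unwrap().insert(job, ok);
    }
}

struct Components {
    // Renamed from `partition_done_sink` in this commit.
    compaction_job_done_sink: Box<dyn CompactionJobDoneSink>,
}

fn main() {
    let components = Components {
        compaction_job_done_sink: Box::new(InMemorySink(Mutex::new(HashMap::new()))),
    };
    components
        .compaction_job_done_sink
        .record(CompactionJob(1), true);
}
```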
69 changes: 31 additions & 38 deletions compactor/src/components/partition_done_sink/error_kind.rs
```diff
@@ -5,25 +5,24 @@ use compactor_scheduler::{
     CompactionJob, CompactionJobStatus, CompactionJobStatusResponse, CompactionJobStatusVariant,
     ErrorKind as SchedulerErrorKind, Scheduler,
 };
-use data_types::PartitionId;
 
 use crate::error::{DynError, ErrorKind, ErrorKindExt};
 
-use super::PartitionDoneSink;
+use super::CompactionJobDoneSink;
 
 #[derive(Debug)]
-pub struct ErrorKindPartitionDoneSinkWrapper<T>
+pub struct ErrorKindCompactionJobDoneSinkWrapper<T>
 where
-    T: PartitionDoneSink,
+    T: CompactionJobDoneSink,
 {
     kind: HashSet<ErrorKind>,
     inner: T,
     scheduler: Arc<dyn Scheduler>,
 }
 
-impl<T> ErrorKindPartitionDoneSinkWrapper<T>
+impl<T> ErrorKindCompactionJobDoneSinkWrapper<T>
 where
-    T: PartitionDoneSink,
+    T: CompactionJobDoneSink,
 {
     pub fn new(inner: T, kind: HashSet<ErrorKind>, scheduler: Arc<dyn Scheduler>) -> Self {
         Self {
@@ -34,9 +33,9 @@ where
     }
 }
 
-impl<T> Display for ErrorKindPartitionDoneSinkWrapper<T>
+impl<T> Display for ErrorKindCompactionJobDoneSinkWrapper<T>
 where
-    T: PartitionDoneSink,
+    T: CompactionJobDoneSink,
 {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let mut kinds = self.kind.iter().copied().collect::<Vec<_>>();
@@ -46,17 +45,13 @@ where
 }
 
 #[async_trait]
-impl<T> PartitionDoneSink for ErrorKindPartitionDoneSinkWrapper<T>
+impl<T> CompactionJobDoneSink for ErrorKindCompactionJobDoneSinkWrapper<T>
 where
-    T: PartitionDoneSink,
+    T: CompactionJobDoneSink,
 {
-    async fn record(
-        &self,
-        partition: PartitionId,
-        res: Result<(), DynError>,
-    ) -> Result<(), DynError> {
+    async fn record(&self, job: CompactionJob, res: Result<(), DynError>) -> Result<(), DynError> {
         match res {
-            Ok(()) => self.inner.record(partition, Ok(())).await,
+            Ok(()) => self.inner.record(job, Ok(())).await,
             Err(e) if self.kind.contains(&e.classify()) => {
                 let scheduler_error = match SchedulerErrorKind::from(e.classify()) {
                     SchedulerErrorKind::OutOfMemory => SchedulerErrorKind::OutOfMemory,
@@ -68,7 +63,7 @@ where
                 match self
                     .scheduler
                     .update_job_status(CompactionJobStatus {
-                        job: CompactionJob::new(partition),
+                        job: job.clone(),
                         status: CompactionJobStatusVariant::Error(scheduler_error),
                     })
                     .await?
@@ -79,7 +74,7 @@ where
                     }
                 }
 
-                self.inner.record(partition, Err(e)).await
+                self.inner.record(job, Err(e)).await
             }
             Err(e) => {
                 // contract of this abstraction,
@@ -95,17 +90,18 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use compactor_scheduler::create_test_scheduler;
+    use data_types::PartitionId;
     use datafusion::error::DataFusionError;
     use iox_tests::TestCatalog;
     use iox_time::{MockProvider, Time};
     use object_store::Error as ObjectStoreError;
 
-    use super::{super::mock::MockPartitionDoneSink, *};
+    use super::{super::mock::MockCompactionJobDoneSink, *};
 
     #[test]
     fn test_display() {
-        let sink = ErrorKindPartitionDoneSinkWrapper::new(
-            MockPartitionDoneSink::new(),
+        let sink = ErrorKindCompactionJobDoneSinkWrapper::new(
+            MockCompactionJobDoneSink::new(),
             HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
             create_test_scheduler(
                 TestCatalog::new().catalog(),
@@ -118,8 +114,8 @@
 
     #[tokio::test]
     async fn test_record() {
-        let inner = Arc::new(MockPartitionDoneSink::new());
-        let sink = ErrorKindPartitionDoneSinkWrapper::new(
+        let inner = Arc::new(MockCompactionJobDoneSink::new());
+        let sink = ErrorKindCompactionJobDoneSinkWrapper::new(
             Arc::clone(&inner),
             HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]),
             create_test_scheduler(
@@ -129,39 +125,36 @@
             ),
         );
 
+        let cj_1 = CompactionJob::new(PartitionId::new(1));
+        let cj_2 = CompactionJob::new(PartitionId::new(2));
+        let cj_3 = CompactionJob::new(PartitionId::new(3));
+        let cj_4 = CompactionJob::new(PartitionId::new(4));
+
         sink.record(
-            PartitionId::new(1),
+            cj_1.clone(),
             Err(Box::new(ObjectStoreError::NotImplemented)),
         )
         .await
         .expect("record failed");
         sink.record(
-            PartitionId::new(2),
+            cj_2.clone(),
             Err(Box::new(DataFusionError::ResourcesExhausted(String::from(
                 "foo",
             )))),
         )
         .await
         .expect("record failed");
-        sink.record(PartitionId::new(3), Err("foo".into()))
-            .await
-            .unwrap_err();
-        sink.record(PartitionId::new(4), Ok(()))
+        sink.record(cj_3, Err("foo".into())).await.unwrap_err();
+        sink.record(cj_4.clone(), Ok(()))
             .await
             .expect("record failed");
 
         assert_eq!(
             inner.results(),
             HashMap::from([
-                (
-                    PartitionId::new(1),
-                    Err(String::from("Operation not yet implemented.")),
-                ),
-                (
-                    PartitionId::new(2),
-                    Err(String::from("Resources exhausted: foo")),
-                ),
-                (PartitionId::new(4), Ok(()),),
+                (cj_1, Err(String::from("Operation not yet implemented.")),),
+                (cj_2, Err(String::from("Resources exhausted: foo")),),
+                (cj_4, Ok(()),),
             ]),
         );
     }
```
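The rewritten test pins down the wrapper's contract: `Ok` results and errors of a configured kind are forwarded to the inner sink (for errors, the scheduler is notified first), while an error of an unconfigured kind surfaces as an error from `record` itself, which is why `cj_3` ends in `unwrap_err()` and never appears in `inner.results()`. A toy restatement of that dispatch logic, using hypothetical simplified types rather than the IOx ones:

```rust
use std::collections::HashSet;

// Hypothetical simplified error kinds; the real set lives in
// compactor/src/error.rs and is derived by classifying DynError values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ErrorKind {
    ObjectStore,
    OutOfMemory,
    Unknown,
}

// Simplified wrapper dispatch: Ok and configured error kinds are forwarded
// to the inner sink; an unconfigured kind is rejected outright, mirroring
// the `unwrap_err()` case in the test above.
fn dispatch(
    kinds: &HashSet<ErrorKind>,
    res: Result<(), ErrorKind>,
) -> Result<Result<(), ErrorKind>, String> {
    match res {
        Ok(()) => Ok(Ok(())),                       // forwarded to inner sink
        Err(k) if kinds.contains(&k) => Ok(Err(k)), // scheduler notified, then forwarded
        Err(k) => Err(format!("unexpected error kind: {k:?}")),
    }
}

fn main() {
    let kinds = HashSet::from([ErrorKind::ObjectStore, ErrorKind::OutOfMemory]);
    assert!(dispatch(&kinds, Ok(())).is_ok());                       // like cj_4
    assert!(dispatch(&kinds, Err(ErrorKind::OutOfMemory)).is_ok());  // like cj_2
    assert!(dispatch(&kinds, Err(ErrorKind::Unknown)).is_err());     // like cj_3
}
```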
(Diffs for the remaining 8 changed files are not shown here.)
