Skip to content

Commit

Permalink
Merge pull request #8357 from influxdata/idpe-17789/rename-internal-variables
Browse files Browse the repository at this point in the history

refactor(idpe-17789): rename internal variables
  • Loading branch information
wiedld committed Jul 28, 2023
2 parents 4f9c901 + cfcef35 commit 61b65a9
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 53 deletions.
2 changes: 1 addition & 1 deletion compactor/src/components/compaction_job_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::stream::BoxStream;
pub mod endless;
pub mod once;

/// Source for partitions.
/// Source for compaction jobs.
pub trait CompactionJobStream: Debug + Display + Send + Sync {
/// Create new source stream of compaction jobs.
///
Expand Down
29 changes: 14 additions & 15 deletions compactor/src/components/compaction_jobs_source/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ where
T: CompactionJobsSource,
{
async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await;
info!(n_partitions = partitions.len(), "Fetch partitions",);
if partitions.is_empty() {
warn!("No partition found");
let jobs = self.inner.fetch().await;
info!(n_jobs = jobs.len(), "Fetch jobs",);
if jobs.is_empty() {
warn!("No compaction job found");
}
partitions
jobs
}
}

Expand All @@ -68,26 +68,25 @@ mod tests {
// logs normal log message (so it's easy to search for every single call) but also an extra warning
assert_eq!(
capture.to_string(),
"level = INFO; message = Fetch partitions; n_partitions = 0; \
\nlevel = WARN; message = No partition found; ",
"level = INFO; message = Fetch jobs; n_jobs = 0; \
\nlevel = WARN; message = No compaction job found; ",
);
}

#[tokio::test]
async fn test_fetch_some() {
let p_1 = CompactionJob::new(PartitionId::new(5));
let p_2 = CompactionJob::new(PartitionId::new(1));
let p_3 = CompactionJob::new(PartitionId::new(12));
let partitions = vec![p_1, p_2, p_3];
let cj_1 = CompactionJob::new(PartitionId::new(5));
let cj_2 = CompactionJob::new(PartitionId::new(1));
let cj_3 = CompactionJob::new(PartitionId::new(12));
let jobs = vec![cj_1, cj_2, cj_3];

let source =
LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(partitions.clone()));
let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(jobs.clone()));
let capture = TracingCapture::new();
assert_eq!(source.fetch().await, partitions,);
assert_eq!(source.fetch().await, jobs,);
// just the ordinary log message, no warning
assert_eq!(
capture.to_string(),
"level = INFO; message = Fetch partitions; n_partitions = 3; ",
"level = INFO; message = Fetch jobs; n_jobs = 3; ",
);
}
}
6 changes: 3 additions & 3 deletions compactor/src/components/compaction_jobs_source/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ where
T: CompactionJobsSource,
{
async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await;
let jobs = self.inner.fetch().await;
self.partitions_fetch_counter.inc(1);
self.partitions_counter.inc(partitions.len() as u64);
partitions
self.partitions_counter.inc(jobs.len() as u64);
jobs
}
}

Expand Down
20 changes: 10 additions & 10 deletions compactor/src/components/compaction_jobs_source/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ use super::CompactionJobsSource;
/// A mock structure for providing [compaction jobs](CompactionJob).
#[derive(Debug)]
pub struct MockCompactionJobsSource {
partitions: Mutex<Vec<CompactionJob>>,
compaction_jobs: Mutex<Vec<CompactionJob>>,
}

impl MockCompactionJobsSource {
#[allow(dead_code)]
/// Create a new MockCompactionJobsSource.
pub fn new(partitions: Vec<CompactionJob>) -> Self {
pub fn new(jobs: Vec<CompactionJob>) -> Self {
Self {
partitions: Mutex::new(partitions),
compaction_jobs: Mutex::new(jobs),
}
}

/// Set CompactionJobs for MockCompactionJobsSource.
#[allow(dead_code)] // not used anywhere
pub fn set(&self, partitions: Vec<CompactionJob>) {
*self.partitions.lock() = partitions;
pub fn set(&self, jobs: Vec<CompactionJob>) {
*self.compaction_jobs.lock() = jobs;
}
}

Expand All @@ -35,7 +35,7 @@ impl std::fmt::Display for MockCompactionJobsSource {
#[async_trait]
impl CompactionJobsSource for MockCompactionJobsSource {
async fn fetch(&self) -> Vec<CompactionJob> {
self.partitions.lock().clone()
self.compaction_jobs.lock().clone()
}
}

Expand All @@ -55,10 +55,10 @@ mod tests {
let source = MockCompactionJobsSource::new(vec![]);
assert_eq!(source.fetch().await, vec![],);

let p_1 = CompactionJob::new(PartitionId::new(5));
let p_2 = CompactionJob::new(PartitionId::new(1));
let p_3 = CompactionJob::new(PartitionId::new(12));
let parts = vec![p_1, p_2, p_3];
let cj_1 = CompactionJob::new(PartitionId::new(5));
let cj_2 = CompactionJob::new(PartitionId::new(1));
let cj_3 = CompactionJob::new(PartitionId::new(12));
let parts = vec![cj_1, cj_2, cj_3];
source.set(parts.clone());
assert_eq!(source.fetch().await, parts,);
}
Expand Down
2 changes: 1 addition & 1 deletion compactor/src/components/compaction_jobs_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use compactor_scheduler::CompactionJob;
/// A source of partitions, noted by [`CompactionJob`](compactor_scheduler::CompactionJob), that may potentially need compacting.
#[async_trait]
pub trait CompactionJobsSource: Debug + Display + Send + Sync {
/// Get partition IDs.
/// Get compaction jobs. (For now, 1 job equals 1 partition ID).
///
/// This method performs retries.
///
Expand Down
28 changes: 14 additions & 14 deletions compactor/src/components/compaction_jobs_source/randomize_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ where
T: CompactionJobsSource,
{
async fn fetch(&self) -> Vec<CompactionJob> {
let mut partitions = self.inner.fetch().await;
let mut compaction_jobs = self.inner.fetch().await;
let mut rng = StdRng::seed_from_u64(self.seed);
partitions.shuffle(&mut rng);
partitions
compaction_jobs.shuffle(&mut rng);
compaction_jobs
}
}

Expand Down Expand Up @@ -72,46 +72,46 @@ mod tests {

#[tokio::test]
async fn test_fetch_some() {
let p_1 = CompactionJob::new(PartitionId::new(5));
let p_2 = CompactionJob::new(PartitionId::new(1));
let p_3 = CompactionJob::new(PartitionId::new(12));
let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];
let cj_1 = CompactionJob::new(PartitionId::new(5));
let cj_2 = CompactionJob::new(PartitionId::new(1));
let cj_3 = CompactionJob::new(PartitionId::new(12));
let compaction_jobs = vec![cj_1.clone(), cj_2.clone(), cj_3.clone()];

// shuffles
let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
MockCompactionJobsSource::new(partitions.clone()),
MockCompactionJobsSource::new(compaction_jobs.clone()),
123,
);
assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
);

// is deterministic in same source
for _ in 0..100 {
assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
);
}

// is deterministic with new source
for _ in 0..100 {
let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
MockCompactionJobsSource::new(partitions.clone()),
MockCompactionJobsSource::new(compaction_jobs.clone()),
123,
);
assert_eq!(
source.fetch().await,
vec![p_3.clone(), p_2.clone(), p_1.clone(),],
vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
);
}

// different seed => different output
let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
MockCompactionJobsSource::new(partitions.clone()),
MockCompactionJobsSource::new(compaction_jobs.clone()),
1234,
);
assert_eq!(source.fetch().await, vec![p_2, p_3, p_1,],);
assert_eq!(source.fetch().await, vec![cj_2, cj_3, cj_1,],);
}
}
16 changes: 8 additions & 8 deletions compactor/src/components/partition_source/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,25 @@ mod tests {

#[tokio::test]
async fn test_fetch_by_id() {
let p_1 = PartitionBuilder::new(5).build();
let p_2 = PartitionBuilder::new(1).build();
let p_3 = PartitionBuilder::new(12).build();
let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];
let source = MockPartitionSource::new(partitions);
let cj_1 = PartitionBuilder::new(5).build();
let cj_2 = PartitionBuilder::new(1).build();
let cj_3 = PartitionBuilder::new(12).build();
let compaction_jobs = vec![cj_1.clone(), cj_2.clone(), cj_3.clone()];
let source = MockPartitionSource::new(compaction_jobs);

assert_eq!(
source.fetch_by_id(PartitionId::new(5)).await,
Some(p_1.clone())
Some(cj_1.clone())
);
assert_eq!(
source.fetch_by_id(PartitionId::new(1)).await,
Some(p_2.clone())
Some(cj_2.clone())
);

// fetching does not drain
assert_eq!(
source.fetch_by_id(PartitionId::new(5)).await,
Some(p_1.clone())
Some(cj_1.clone())
);

// unknown table => None result
Expand Down
3 changes: 2 additions & 1 deletion compactor/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ pub async fn compact(
.map(|job| {
let components = Arc::clone(components);

// A root span is created for each partition. Later this can be linked to the
// A root span is created for each compaction job (a.k.a. partition).
// Later this can be linked to the
// scheduler's span via something passed through compaction_job_stream.
let root_span: Option<Span> = trace_collector
.as_ref()
Expand Down

0 comments on commit 61b65a9

Please sign in to comment.