Merge pull request #8326 from influxdata/idpe-17789/compaction-job-renaming

refactor(idpe-17789): renaming abstractions related to partitions source, to compaction jobs source
wiedld committed Jul 27, 2023
2 parents e1626c3 + 78ef536 commit 7ac6c6d
Showing 14 changed files with 158 additions and 148 deletions.
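For orientation, the two renamed abstractions end up looking roughly as follows. This is a sketch assembled from the hunks below, not the complete files; the supertrait list on CompactionJobsSource and the exact module paths are assumptions.

use std::fmt::{Debug, Display};

use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
use futures::stream::BoxStream;

/// Formerly `PartitionsSource`: hands out compaction jobs to the compactor.
#[async_trait]
pub trait CompactionJobsSource: Debug + Display + Send + Sync {
    /// Fetch the next batch of jobs; may return an empty vec.
    async fn fetch(&self) -> Vec<CompactionJob>;
}

/// Formerly `PartitionStream`: turns a `CompactionJobsSource` into a stream of jobs.
pub trait CompactionJobStream: Debug + Display + Send + Sync {
    /// Create new source stream of compaction jobs.
    ///
    /// This stream may be endless.
    fn stream(&self) -> BoxStream<'_, CompactionJob>;
}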
@@ -4,22 +4,22 @@ use compactor_scheduler::CompactionJob;
use futures::{stream::BoxStream, StreamExt};

use super::super::{
partition_files_source::rate_limit::RateLimit, partitions_source::PartitionsSource,
compaction_jobs_source::CompactionJobsSource, partition_files_source::rate_limit::RateLimit,
};
use super::PartitionStream;
use super::CompactionJobStream;

#[derive(Debug)]
pub struct EndlessPartititionStream<T>
pub struct EndlessCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
source: Arc<T>,
limiter: RateLimit,
}

impl<T> EndlessPartititionStream<T>
impl<T> EndlessCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
pub fn new(source: T) -> Self {
Self {
@@ -29,18 +29,18 @@ where
}
}

impl<T> Display for EndlessPartititionStream<T>
impl<T> Display for EndlessCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "endless({})", self.source)
}
}

impl<T> PartitionStream for EndlessPartititionStream<T>
impl<T> CompactionJobStream for EndlessCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn stream(&self) -> BoxStream<'_, CompactionJob> {
let source = Arc::clone(&self.source);
@@ -82,11 +82,11 @@
mod tests {
use data_types::PartitionId;

use super::{super::super::partitions_source::mock::MockPartitionsSource, *};
use super::{super::super::compaction_jobs_source::mock::MockCompactionJobsSource, *};

#[test]
fn test_display() {
let stream = EndlessPartititionStream::new(MockPartitionsSource::new(vec![]));
let stream = EndlessCompactionJobStream::new(MockCompactionJobsSource::new(vec![]));
assert_eq!(stream.to_string(), "endless(mock)");
}

@@ -97,7 +97,7 @@ mod tests {
CompactionJob::new(PartitionId::new(3)),
CompactionJob::new(PartitionId::new(2)),
];
let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone()));
let stream = EndlessCompactionJobStream::new(MockCompactionJobsSource::new(ids.clone()));

// stream is stateless
for _ in 0..2 {
@@ -7,8 +7,8 @@ pub mod endless;
pub mod once;

/// Source for partitions.
pub trait PartitionStream: Debug + Display + Send + Sync {
/// Create new source stream of compaction job.
pub trait CompactionJobStream: Debug + Display + Send + Sync {
/// Create new source stream of compaction jobs.
///
/// This stream may be endless.
fn stream(&self) -> BoxStream<'_, CompactionJob>;
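A hypothetical consumer of this trait (not part of the change) would drive it roughly like the sketch below; `handle_job` is an illustrative placeholder, not a function in this repository.

use futures::StreamExt;

async fn run<S: CompactionJobStream>(jobs: &S) {
    let mut stream = jobs.stream();
    // With EndlessCompactionJobStream this loop never terminates;
    // with OnceCompactionJobStream it ends after one pass over the source.
    while let Some(job) = stream.next().await {
        handle_job(job).await;
    }
}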
@@ -3,19 +3,19 @@ use std::{fmt::Display, sync::Arc};
use compactor_scheduler::CompactionJob;
use futures::{stream::BoxStream, StreamExt};

use super::{super::partitions_source::PartitionsSource, PartitionStream};
use super::{super::compaction_jobs_source::CompactionJobsSource, CompactionJobStream};

#[derive(Debug)]
pub struct OncePartititionStream<T>
pub struct OnceCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
source: Arc<T>,
}

impl<T> OncePartititionStream<T>
impl<T> OnceCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
pub fn new(source: T) -> Self {
Self {
@@ -24,18 +24,18 @@ where
}
}

impl<T> Display for OncePartititionStream<T>
impl<T> Display for OnceCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "once({})", self.source)
}
}

impl<T> PartitionStream for OncePartititionStream<T>
impl<T> CompactionJobStream for OnceCompactionJobStream<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn stream(&self) -> BoxStream<'_, CompactionJob> {
let source = Arc::clone(&self.source);
@@ -49,11 +49,11 @@ where
mod tests {
use data_types::PartitionId;

use super::{super::super::partitions_source::mock::MockPartitionsSource, *};
use super::{super::super::compaction_jobs_source::mock::MockCompactionJobsSource, *};

#[test]
fn test_display() {
let stream = OncePartititionStream::new(MockPartitionsSource::new(vec![]));
let stream = OnceCompactionJobStream::new(MockCompactionJobsSource::new(vec![]));
assert_eq!(stream.to_string(), "once(mock)");
}

@@ -64,7 +64,7 @@ mod tests {
CompactionJob::new(PartitionId::new(3)),
CompactionJob::new(PartitionId::new(2)),
];
let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone()));
let stream = OnceCompactionJobStream::new(MockCompactionJobsSource::new(ids.clone()));

// stream is stateless
for _ in 0..2 {
@@ -4,38 +4,38 @@ use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
use observability_deps::tracing::{info, warn};

use super::PartitionsSource;
use super::CompactionJobsSource;

#[derive(Debug)]
pub struct LoggingPartitionsSourceWrapper<T>
pub struct LoggingCompactionJobsWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
inner: T,
}

impl<T> LoggingPartitionsSourceWrapper<T>
impl<T> LoggingCompactionJobsWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
pub fn new(inner: T) -> Self {
Self { inner }
}
}

impl<T> Display for LoggingPartitionsSourceWrapper<T>
impl<T> Display for LoggingCompactionJobsWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "logging({})", self.inner)
}
}

#[async_trait]
impl<T> PartitionsSource for LoggingPartitionsSourceWrapper<T>
impl<T> CompactionJobsSource for LoggingCompactionJobsWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await;
@@ -52,17 +52,17 @@ mod tests {
use data_types::PartitionId;
use test_helpers::tracing::TracingCapture;

use super::{super::mock::MockPartitionsSource, *};
use super::{super::mock::MockCompactionJobsSource, *};

#[test]
fn test_display() {
let source = LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(vec![]));
let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(vec![]));
assert_eq!(source.to_string(), "logging(mock)",);
}

#[tokio::test]
async fn test_fetch_empty() {
let source = LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(vec![]));
let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(vec![]));
let capture = TracingCapture::new();
assert_eq!(source.fetch().await, vec![],);
// logs normal log message (so it's easy search for every single call) but also an extra warning
@@ -81,7 +81,7 @@
let partitions = vec![p_1, p_2, p_3];

let source =
LoggingPartitionsSourceWrapper::new(MockPartitionsSource::new(partitions.clone()));
LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(partitions.clone()));
let capture = TracingCapture::new();
assert_eq!(source.fetch().await, partitions,);
// just the ordinary log message, no warning
@@ -4,24 +4,24 @@ use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
use metric::{Registry, U64Counter};

use super::PartitionsSource;
use super::CompactionJobsSource;

const METRIC_NAME_PARTITIONS_FETCH_COUNT: &str = "iox_compactor_partitions_fetch_count";
const METRIC_NAME_PARTITIONS_COUNT: &str = "iox_compactor_partitions_count";

#[derive(Debug)]
pub struct MetricsPartitionsSourceWrapper<T>
pub struct MetricsCompactionJobsSourceWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
partitions_fetch_counter: U64Counter,
partitions_counter: U64Counter,
inner: T,
}

impl<T> MetricsPartitionsSourceWrapper<T>
impl<T> MetricsCompactionJobsSourceWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
pub fn new(inner: T, registry: &Registry) -> Self {
let partitions_fetch_counter = registry
@@ -45,19 +45,19 @@ where
}
}

impl<T> Display for MetricsPartitionsSourceWrapper<T>
impl<T> Display for MetricsCompactionJobsSourceWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "metrics({})", self.inner)
}
}

#[async_trait]
impl<T> PartitionsSource for MetricsPartitionsSourceWrapper<T>
impl<T> CompactionJobsSource for MetricsCompactionJobsSourceWrapper<T>
where
T: PartitionsSource,
T: CompactionJobsSource,
{
async fn fetch(&self) -> Vec<CompactionJob> {
let partitions = self.inner.fetch().await;
@@ -72,13 +72,15 @@ mod tests {
use data_types::PartitionId;
use metric::assert_counter;

use super::{super::mock::MockPartitionsSource, *};
use super::{super::mock::MockCompactionJobsSource, *};

#[test]
fn test_display() {
let registry = Registry::new();
let source =
MetricsPartitionsSourceWrapper::new(MockPartitionsSource::new(vec![]), &registry);
let source = MetricsCompactionJobsSourceWrapper::new(
MockCompactionJobsSource::new(vec![]),
&registry,
);
assert_eq!(source.to_string(), "metrics(mock)",);
}

@@ -90,8 +92,8 @@ mod tests {
CompactionJob::new(PartitionId::new(1)),
CompactionJob::new(PartitionId::new(12)),
];
let source = MetricsPartitionsSourceWrapper::new(
MockPartitionsSource::new(partitions.clone()),
let source = MetricsCompactionJobsSourceWrapper::new(
MockCompactionJobsSource::new(partitions.clone()),
&registry,
);

@@ -2,38 +2,38 @@ use async_trait::async_trait;
use compactor_scheduler::CompactionJob;
use parking_lot::Mutex;

use super::PartitionsSource;
use super::CompactionJobsSource;

/// A mock structure for providing [partitions](CompactionJob).
/// A mock structure for providing [compaction jobs](CompactionJob).
#[derive(Debug)]
pub struct MockPartitionsSource {
pub struct MockCompactionJobsSource {
partitions: Mutex<Vec<CompactionJob>>,
}

impl MockPartitionsSource {
impl MockCompactionJobsSource {
#[allow(dead_code)]
/// Create a new MockPartitionsSource.
/// Create a new MockCompactionJobsSource.
pub fn new(partitions: Vec<CompactionJob>) -> Self {
Self {
partitions: Mutex::new(partitions),
}
}

/// Set CompactionJobs for MockPartitionsSource.
/// Set CompactionJobs for MockCompactionJobsSource.
#[allow(dead_code)] // not used anywhere
pub fn set(&self, partitions: Vec<CompactionJob>) {
*self.partitions.lock() = partitions;
}
}

impl std::fmt::Display for MockPartitionsSource {
impl std::fmt::Display for MockCompactionJobsSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "mock")
}
}

#[async_trait]
impl PartitionsSource for MockPartitionsSource {
impl CompactionJobsSource for MockCompactionJobsSource {
async fn fetch(&self) -> Vec<CompactionJob> {
self.partitions.lock().clone()
}
@@ -47,12 +47,12 @@ mod tests {

#[test]
fn test_display() {
assert_eq!(MockPartitionsSource::new(vec![]).to_string(), "mock",);
assert_eq!(MockCompactionJobsSource::new(vec![]).to_string(), "mock",);
}

#[tokio::test]
async fn test_fetch() {
let source = MockPartitionsSource::new(vec![]);
let source = MockCompactionJobsSource::new(vec![]);
assert_eq!(source.fetch().await, vec![],);

let p_1 = CompactionJob::new(PartitionId::new(5));
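The wrappers touched in this diff remain plain decorators over `CompactionJobsSource`, so they stack the same way after the rename. A minimal construction sketch, using only constructors and Display strings that appear in the hunks above:

#[test]
fn wrappers_compose() {
    let registry = metric::Registry::new();
    // Logging wraps metrics, which wraps the mock source.
    let source = LoggingCompactionJobsWrapper::new(
        MetricsCompactionJobsSourceWrapper::new(
            MockCompactionJobsSource::new(vec![]),
            &registry,
        ),
    );
    assert_eq!(source.to_string(), "logging(metrics(mock))");
}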