Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement the OrderedBatchProducer #4134

Merged
merged 27 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
724a4a6
feat: implement the `OrderedBatchProducer`
WenyXu Jun 12, 2024
016d793
test: add test of cancel safety
WenyXu Jun 12, 2024
dceabaf
chore: apply suggestions from CR
WenyXu Jun 12, 2024
6f1545e
chore: apply suggestions from CR
WenyXu Jun 12, 2024
a1bc532
refactor: simplify the `BackgroundProducerWorker`
WenyXu Jun 13, 2024
23b0951
feat: implement the OrderedBatchProducer v2
WenyXu Jun 13, 2024
1fbea06
refactor: switch to `OrderedBatchProducer`
WenyXu Jun 12, 2024
51bb93b
chore: rename to `MAX_FLUSH_QUEUE_SIZE`
WenyXu Jun 12, 2024
df600a0
refactor: switch to `OrderedBatchProducerV2`
WenyXu Jun 13, 2024
605bcdb
refactor: remove `OrderedBatchProducerV1`
WenyXu Jun 13, 2024
ac898ed
test: add tests
WenyXu Jun 13, 2024
70bc87a
refactor: make config configurable
WenyXu Jun 13, 2024
9db1dde
refactor: minor refactor
WenyXu Jun 13, 2024
a95186a
chore: remove unused code
WenyXu Jun 13, 2024
b414bf0
chore: remove `benchmarks` crate
WenyXu Jun 13, 2024
cc0c8fc
chore: update config doc
WenyXu Jun 13, 2024
a8684db
chore: remove unused comment
WenyXu Jun 14, 2024
710378d
refactor: refactor client registry
WenyXu Jun 14, 2024
7d3a377
refactor: rename `max_batch_size` to `max_batch_bytes`
WenyXu Jun 14, 2024
2ffa89c
chore: use constant value
WenyXu Jun 17, 2024
4931fa6
chore: ensure serialized meta < ESTIMATED_META_SIZE
WenyXu Jun 17, 2024
5246dbd
chore: apply suggestions from CR
WenyXu Jun 17, 2024
44eda34
chore: remove the `CHANNEL_SIZE`
WenyXu Jun 17, 2024
e3c9b4d
chore: apply suggestions from CR
WenyXu Jun 18, 2024
66c0e36
fix: ensure serialized meta < ESTIMATED_META_SIZE
WenyXu Jun 18, 2024
57a3ad8
chore: apply suggestions from CR
WenyXu Jun 18, 2024
826e6d9
chore: apply suggestions from CR
WenyXu Jun 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: switch to OrderedBatchProducerV2
  • Loading branch information
WenyXu committed Jun 17, 2024
commit df600a06a0533fc27dd984d595c00b051c7c6784
26 changes: 16 additions & 10 deletions src/log-store/src/kafka/background_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use rskafka::client::producer::ProducerClient;
use rskafka::record::Record;
use snafu::ResultExt;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
use tokio::sync::oneshot;

use crate::error::{self, Result};

Expand Down Expand Up @@ -183,7 +183,7 @@ impl BackgroundProducerWorker {
}

pub(crate) struct OrderedBatchProducer {
sender: Mutex<Sender<ProduceRequest>>,
sender: Sender<ProduceRequest>,
/// Used to control the [`BackgroundProducerWorker`].
running: Arc<AtomicBool>,
/// The handle of [`BackgroundProducerWorker`].
Expand All @@ -196,6 +196,16 @@ impl Drop for OrderedBatchProducer {
}
}

/// A handle returned by [`OrderedBatchProducer::produce`].
///
/// Holds the one-shot channel end on which the [`BackgroundProducerWorker`]
/// delivers a [`ProduceResultReceiver`] once the batched request has been
/// dispatched. Await the result via [`ProduceResultHandle::wait`].
pub(crate) struct ProduceResultHandle {
    // One-shot receiver fulfilled by the background worker; consumed by `wait`.
    receiver: oneshot::Receiver<ProduceResultReceiver>,
}

impl ProduceResultHandle {
    /// Waits until the batched produce request completes and returns the
    /// offsets assigned to the produced records.
    ///
    /// # Panics
    /// Panics if the [`BackgroundProducerWorker`] dropped the sending half of
    /// the channel without replying (i.e. the worker terminated unexpectedly).
    pub(crate) async fn wait(self) -> Result<Vec<i64>> {
        // First hop: the worker hands back a receiver for the actual result.
        let result_receiver = self.receiver.await.unwrap();
        // Second hop: wait for the produce outcome itself.
        result_receiver.wait().await
    }
}

impl OrderedBatchProducer {
pub(crate) fn new(
client: Arc<dyn ProducerClient>,
Expand All @@ -218,26 +228,22 @@ impl OrderedBatchProducer {
};

Self {
sender: Mutex::new(tx),
sender: tx,
running,
handle: tokio::spawn(async move { worker.run().await }),
}
}

pub(crate) async fn produce(
&self,
batch: Vec<Record>,
) -> Result<oneshot::Receiver<ProduceResultReceiver>> {
pub(crate) async fn produce(&self, batch: Vec<Record>) -> Result<ProduceResultHandle> {
let receiver = {
let (tx, rx) = oneshot::channel();
let sender = self.sender.lock().await;
sender
self.sender
.send(ProduceRequest { batch, sender: tx })
.await
.expect("worker panic");
rx
};

Ok(receiver)
Ok(ProduceResultHandle { receiver })
}
}
13 changes: 2 additions & 11 deletions src/log-store/src/kafka/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ use crate::kafka::util::record::{
};
use crate::metrics;

/// The max flush queue size.
pub(crate) const MAX_FLUSH_QUEUE_SIZE: usize = 512;

/// A log store backed by Kafka.
#[derive(Debug)]
pub struct KafkaLogStore {
Expand Down Expand Up @@ -77,10 +74,8 @@ impl KafkaLogStore {
})?;
let producer_registry = ProducerRegistry::new(
client,
config.linger,
config.max_batch_size.as_bytes() as usize,
config.compression,
MAX_FLUSH_QUEUE_SIZE,
);

Ok(Self {
Expand Down Expand Up @@ -205,17 +200,13 @@ impl LogStore for KafkaLogStore {
for (region_id, records) in region_grouped_records {
region_ids.push(region_id);
let producer = region_grouped_producers.get(&region_id).unwrap();
let mut receivers = Vec::with_capacity(records.len());
for record in records {
receivers.push(producer.produce(record).await?);
}
region_grouped_result_receivers.push(receivers)
region_grouped_result_receivers.push(producer.produce(records).await?)
}

let region_grouped_offsets = try_join_all(
region_grouped_result_receivers
.into_iter()
.map(|handles| try_join_all(handles.into_iter().map(|handle| handle.wait()))),
.map(|handle| handle.wait()),
)
.await?;

Expand Down
20 changes: 5 additions & 15 deletions src/log-store/src/kafka/producer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;

use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use snafu::ResultExt;
use store_api::logstore::provider::KafkaProvider;
use tokio::sync::RwLock;

use crate::error;
use crate::error::Result;
use crate::kafka::producer::OrderedBatchProducer;
use crate::kafka::background_producer::OrderedBatchProducer;

pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer<RecordAggregator>>;
pub type OrderedBatchProducerRef = Arc<OrderedBatchProducer>;

// Each topic only has one partition for now.
// The `DEFAULT_PARTITION` refers to the index of the partition.
Expand All @@ -39,12 +37,9 @@ pub(crate) const MIN_FLUSH_BATCH_SIZE: usize = 4 * 1024;
/// The registry of [OrderedBatchProducer].
pub struct ProducerRegistry {
registry: RwLock<HashMap<Arc<KafkaProvider>, OrderedBatchProducerRef>>,

client: rskafka::client::Client,
linger: Duration,
aggregator_batch_size: usize,
compression: Compression,
flush_queue_size: usize,
}

impl Debug for ProducerRegistry {
Expand All @@ -56,19 +51,15 @@ impl Debug for ProducerRegistry {
impl ProducerRegistry {
pub fn new(
client: rskafka::client::Client,
linger: Duration,
aggregator_batch_size: usize,
compression: Compression,
flush_queue_size: usize,
) -> Self {
let aggregator_batch_size = aggregator_batch_size.max(MIN_FLUSH_BATCH_SIZE);
Self {
registry: RwLock::new(HashMap::new()),
client,
linger,
aggregator_batch_size,
compression,
flush_queue_size,
}
}

Expand All @@ -89,13 +80,12 @@ impl ProducerRegistry {
partition: DEFAULT_PARTITION,
})?;

let aggregator = RecordAggregator::new(self.aggregator_batch_size);
let producer = OrderedBatchProducer::new(
aggregator,
Arc::new(partition_client),
self.linger,
self.compression,
self.flush_queue_size,
128,
64,
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
self.aggregator_batch_size,
);

Ok(Arc::new(producer))
Expand Down