Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply backpressure per partition instead of the entire assignment #540

Open
gabrielgiussi opened this issue Jan 26, 2023 · 6 comments
Open
Labels
enhancement New feature or request

Comments

@gabrielgiussi
Copy link

Currently when the BrokerPollSystem considers there is too much work pending already it pauses all the partitions assigned.

I think we could improve parallelization by applying backpressure per partition instead, and would be specially helpful for the case described in the KIP-41

Prefetching is skipped when there are enough records already available from any partition to satisfy the next call to poll(). When this number dips below max.poll.records, we fetch all partitions as in the current implementation. The only downside to this approach is that it could lead to some partitions going unconsumed for an extended amount of time when there is a large imbalance between the partition's respective message rates. For example, suppose that a consumer with max messages set to 1 fetches data from partitions A and B. If the returned fetch includes 1000 records from A and no records from B, the consumer will have to process all 1000 available records from A before fetching on partition B again.

I'm considering the following hypothetical scenario

  • Topic with 2 partitions
  • Imbalance in message rates: each poll will fetch 1000 records from p0 and 10 from p1.
  • Average processing time: 500 millis
  • max.poll.records = 10
  • maxConcurrency = 2
  • Partition ordering in parallel consumer

In this scenario we won't be achieving good parallelization since the partition ordering will force us to process all the records from partition 0 in serial order, which will lead us to have one thread idle most of the time until the last poll (corresponding to a single fetch of 1000 and 10 records from p0 and p1 respectively) gets records from both partitions.
Of course increasing max.poll.records will reduce the probability of this happening, for example if we were using max.poll.records=501 the third poll will be already be able to parallelize work. And being the poll loop being detached from the processing we don't incur in the risk of being removed from the group by increasing the amount of work until the next poll. However this requires a better understanding of actual message rates and more fine tuning, and a good configuration of max.poll.records today could not work well in the future if the rates change.

I think applying backpressure per partition is a more robust way to handle this scenario, which may not be a common one but I don't see this affecting other scenarios, the only disadvantage being a more complex implementation since we need to track load per partition instead of global load as it is today.
I've focused mainly in the partition ordering for my analysis and I'm not sure if the proposal may have a negative impact on the other two ordering modes, my initial though is that it should be ok (the backpressure won't have anything to do with the ordering since it can only be applied per partition, but it will benefit mostly the partition ordering).

@vigneshwaranr
Copy link

I also need this feature!

@rkolesnev
Copy link
Member

@gabrielgiussi
That is an interesting problem - my main concern is not even the complexity in tracking load per partition, but in how to poll specific partitions using single Consumer instance - for example above we would somehow need to poll only P1 while P0 is paused due to back-pressure.
On the surface - I do not think that is possible with current consumer protocol / API - but i haven't done a deep dive yet on this.

It is possible to tune for that scenario though - I think small fetch size per partition, small buffer in parallel consumer should work in scenario with slow(-ish) processing - as in your example above.
I had added additional Parallel Consumer options for controlling dynamic load factor and internal buffer size to help with this scenario and some suggestions in Partition Ordering section in Readme
In a nutshell due to slow processing we can afford to poll only 5-10 messages at a time from each partition - given with 500ms per message - 10 messages will take 5 seconds - so polls wont be too frequent.
That will ensure that if only P0 has messages - we dont poll too many - and poll again in 5 seconds - if messages arrive to P1 - we will grab them then - so not having to drain a big 1000 buffer from single partition before checking for more messages.

The few parameters to tune in that case would be:
max.partition.fetch.bytes - Kafka Consumer config property - you need to know approximate message size for it
messageBufferSize - ParallelConsumer property (or alternatively min/max load factor) - added in #682 - to set internal Parallel Consumer buffer to numPartitions*messagesPerFetch

Note that max.poll.records doesnt help here - as it only controls handover of records from Kafka Consumer - not actual fetch from brokers - so it will still fetch 1000 from P0 and 10 from P1 - and will only give records from P1 to PC - after all 1000 from P0 is drained from Kafka Consumer to Parallel Consumer.

@rkolesnev rkolesnev added the enhancement New feature or request label Feb 22, 2024
@yura-arab4uk
Copy link

+1, we really need this feature in our project too, which works with multi topics, so one topic messages affect the other topic (messages are stuck until the next fetch) in case with slow processing, per partition back pressure should solve this.

@rkolesnev
Copy link
Member

rkolesnev commented May 29, 2024

@yura-arab4uk - thanks for your feedback.
I do not think we have a good implementation solution for this problem - only way to accomplish this per-partition back-pressure. Consumer protocol really doesn't allow for specific partition polls without switching from subscriptions to direct assignments - but with direct assignments - auto-rebalancing is affected.

Generally as per my earlier comment - there are consumer configuration options to make sure that maximum fetch from single partition is smaller than overall fetch size - that ensures that data from all subscribed partitions is polled if its available.
In addition tuning the buffer size - that controls when back-pressure kicks in - so that buffer is only holding number of seconds worth of records should help.
For example if 1 record takes 1 second to process - with partition based ordering and 10 partitions / concurrency - buffer can be set to ~100 messages - and similarly max fetch set to 100 messages with partition fetch size to 10 messages.
That way when messages are processed at full speed - 1 poll returns 100 messages split evenly and is enough to provide load for 10 seconds of processing - so it will still not poll too often (roughly poll kafka every 10 seconds).
At the same time in worst case scenario of data imbalance - buffer will be filled from 1 partition and will drain within at most 100 seconds (actually it will start polling again when it drops below 100 size) - so there won't be extended period when 1 partition is blocking all other processing.

Similar logic applies to key based ordering / partitions on multiple topics as well.

The size of buffer, size of overall single fetch and size of single partition fetch should all be considered and set as a combination in respect of processing time and concurrency balanced towards not having too many messages fetched from single partition, single poll and not to have too high buffer drain time.

Of course it is not fully ideal in that if processing time fluctuates and for whatever reason messages from 1 topic or partition that have different processing logic / downstream system than messages from other partition / topic are suddenly processing slower than usual - it still can cause some bottlenecks.

Refer to this section in Readme for specific configuration options to tune - 10.3. Ordered by Partition

In cases when single Parallel Consumer instance is processing messages from multiple topics and message processing time is significantly different for messages from different topics - it may be worth looking into having separate Parallel Consumer instances per topic / group of topics with similar processing time. I.e. have Parallel Consumer instance for 'fast' messages and separate one for 'slow' messages.

@yura-arab4uk
Copy link

yura-arab4uk commented May 29, 2024

@rkolesnev thank you for the response with suggestion, can we pause specific partition when it hits its message buffer size and resume when the in memory message number drops?

@rkolesnev
Copy link
Member

rkolesnev commented May 30, 2024

@yura-arab4uk - actually i had a look and KafkaConsumer does support selective pausing of partitions in the subsciption.
That changes things and does make per partition back-pressure a possibility.

So instead of tracking overall buffer size we could do per partition buffer size - it should be technically possible to track the per partition counters and then apply pause / resume logic per partition without impacting rebalancing logic.
That would allow to use larger buffer without having to worry about drain time.

The maximum per partition fetch size will still need to be tuned - but that will be mostly to avoid overshooting per partition buffers too much / guard against OOMs rather than processing bottlenecking due to data imbalance in buffers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

4 participants