RePause All Partitions After Rebalance if user paused any and requested #363
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Resolves #307
In order to achieve graceful shutdown during deployment, We applied the following strategy:
But when new consumers join the consumer group, after rebalance, old consumers wake up, if the newly assigned partitions are not in the pausedByUser, consumer start poll messages which gracefully shutdown are not fully achieved.
Add pauseAllAfterRebalance configuration
When a user pauses topics/partitions before rebalancing, the behavior depends on the value of pauseAllAfterRebalance.If it is set to False, the paused topics/partitions will remain paused after the rebalance. However, if it is set to True, all assigned topics/partitions will be paused after the rebalance.
Test:
ReceiverOptions.create(props).pauseAllAfterRebalance(Boolean.TRUE);
Consumer-1 assigned partitions: testTopic-2
Consumer-2 assigned partitions: testTopic-0, testTopic3-1
Consumer-2 Pause Topic/Partitions
Kafka Consumer Template status=PAUSE topic=testTopic, partition=0
Kafka Consumer Template status=PAUSE topic=testTopic, partition=1
Consumer-3 join - rebalance
Consumer-2 Revoke previously assigned partitions testTopic-0, testTopic-1
Assignment(partitions=[testTopic-2])}
testTopic-2 paused.