Rebalancing always waits until maxDelayRebalance with AckMode.EXACTLY_ONCE #371
Labels: ❓need-triage (This issue needs triage, hasn't been looked at by a team member yet)
reactorbot added the ❓need-triage label on Nov 13, 2023
damn1kk pushed a commit to damn1kk/reactor-kafka that referenced this issue on Jan 21, 2024:
…rebalancing (reactor#371) CommitBatch is not used for AckMode.ExactlyOnce, but increasing the number of uncommitted messages in it causes an infinite loop during rebalancing since they never decrease. Fixes reactor#371.
damn1kk pushed a commit to damn1kk/reactor-kafka that referenced this issue on Jan 21, 2024:
Don't increment uncommitted messages in CommitBatch for AckMode.ExactlyOnce. CommitBatch is not used for AckMode.ExactlyOnce, but increasing the number of uncommitted messages in it causes an infinite loop during rebalancing since they never decrease. Fixes reactor#371.
damn1kk pushed a commit to damn1kk/reactor-kafka that referenced this issue on Jan 21, 2024:
Don't increment uncommitted messages in CommitBatch for AckMode.ExactlyOnce. Increasing the number of uncommitted messages in CommitBatch causes an infinite loop during rebalancing because they never decrease. Fixes reactor#371.
damn1kk added a commit to damn1kk/reactor-kafka that referenced this issue on Jan 22, 2024:
Don't increment uncommitted messages in CommitBatch for AckMode.ExactlyOnce. Increasing the number of uncommitted messages in CommitBatch causes an infinite loop during rebalancing because they never decrease. Fixes reactor#371.
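The idea in these commit messages can be sketched roughly as follows. This is only an illustration of "don't count records for a mode that never commits through the batch", not the actual patch: `SketchAckMode`, `UncommittedTracker`, and all method names are hypothetical and do not come from reactor-kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an ack mode enum; only the two values needed here.
enum SketchAckMode { MANUAL_ACK, EXACTLY_ONCE }

// Hypothetical tracker of "uncommitted" records; not the real CommitBatch.
final class UncommittedTracker {
    private final AtomicInteger uncommitted = new AtomicInteger();
    private final SketchAckMode ackMode;

    UncommittedTracker(SketchAckMode ackMode) {
        this.ackMode = ackMode;
    }

    // Called when records are handed downstream.
    void onRecordsDelivered(int count) {
        // In the exactly-once case offsets are assumed to be committed elsewhere
        // (through the transaction), so nothing would ever decrement this counter.
        // Skipping the increment keeps it from growing without bound.
        if (ackMode != SketchAckMode.EXACTLY_ONCE) {
            uncommitted.addAndGet(count);
        }
    }

    // Called when offsets for 'count' records have been committed via this tracker.
    void onRecordsCommitted(int count) {
        uncommitted.addAndGet(-count);
    }

    int uncommitted() {
        return uncommitted.get();
    }
}

final class UncommittedTrackerDemo {
    public static void main(String[] args) {
        UncommittedTracker exactlyOnce = new UncommittedTracker(SketchAckMode.EXACTLY_ONCE);
        exactlyOnce.onRecordsDelivered(5);
        System.out.println("exactly-once uncommitted: " + exactlyOnce.uncommitted());

        UncommittedTracker manual = new UncommittedTracker(SketchAckMode.MANUAL_ACK);
        manual.onRecordsDelivered(5);
        System.out.println("manual-ack uncommitted: " + manual.uncommitted());
    }
}
```

With the guard in place, a rebalance check that waits for the counter to reach zero has nothing left to wait for in the exactly-once case.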
Hello, I have a topic with multiple partitions and multiple app instances. I use AckMode.EXACTLY_ONCE with a transactional sender.
Rebalance always waits until maxDelayRebalance.
I get a repeated message in the log:
reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipeline
This is similar to the issue discussed here in the comments, but I am using AckMode.EXACTLY_ONCE.
I think the problem is that the inPipeline variable is changed in different objects. inPipeline decreases in the object created here:
reactor-kafka/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java, lines 133 to 135 in 5436ae9
but increases in another object here:
reactor-kafka/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java, line 377 in 5436ae9
So the variable inPipeline is always positive, and the condition here always waits until maxDelayRebalance:
reactor-kafka/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java, lines 170 to 184 in 5436ae9
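To make the suspected mechanism concrete, here is a minimal, self-contained sketch of what I believe is happening. It does not use reactor-kafka at all: `PipelineCounter`, `RebalanceWaitDemo`, and `awaitDrained` are hypothetical names, and the wait loop is only an approximation of a "wait until the pipeline is empty or maxDelayRebalance elapses" check.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the object that holds the inPipeline counter.
final class PipelineCounter {
    private final AtomicInteger inPipeline = new AtomicInteger();

    void recordsPolled(int count) { inPipeline.addAndGet(count); }

    void recordsProcessed(int count) { inPipeline.addAndGet(-count); }

    int inPipeline() { return inPipeline.get(); }
}

final class RebalanceWaitDemo {

    // Waits until the watched counter reaches zero or maxDelayRebalance elapses.
    // Returns true only if the pipeline drained before the deadline.
    static boolean awaitDrained(PipelineCounter watched, Duration maxDelayRebalance) {
        long deadline = System.nanoTime() + maxDelayRebalance.toNanos();
        while (watched.inPipeline() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the full delay was spent waiting
            }
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        PipelineCounter incrementedHere = new PipelineCounter();
        PipelineCounter decrementedHere = new PipelineCounter();

        incrementedHere.recordsPolled(3);    // increment lands on one object
        decrementedHere.recordsProcessed(3); // decrement lands on a different one

        // The watched counter never goes back to zero, so the wait always
        // runs for the full delay even though all records were processed.
        boolean drained = awaitDrained(incrementedHere, Duration.ofMillis(100));
        System.out.println("drained before deadline: " + drained);
    }
}
```

If the increment and decrement both went to the same object, `awaitDrained` would return immediately once processing finished, which matches the expected behavior described below.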
Expected Behavior
Rebalancing should complete after all read messages have been processed and the transaction has committed.
Actual Behavior
Rebalancing always waits until maxDelayRebalance.
Example to Reproduce
https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java
Environment