
Consumption paused indefinitely with out of order commit enabled #304

Open
Gin2022Null opened this issue Nov 29, 2022 · 5 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@Gin2022Null
Contributor

We use reactiveKafkaConsumerTemplate to receive messages and acknowledge each offset after the message has been processed. We enabled out-of-order commits (maxDeferredCommits=250) and noticed that in certain situations the consumer pauses indefinitely.
stackoverflow

The sequence of events is:

  1. A RetriableCommitFailedException is triggered, possibly by a network glitch or Kafka server maintenance.
  2. The consumer pauses with “Paused - commits are retrying”.
  3. The consumer logs “Resumed” and “Emitting records”, but no further “Async committing” log appears (no exception was identified during message acknowledgement).
  4. After some “Emitting records” logs, the consumer pauses with “Paused - too many deferred commits”.
  5. No further “ConsumerEventLoop” logs appear.
  6. A rebalance fixes the issue. (We have 3 consumers deployed on 3 hosts; removing 1 host fixes it.)
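The “Paused - too many deferred commits” step can be illustrated with a simplified model of out-of-order commit bookkeeping. This is a hypothetical sketch: `DeferredCommitModel` and its methods are illustrative names, not reactor-kafka's API or implementation; it tracks a single partition; and the pause rule (pause once deferred acknowledgements reach `maxDeferredCommits`) is an assumption about the exact comparison. Acknowledgements may arrive in any order, only the contiguous run starting at the lowest unacknowledged offset becomes committable, and every offset acknowledged after a gap is deferred.

```java
import java.util.TreeSet;

// Hypothetical, simplified model of out-of-order commit bookkeeping for ONE partition.
// Not reactor-kafka's implementation; names and the pause rule are assumptions.
public class DeferredCommitModel {
    private final int maxDeferredCommits;
    private long nextToCommit;                              // lowest offset not yet acknowledged in order
    private final TreeSet<Long> deferred = new TreeSet<>(); // acked offsets waiting behind a gap

    public DeferredCommitModel(long firstOffset, int maxDeferredCommits) {
        this.nextToCommit = firstOffset;
        this.maxDeferredCommits = maxDeferredCommits;
    }

    // Record an acknowledgement; acknowledgements may arrive in any order.
    public void acknowledge(long offset) {
        if (offset < nextToCommit) {
            return; // already covered by the committable position
        }
        deferred.add(offset);
        // Drain the contiguous run that starts at nextToCommit.
        while (!deferred.isEmpty() && deferred.first() == nextToCommit) {
            deferred.pollFirst();
            nextToCommit++;
        }
    }

    // Offset that would be committed (Kafka commits the next offset to consume).
    public long committableOffset() {
        return nextToCommit;
    }

    public int deferredCount() {
        return deferred.size();
    }

    // Assumed pause rule: stop fetching once deferred acknowledgements reach the limit.
    public boolean shouldPause() {
        return deferred.size() >= maxDeferredCommits;
    }

    public static void main(String[] args) {
        DeferredCommitModel p = new DeferredCommitModel(100, 3);
        p.acknowledge(101);
        p.acknowledge(102);
        System.out.println(p.committableOffset() + " " + p.deferredCount() + " " + p.shouldPause()); // 100 2 false
        p.acknowledge(103);
        System.out.println(p.committableOffset() + " " + p.deferredCount() + " " + p.shouldPause()); // 100 3 true
        p.acknowledge(100);
        System.out.println(p.committableOffset() + " " + p.deferredCount() + " " + p.shouldPause()); // 104 0 false
    }
}
```

In this model, once the limit is reached, only an acknowledgement of the offset at the head of the gap (or discarding the tracked state entirely) clears the pause, so a single lost acknowledgement is enough to stall consumption.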
Version: reactor-kafka-1.3.13.jar

Logging configuration:
logging:
  level:
    reactor:
      kafka:
        receiver: DEBUG

Receiver option:
maxDeferredCommits: 250

ConsumerConfig:
auto.commit.interval.ms = 1000
auto.offset.reset = earliest
connections.max.idle.ms = 540000
enable.auto.commit = false
heartbeat.interval.ms = 1000
max.poll.interval.ms = 300000
max.poll.records = 500
request.timeout.ms = 30000
session.timeout.ms = 10000
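For anyone trying to reproduce the setup, the ConsumerConfig values above can be expressed as the `Map<String, Object>` that `ReceiverOptions.create(...)` accepts. This is a minimal sketch using a hypothetical helper class `ConsumerPropertiesSketch`; `bootstrap.servers`, `group.id`, and the deserializers are required in practice but are not shown in the issue, so they are left out. The `heartbeatWithinGuidance` check encodes the Kafka documentation's advice that `heartbeat.interval.ms` be lower than `session.timeout.ms` and typically no higher than one third of it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the ConsumerConfig values listed in the issue as a plain map,
// in the shape ReceiverOptions.create(Map<String, Object>) accepts.
// bootstrap.servers, group.id and the deserializers are NOT included (not shown in the issue).
public class ConsumerPropertiesSketch {

    static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("auto.commit.interval.ms", "1000"); // only used when enable.auto.commit=true
        props.put("auto.offset.reset", "earliest");
        props.put("connections.max.idle.ms", "540000");
        props.put("enable.auto.commit", "false");     // offsets are committed via acknowledge()/commitInterval
        props.put("heartbeat.interval.ms", "1000");
        props.put("max.poll.interval.ms", "300000");
        props.put("max.poll.records", "500");
        props.put("request.timeout.ms", "30000");
        props.put("session.timeout.ms", "10000");
        return props;
    }

    // Kafka docs: heartbeat.interval.ms must be lower than session.timeout.ms
    // and should typically be no higher than 1/3 of it.
    static boolean heartbeatWithinGuidance(Map<String, Object> props) {
        long heartbeat = Long.parseLong(props.get("heartbeat.interval.ms").toString());
        long session = Long.parseLong(props.get("session.timeout.ms").toString());
        return heartbeat < session && heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerProperties();
        // prints "9 properties, heartbeat ok: true"
        System.out.println(props.size() + " properties, heartbeat ok: " + heartbeatWithinGuidance(props));
    }
}
```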

Logs:

11/24/22 6:50:06.386 AM DEBUG r.k.r.internals.ConsumerEventLoop Async committing: {
test-0=OffsetAndMetadata{offset=12206778, leaderEpoch=null, metadata=''}, 
test-1=OffsetAndMetadata{offset=12253822, leaderEpoch=null, metadata=''}
test-2=OffsetAndMetadata{offset=12257066, leaderEpoch=null, metadata=''}
test-3=OffsetAndMetadata{offset=12265134, leaderEpoch=null, metadata=''}}

No more “Async committing” logs appear after this.

11/24/22 6:50:06.451 AM WARN r.k.r.internals.ConsumerEventLoop Commit failed with org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. Caused by: org.apache.kafka.common.errors.DisconnectException: null

11/24/22 6:50:06.452 AM WARN r.k.r.internals.ConsumerEventLoop Commit failed with exceptionorg.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets., retries remaining 99
…
11/24/22 6:50:06.452 AM WARN r.k.r.internals.Commit failed with exceptionorg.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets., retries remaining 93

11/24/22 6:50:06.486 DEBUG r.k.r.internals.ConsumerEventLoop -Paused - commits are retrying
11/24/22 6:50:06.987 DEBUG r.k.r.internals.ConsumerEventLoop -Resumed
11/24/22 6:50:07.387 DEBUG r.k.r.internals.ConsumerEventLoop -Emitting 1 records, requested now 1
11/24/22 6:50:07.387 DEBUG r.k.r.internals.ConsumerEventLoop -onRequest.toAdd 1, paused false
…

11/24/22 6:51:05.248 DEBUG r.k.r.internals.ConsumerEventLoop -Paused - too many deferred commits
11/24/22 6:51:05.248 DEBUG r.k.r.internals.ConsumerEventLoop -Consumer woken

No more “ConsumerEventLoop” logs appear after this until a rebalance.

Code detail:

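import java.time.Duration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.ReceiverOptions;

// consumerProperties, commitInterval, topics, reactiveKafkaConsumerTemplate,
// messageServiceImpl and log are fields of the enclosing class.
// Return type and key/value types are assumed; the rest of the pipeline is elided.
Flux<?> consumeMessage() {
    ReceiverOptions<String, String> basicReceiverOptions =
        ReceiverOptions.<String, String>create(consumerProperties)
            .maxDeferredCommits(250)
            .commitInterval(Duration.ofMillis(commitInterval))
            .subscription(topics);

    reactiveKafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(basicReceiverOptions);

    return reactiveKafkaConsumerTemplate
        .receive()
        .publishOn(Schedulers.boundedElastic())
        // Delay each record by 500 ms, with at most 10 delayed records in flight
        .flatMap(x -> Mono.just(x).delayElement(Duration.ofMillis(500)), 10)
        .flatMap(receiverRecord ->
            // Process the record
            messageServiceImpl.process(receiverRecord)
                .doFinally(signal -> {
                    // Ack the offset on completion, error, or cancellation
                    log.info("MessageConsumer ACK offset={} ", receiverRecord.offset());
                    receiverRecord.receiverOffset().acknowledge();
                })
                .subscribeOn(Schedulers.boundedElastic())
        )
        // .....
        ;
}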
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Nov 29, 2022
@Gin2022Null Gin2022Null changed the title Consumption stopped indefinitely with out of order commit enabled Consumption paused indefinitely with out of order commit enabled Nov 29, 2022
@almogtavor

I'm experiencing the same issue. I also use out-of-order commits, so the indefinite pause in consumption probably has the same cause. It's a critical problem for us, and we're eager to find a resolution or workaround.

@almogtavor

almogtavor commented Feb 3, 2024

I have asked a question in #214 (comment) about how adding .retryWhen and .repeat() relates to this issue.

@almogtavor

@garyrussell @artembilan WDYT? I'd love to hear your thoughts on this

@artembilan
Contributor

Gary is no longer with our team, so don't expect any answers from him.
I'm not familiar with the out-of-order commit feature and have other priorities to look into.
If you see what is wrong and how to fix, feel free to contribute it!
I will be more than happy to review it and merge.
Thanks for understanding!

@almogtavor

@artembilan I'm not sure exactly what the bug is, but if I find it I will contribute the fix. Do you think you'll have time to look at it in the coming days? It's a pretty severe bug, because whenever it happens someone has to restart the pods manually so that consumption resumes.

Projects
None yet
Development

No branches or pull requests

4 participants