
Consumer does not rejoin group after heartbeat timeout #336

Open
62mkv opened this issue Apr 14, 2023 · 7 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@62mkv

62mkv commented Apr 14, 2023

Expected Behavior

Actual Behavior

We have a Reactive Spring Boot application that employs "reactor-kafka" for Kafka consumers and producers.

We use one KafkaReceiver per topic; each is subscribed to and kept in a Spring bean field.

I observe that sometimes some or all of the underlying Consumer instances just stop, with an error message like the following:

"[Consumer clientId=consumer-my-service-2-11, groupId=my-service-2] Member consumer-my-service-2-11-2ebeee54-566c-4ae8-ac43-1d5710fee1fa sending LeaveGroup request to coordinator 192.168.0.224:14493 (id: 2147483619 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records."

(This is the last message in the log so far; the application has been running happily for a day since all 11 consumers got stuck in this limbo; the topic is being consumed by other pods.)

Regardless of what the error says, shouldn't the consumer still be restarted by the library/Kafka internals? Or is it the application author's responsibility to somehow track this state and react accordingly (for example, by implementing a liveness health check around it)?
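For reference, the two settings the error message refers to are plain consumer properties; an illustrative sketch (values are examples only, not our actual configuration):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    final class PollTuning {

        // Illustrative values only; the Kafka defaults are 300000 ms and 500 records.
        static Map<String, Object> pollTuning() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600_000); // max.poll.interval.ms
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);         // max.poll.records
            return props;
        }
    }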

Steps to Reproduce

Possible Solution

Your Environment

  • Reactor version(s) used: 1.0.11
  • Other relevant libraries versions (eg. netty, ...): reactor-kafka: 1.3.17
  • JVM version (java -version): 11.0.16.1
  • OS and version (eg uname -a): Linux x64
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Apr 14, 2023
@62mkv 62mkv changed the title Consumer does not rejoin group after heartbeat errors Consumer does not rejoin group after heartbeat timeout Apr 14, 2023
@artembilan
Contributor

Looks like this is your SO question, too: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli

@62mkv
Author

62mkv commented Apr 14, 2023

This was originally asked on SO here: https://stackoverflow.com/questions/76015312/how-to-properly-deal-with-zombie-kafka-consumers-in-reactive-spring-boot-appli but the more I read the code afterwards, the more it feels like a bug :(

@artembilan
Contributor

I see. Any thoughts about a possible fix?
Or please share with us which part of the project code you think is producing such a bug.

@62mkv
Author

62mkv commented Apr 14, 2023 via email

@garyrussell
Contributor

You need to add retry (and possibly repeat) to the pipeline: https://projectreactor.io/docs/kafka/release/reference/#_error_handling_2
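A minimal sketch of that suggestion, assuming a plain KafkaReceiver pipeline (the class name, topic, handle method, and backoff values below are placeholders, not code from this issue):

    import java.time.Duration;
    import java.util.Collections;

    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;
    import reactor.util.retry.Retry;

    public class ResilientConsumer {

        // Resubscribe the receive Flux on error (retryWhen) and on completion (repeat),
        // so that a consumer which left the group is recreated and rejoins it.
        public Disposable start(ReceiverOptions<String, String> options) {
            return KafkaReceiver.create(options.subscription(Collections.singleton("my-topic")))
                    .receive()
                    .concatMap(this::handle)
                    .repeat()
                    .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)))
                    .subscribe();
        }

        private Mono<Void> handle(ReceiverRecord<String, String> record) {
            // Placeholder processing; acknowledge only after successful handling.
            return Mono.fromRunnable(() -> record.receiverOffset().acknowledge());
        }
    }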

@62mkv
Author

62mkv commented Apr 17, 2023

@garyrussell I see. Thanks so much for your help!

I will give it a try and close the ticket when I can confirm it's no longer manifesting.

May I also ask about a different thing: how useful would it be to have a partition revoke handler that commits offsets?

        var receiverOptions = ReceiverOptions.create(getConsumerProperties())
                                             .commitInterval(DEFAULT_COMMIT_INTERVAL)
                                             .addAssignListener(this::handlePartitionsAssignment)
                                             .addRevokeListener(this::handlePartitionsRevoking)
                                             .subscription(activeDestinations);
...
    @SneakyThrows(IllegalAccessException.class)
    private void handlePartitionsRevoking(Collection<ReceiverPartition> revokedPartitions) {
        // Reflectively reads the private "consumer" field of the receiver's handler
        var consumer = (Consumer<Object, Object>) FieldUtils.readField(kafkaReceiver.consumerHandler(), "consumer", true);

        try {
            consumer.commitSync(latestAcks, Duration.ofMillis(1000));
        } catch (Exception e) {
            log.warn("Ignored error on partition revoke", e);
        }
    }

It seems that the documentation says the downstream consumer should not be concerned with this:

All acknowledged offsets are committed when partitions are revoked during rebalance and when the receive Flux is terminated

So is the quoted handler code above legacy that could/should be removed, or might it still be needed? We occasionally see a WakeupException inside this handler, so I thought I'd ask some experts.

@garyrussell
Contributor

I don't know what latestAcks is there, but indeed the CommitEvent is run when partitions are revoked, so any acknowledged records will be committed. See ConsumerEventLoop.onPartitionsRevoked:

    private void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.debug("onPartitionsRevoked {}", partitions);
        if (!partitions.isEmpty()) {
            // It is safe to use the consumer here since we are in a poll()
            if (ackMode != AckMode.ATMOST_ONCE) {
                commitEvent.runIfRequired(true);
                // ...