Behaviour of ReceiverOptions.addAssignListener changed #334

Open
gorkemaygul opened this issue Apr 5, 2023 · 3 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

gorkemaygul commented Apr 5, 2023

On version 1.3.13, after a rebalance, the addAssignListener callback is called only if a partition is assigned to the consumer. So, when there are more consumers than partitions, the callback fires only on the instances that received a partition assignment.

After version 1.3.15, it is called (with an empty list) even when no partition is assigned to the consumer. Version 1.3.17 behaves the same way.

In our case, we had logic inside that callback, and we made a fix to handle this case.
I didn't see anything about this in the 1.3.15 release notes, so I'm wondering whether this is now the intended behavior or a bug.
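
One way such a fix could look is a small wrapper that skips the listener body when the assignment is empty. This is only a sketch, not the fix the reporter actually applied: the class `AssignListeners` and the method `ignoringEmpty` are hypothetical names, and the code uses only `java.util` types so that it does not depend on a particular reactor-kafka version.

```java
import java.util.Collection;
import java.util.function.Consumer;

/** Hypothetical helper: restores the "only called when partitions are assigned" behavior. */
public final class AssignListeners {

    private AssignListeners() {
    }

    /**
     * Wraps a listener so that the delegate runs only when at least one
     * partition was assigned; empty (or null) assignments are ignored.
     */
    public static <T> Consumer<Collection<T>> ignoringEmpty(Consumer<Collection<T>> delegate) {
        return partitions -> {
            if (partitions == null || partitions.isEmpty()) {
                return; // idle consumer: nothing was assigned in this rebalance
            }
            delegate.accept(partitions);
        };
    }
}
```

Assuming the listener type accepted by addAssignListener is a `Consumer` over a collection of partitions, the wrapper could be passed as `addAssignListener(AssignListeners.ignoringEmpty(partitions -> { ... }))`.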

Expected Behavior

When there are more consumers (with the same group-id) than topic partitions, the addAssignListener callback should be triggered only on consumers that were assigned a partition.

Actual Behavior

When there are more consumers (with the same group-id) than partitions, the addAssignListener callback is triggered on every consumer (with an empty partition list), even if the consumer is not assigned a partition.

Steps to Reproduce

I have a topic with 1 partition and I'm running multiple consumers with the same group-id. When I start the first consumer instance, the addAssignListener callback is called with [topic-0]. Then I start a second instance. After the rebalance, let's say the first instance is assigned the partition again; the revoke listener and the assign listener are called on the first instance, as expected.

Previously (on version 1.3.13), the assign listener callback wasn't triggered on the second instance, since there were no partitions left to consume. After version 1.3.15, however, it is triggered with an empty collection.

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // ReceiverOptions is immutable: each call returns a new instance,
        // so keep the result of the whole chain.
        ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> {
                    log.info("Partitions assigned <{}>.", partitions);
                    //...
                    log.info("Initialization completed for partitions <{}>.",
                            partitions);
                })
                .addRevokeListener(partitions -> {
                    log.info("onPartitionsRevoked to be executed on context {}", partitions);
                    //...
                    log.info("onPartitionsRevoked executed on context {}", partitions);
                });

        var kafkaReceiver = KafkaReceiver.create(receiverOptions);
        var flux = kafkaReceiver.receive();

        flux.subscribe(reactiveSubscriber);
        log.info("Pipeline started.");

Your Environment

  • Reactor version(s) used: 1.3.17 (It occurred after version 1.3.15)
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): 17
  • OS and version (eg uname -a):

garyrussell (Contributor) commented

This was not an intentional change; it was a side effect of the fix for #305.

However, I believe the current behavior is correct; otherwise an application would have no indication that it was running with an idle consumer.
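
As an illustration of how an application might use the empty assignment as that indication, the sketch below records whether the most recent assignment was empty. The class name `IdleConsumerTracker` and its methods are hypothetical and not part of reactor-kafka; the code relies only on `java.util` types.

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: remembers whether the last assignment left this consumer idle. */
public final class IdleConsumerTracker {

    // Starts as false: nothing is known until the first assign callback fires.
    private final AtomicBoolean idle = new AtomicBoolean(false);

    /** Intended to be called from the assign listener with the assigned partitions. */
    public void onAssigned(Collection<?> partitions) {
        idle.set(partitions.isEmpty());
    }

    /** Returns true if the most recent assignment contained no partitions. */
    public boolean isIdle() {
        return idle.get();
    }
}
```

Under the same assumption about the listener type, it could be registered with `addAssignListener(tracker::onAssigned)`, and `tracker.isIdle()` could then back a health check or a metric.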

rasgele commented Apr 6, 2023

The revoke listener is not called if the onAssign callback was called with an empty collection. Hence the new behavior removes the symmetry.
From that perspective (which makes sense), the application now also has no indication that it has stopped running with an idle consumer. Should onRevoke be called when onAssign was called with an empty list of partitions?

garyrussell (Contributor) commented

That is because kafka-clients does not call the ConsumerRebalanceListener in that case (if no partitions were previously assigned), which makes sense: if you weren't assigned any partitions, why would you need to be told that nothing was revoked? In any case, that is outside this project's control.

In my opinion, the listeners should be a simple pass-through of the CRL calls.

"no indication that it stopped running with an idle consumer"

Revocation occurs under two circumstances: a rebalance, or when the Disposable is disposed (or the consumer is stopped). In the first case, you don't need a notification because you have no assignments; in the second case, it means you stopped the app.
