JoinGroup failed: The coordinator is not aware of this member error during startup - Reactive Kafka consumer subscribe #324

Open
rguntu opened this issue Feb 16, 2023 · 7 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...) ❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@rguntu

rguntu commented Feb 16, 2023

I have a Reactive Kafka consumer and producer implementation in a Spring Boot project. The errors below appear in the logs as soon as I start the Spring Boot application.

Errors seen in the log:
JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group.
JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation

Expected Behavior

Reactive Kafka consumer should be able to join consumer group successfully.

Actual Behavior

23:15:00.825 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1676531700825
23:15:00.829 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Subscribed to topic(s): abc
23:15:01.472 [reactive-kafka-reactivekafkagroupid-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Resetting the last seen epoch of partition abc-0 to 20 since the associated topicId changed from null to ImfebTfnRHKMNhIlb0pPow
23:15:01.474 [reactive-kafka-reactivekafkagroupid-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Cluster ID: TQfWwyIUSBWvEe1gjJRhgQ
23:15:01.475 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Discovered group coordinator qtvr-akf103.tlv.lpnet.com:9092 (id: 2147483495 rack: null)
23:15:01.476 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group
23:15:02.113 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Request joining group due to: need to re-join with the given member-id
23:15:02.114 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group
23:15:02.330 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='consumer-reactivekafkagroupid-1-b650b53a-9b33-48a6-9aa1-cf6823923290', protocol='null'}
23:15:02.330 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response
23:15:02.331 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response
23:15:02.331 [reactive-kafka-reactivekafkagroupid-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-reactivekafkagroupid-1, groupId=reactivekafkagroupid] (Re-)joining group

Steps to Reproduce

Below is my Reactive Kafka consumer code.
I have tried changing various Kafka settings, but the errors shown in the log above still appear during Spring Boot application startup, at the moment reactiveKafkaConsumerTemplate.subscribe() is called. Any help is appreciated.
Kafka config:
#spring.kafka.bootstrap-servers=localhost:29092,localhost:39092
spring.kafka.consumer.properties[max.poll.interval.ms]=500
spring.kafka.consumer.properties[session.timeout.ms]=15000
spring.kafka.consumer.max-poll-records=100
spring.kafka.consumer.heartbeat-interval=3000
##spring.kafka.properties[session.timeout.ms]=8000
#spring.kafka.properties[heartbeat.interval.ms]=3000

    @Override
    public void run(String... args) {
        if (kafkaAdminClient.verifyConnection()) {
            log.info("CommandLineRunner - consumeFakeConsumerDTO called");
            consumeEvents();
        }
    }

    private void consumeEvents() {
        reactiveKafkaConsumerTemplate
                .receive()
                .doOnError(e -> log.error("Predicate: {}", e.getMessage()))
                .doOnNext(record -> {
                    log.info("Processing: {}@{}", record.value(), record.offset());
                    record.receiverOffset().acknowledge();
                })
                .flatMap(this::processRecordMonoDefer)
                .retryWhen(Retry.backoff(retryTimes, Duration.ofSeconds(1)).transientErrors(true))
                .onErrorResume(error -> {
                    log.error("Something bad happened while consuming: {}", error.getMessage(), error);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

Possible Solution

Your Environment

  • Kafka version: 2.8.0
  • Reactor version(s) used: reactor-kafka 1.3.10
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): JDK 17
  • OS and version (eg uname -a): MAC OS
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Feb 16, 2023
@garyrussell
Contributor

garyrussell commented Feb 16, 2023

reactor-kafka does not use the spring.kafka... properties; those are boot properties for the non-reactive consumer and producer factories.

Please show your ReceiverOptions used to create the template.

The current reactor-kafka version is 1.3.16.

@garyrussell garyrussell added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Feb 16, 2023
@rguntu
Author

rguntu commented Feb 16, 2023

@garyrussell Here is a screenshot of the ReceiverOptions. I am still seeing the same issue. (image: Screenshot 2023-02-16 at 11 57 06 AM)

@garyrussell
Contributor

Don't use screenshots; show the actual code that builds the options. That way I can try it locally and see what's wrong.

@rguntu
Author

rguntu commented Feb 16, 2023

@garyrussell Thank you so much. Here is the code. I see the issue only with the non-local Kafka bootstrap servers; with the local Kafka bootstrap servers and the same config there is no issue. I have marked both in the code.

    private void consumeEvents() {
        reactiveKafkaConsumerTemplate
                .receive()
                .doOnError(e -> log.error("Predicate: {}", e.getMessage()))
                .doOnNext(record -> {
                    log.info("Processing: {}@{}", record.value(), record.offset());
                    record.receiverOffset().acknowledge();
                })
                .flatMap(this::processRecordMonoDefer)
                .retryWhen(Retry.backoff(retryTimes, Duration.ofSeconds(1)).transientErrors(true))
                .onErrorResume(error -> {
                    log.error("Something bad happened while consuming: {}", error.getMessage(), error);
                    return Mono.empty();
                })
                .repeat()
                .subscribe();
    }

    @Bean
    public ReceiverOptions<String, Event> kafkaReceiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092"); //local kafka bootstrap servers
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "nonlocalkafka101:9092,nonlocalkafka102:9092"); //network(non-local) kafka bootstrap servers

        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "AMHHHH");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Deserializer messagingDeserializer = new MessagingDeserializer<>();
        messagingDeserializer.configure(Map.of("value.deserializer.class", Event.class.getName()), false);

        ReceiverOptions<String, Event> basicReceiverOptions = ReceiverOptions.create(props)
                .withValueDeserializer(messagingDeserializer);

        return basicReceiverOptions.subscription(Collections.singletonList("abc1"));
    }
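
One detail in the options above that may be worth a second look is max.poll.interval.ms, set to 500 ms. Kafka's documented default is 300000 ms (5 minutes), and the consumer also sends this value to the coordinator as its rebalance timeout when it joins the group. Nothing in this thread confirms that this explains the UNKNOWN_MEMBER_ID response, so the following is a hedged sanity check rather than a diagnosis. The class ConsumerTimeoutCheck, its method names, and the second rule (flagging a poll interval shorter than the session timeout) are assumptions made for illustration. The first rule follows the Kafka documentation's advice that heartbeat.interval.ms is typically set to no more than a third of session.timeout.ms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConsumerTimeoutCheck {

    /** Reads a millisecond setting stored as a String or Number; null if absent. */
    static Long readMs(Map<String, Object> props, String key) {
        Object value = props.get(key);
        return value == null ? null : Long.valueOf(value.toString().trim());
    }

    /** Returns warnings for unusual timeout combinations; empty means none found. */
    static List<String> findWarnings(Map<String, Object> props) {
        List<String> warnings = new ArrayList<>();
        Long session = readMs(props, "session.timeout.ms");
        Long heartbeat = readMs(props, "heartbeat.interval.ms");
        Long maxPoll = readMs(props, "max.poll.interval.ms");

        // Documented guidance: heartbeat interval at most one third of the session timeout.
        if (session != null && heartbeat != null && heartbeat * 3 > session) {
            warnings.add("heartbeat.interval.ms=" + heartbeat
                    + " is more than a third of session.timeout.ms=" + session);
        }
        // Hypothetical heuristic (not an official rule): a poll interval shorter
        // than the session timeout is unusual and leaves little time to join.
        if (session != null && maxPoll != null && maxPoll < session) {
            warnings.add("max.poll.interval.ms=" + maxPoll
                    + " is lower than session.timeout.ms=" + session);
        }
        return warnings;
    }

    public static void main(String[] args) {
        // Values copied from the ReceiverOptions shown in this thread.
        Map<String, Object> props = Map.of(
                "session.timeout.ms", "15000",
                "heartbeat.interval.ms", "3000",
                "max.poll.interval.ms", "500");
        findWarnings(props).forEach(System.out::println);
    }
}
```

With the values from this thread the check reports a single warning, for max.poll.interval.ms. Raising that value toward the default and retrying against the remote brokers would be one cheap experiment; the same string keys can be put into the props map passed to ReceiverOptions.create(props).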

@garyrussell
Contributor

Can you connect to those remote servers using the command line tools, e.g. kafka-console-consumer.sh?

If not, then my best guess is some problem with the broker/cluster configuration (e.g. advertised listeners).

Either way, you should look for errors in the server logs.

If it works fine with local brokers, I suggest you ask the wider Apache Kafka community, e.g. on Stack Overflow or on one of their mailing lists.
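
If the command line tools are not at hand, a rough first check of the advertised-listener theory is to confirm that the application host can open a TCP connection to every bootstrap server and to the coordinator address the broker hands back (qtvr-akf103.tlv.lpnet.com:9092 in the log above). The sketch below uses only the JDK; the class BrokerReachability and its methods are hypothetical names, and the bootstrap host names are the placeholders from the commented-out line in the posted code. A successful connect only shows that the name resolves and the port is open. It says nothing about the Kafka protocol, security settings, or group membership.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedHashMap;
import java.util.Map;

public class BrokerReachability {

    /** Attempts a plain TCP connect to host:port, giving up after timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // Covers unresolvable hosts, refused connections, timeouts, bad ports.
            return false;
        }
    }

    /** Checks each entry of a comma-separated host:port list, in order. */
    static Map<String, Boolean> checkAll(String hostPortList, int timeoutMs) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (String entry : hostPortList.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                results.put(trimmed, false); // no host or no port present
                continue;
            }
            try {
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                results.put(trimmed, canConnect(trimmed.substring(0, colon), port, timeoutMs));
            } catch (NumberFormatException e) {
                results.put(trimmed, false); // port is not a number
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Bootstrap placeholders from the posted code, plus the coordinator from the log.
        String targets = "nonlocalkafka101:9092,nonlocalkafka102:9092,"
                + "qtvr-akf103.tlv.lpnet.com:9092";
        checkAll(targets, 3000).forEach((target, ok) ->
                System.out.println(target + " -> " + (ok ? "reachable" : "NOT reachable")));
    }
}
```

If a bootstrap server is reachable but the coordinator address is not, that would point toward the broker's advertised listeners; if all of them are reachable, the server logs remain the next place to look.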

@Zane-XY

Zane-XY commented Feb 17, 2023

This does not look like an issue caused by reactor-kafka. As @garyrussell mentioned, you can verify this by using the same consumer properties with another client or the command line tools. Other possible causes of this error include an invalid group id, security settings, or the Kafka client library version.

@dipali1234567

@rguntu Was your issue resolved? If so, could you please post the solution?
