Facing Constant Rebalancing and not becoming Stable #308

Open · VijayIndia opened this issue Dec 29, 2022 · 2 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@VijayIndia

i) I receive a successful CommitResponse, and a Heartbeat request is sent to Kafka, but no Heartbeat response ever comes back, not even a failed Heartbeat response (a metrics-logging sketch for checking this follows below).
ii) Each consumed Kafka message is processed in about 5 ms; on average we consume around 400,000 messages/min across 100 Kubernetes pods.
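
To see from inside the application whether heartbeats are actually going out (point i above), one option is to dump the consumer's coordinator metrics through KafkaReceiver.doOnConsumer. This is only a sketch: logCoordinatorMetrics is a hypothetical helper, receiver is assumed to be the KafkaReceiver built from the ReceiverOptions shown under "Steps to Reproduce", and the metric group/names (consumer-coordinator-metrics, e.g. last-heartbeat-seconds-ago, heartbeat-rate) are the standard Kafka consumer client metrics.

      import java.time.Duration;
      import java.util.Map;
      import org.apache.kafka.common.Metric;
      import org.apache.kafka.common.MetricName;
      import reactor.core.Disposable;
      import reactor.core.publisher.Flux;
      import reactor.kafka.receiver.KafkaReceiver;

      // Every 30 seconds, log heartbeat/rebalance related consumer metrics.
      // doOnConsumer runs the callback on the consumer thread, so reading metrics() there is safe.
      static Disposable logCoordinatorMetrics(KafkaReceiver<String, String> receiver) {
          return Flux.interval(Duration.ofSeconds(30))
                  .concatMap(tick -> receiver.doOnConsumer(consumer -> {
                      Map<MetricName, ? extends Metric> metrics = consumer.metrics();
                      metrics.forEach((name, metric) -> {
                          if ("consumer-coordinator-metrics".equals(name.group())
                                  && (name.name().contains("heartbeat") || name.name().contains("rebalance"))) {
                              LOGGER.info("{} = {}", name.name(), metric.metricValue());
                          }
                      });
                      return true;
                  }))
                  .subscribe();
      }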

Expected Behavior

The Kafka consumer should not keep rebalancing.

Actual Behavior

The Kafka consumer has been rebalancing constantly and has not become stable for a week.

Steps to Reproduce

      Properties props = new Properties();
      // Connection and consumer-group identity
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "CommentedbootStrapServersUrl");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "iro-groupid-1");
      props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-1");
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      // Timeouts and poll sizing
      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 360000);
      props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300500);
      props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30500);
      props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
      // Incremental (cooperative) rebalancing
      props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

ReceiverOptions<String,String> receiverOptions = ReceiverOptions.<String,String>create(props)
              .addAssignListener(partitions -> LOGGER.info("onPartitionAssigned {}", partitions))
              .addRevokeListener(partitions -> LOGGER.info("onPartitionRevoked {}", partitions))
              .commitRetryInterval(Duration.ofMillis(3000))
              .commitBatchSize(3)
              .commitInterval(Duration.ofMillis(2000))
              .maxDelayRebalance(Duration.ofSeconds(60))
              .subscription(Collections.singleton("commentedTopicName"));

      Flux<ReceiverRecord<String,String>> inboundFlux = KafkaReceiver.create(receiverOptions)
              .receive()
              .onErrorContinue((throwable, o) -> {
                  // Log the dropped record's error with its stack trace instead of swallowing it
                  LOGGER.error("Error while consuming in this pod", throwable);
              })
              .retryWhen(Retry.backoff(3, Duration.ofSeconds(3)).transientErrors(true))
              .repeat();

              try {
                   inboundFlux
                           .doOnNext(kafkaEvent -> {
                                           callBusinessLogic(kafkaEvent);
                           })
                           .doOnError(e->e.printStackTrace())
                           .subscribe();
               } catch (Exception e){
                   e.printStackTrace();
               }
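
For reference, the usual guideline for these settings is that heartbeat.interval.ms should be no more than one third of session.timeout.ms, and max.poll.interval.ms must comfortably exceed the time needed to process one batch of max.poll.records records between polls (here roughly 5 ms * 5 records = 25 ms). The values below are only an illustrative sketch against that guideline, not a verified fix for this deployment:

      // Illustrative values only: heartbeat at one third of the session timeout,
      // poll interval far above the ~25 ms needed per batch of 5 records.
      props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
      props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
      props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);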

Possible Solution

Your Environment

  • Reactor version(s) used: 1.3.12
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): jdk 17
  • OS and version (eg uname -a): Linux
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Dec 29, 2022
@VijayIndia (Author)

Attached Consumer Logs for Reference
Consumerlogs.txt

@VijayIndia (Author)

I have already tried using publishOn and runOn:

      Scheduler schedulers = Schedulers.newBoundedElastic(5, 5, "PublishedThread");

      inboundFlux
              .publishOn(schedulers)
              .parallel()
              .runOn(schedulers)
              .doOnNext(kafkaEvent -> {
                  callBusinessLogic(kafkaEvent);
              })
              .doOnError(e -> e.printStackTrace())
              .subscribe();
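
For comparison, the reactor-kafka reference guide also shows a per-partition concurrency pattern based on groupBy over the topic partition, which keeps ordering within each partition while processing partitions concurrently. A minimal sketch, reusing the receiverOptions and callBusinessLogic from above; the acknowledge() call assumes reactor-kafka offset management (enable.auto.commit=false) and can be dropped if the consumer's auto commit is kept:

      Scheduler scheduler = Schedulers.newBoundedElastic(5, Integer.MAX_VALUE, "partition-worker");

      KafkaReceiver.create(receiverOptions)
              .receive()
              .groupBy(record -> record.receiverOffset().topicPartition())
              .flatMap(partitionFlux -> partitionFlux
                      // One rail per partition: records of a partition stay in order,
                      // different partitions are processed concurrently on the scheduler.
                      .publishOn(scheduler)
                      .doOnNext(kafkaEvent -> callBusinessLogic(kafkaEvent))
                      .doOnNext(kafkaEvent -> kafkaEvent.receiverOffset().acknowledge()))
              .subscribe();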
