Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application Pod Crashed: Kafka Producer Exhausted 3GB Heap Memory When Broker Failed #385

Open
owaiscs001 opened this issue Mar 1, 2024 · 0 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@owaiscs001
Copy link

Expected Behavior

Kafka Sender
KafkaSender<Object, Object> kafkaSender;
Create sender options and update kafkaSender
this.senderOptions = SenderOptions.<Object, Object>create(configurationProperties);
this.kafkaSender = KafkaSender.create(senderOptions);
Producer Record
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, value);
Sender Record
SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, requestId);
Send API
kafkaSender.send(Mono.just(senderRecord)).subscribe()
ACK 0 and bufferMemory: 33554432

When sending 3.15K messages per second (MPS) using two Kafka broker pods alongside the mentioned KafkaSender and sender record within the specified send API, scaling down the first broker pod initially resulted in no observable impact. However, upon scaling down the second broker pod, the application pod restarted due to an Out of Memory (OOM) kill with error code 137.

Even though buffer memory is limited to 32MB, the complete depletion of the 3GB heap memory caused by Kafka Producer crashed the application

Actual Behavior

The application is expected to handle exceptions gracefully, preventing the complete depletion of heap memory.

Steps to Reproduce

Kafka Sender
KafkaSender<Object, Object> kafkaSender;
Create sender options and update kafkaSender
this.senderOptions = SenderOptions.<Object, Object>create(configurationProperties);
this.kafkaSender = KafkaSender.create(senderOptions);
Producer Record
ProducerRecord<Object, Object> record = new ProducerRecord<>(topic, value);
Sender Record
SenderRecord<Object, Object, Object> senderRecord = SenderRecord.create(record, requestId);
Send API
kafkaSender.send(Mono.just(senderRecord)).subscribe()

@Test
void reproCase() {

}

Possible Solution

Your Environment

  • Reactor version(s) used: 1.3.22
  • Other relevant libraries versions (eg. netty, ...): apache kafka
  • JVM version (java -version): 17.0.10
  • OS and version (eg uname -a): Linux
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 1, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet
Projects
None yet
Development

No branches or pull requests

2 participants