
producer is not releasing memory occupied by byte[] #298

Open
kuldeep992 opened this issue Oct 17, 2022 · 0 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments


kuldeep992 commented Oct 17, 2022

This issue involves using StringSerializer/StringDeserializer for keys and ByteArraySerializer/ByteArrayDeserializer for values.
When I consume messages in batches with manual acknowledgement and pass them to a producer that forwards them to a different output topic, the producer holds on to the byte[] payloads and never releases the memory. This leads to an OOM once the memory limit is reached.

[image: memory profiler screenshot]

Expected Behavior

Ideally, the producer should release the memory for each byte[] once the corresponding record has been sent, rather than holding on to it.

Actual Behavior

The producer somehow keeps references to all the byte[] payloads, so the garbage collector cannot reclaim them. This eventually leads to an OOM.

Steps to Reproduce

The code below can be used to reproduce this. Run it against an input topic containing millions of messages and observe in a profiler that memory usage keeps increasing until the heap is exhausted and the application crashes.

:: Consumer Class ::

public void consumeMessages(String topic) {

    ReceiverOptions<String, byte[]> options = receiverOptions.pollTimeout(Duration.ofSeconds(20))
            .subscription(Collections.singleton(topic))
            .commitInterval(Duration.ZERO)
            .commitBatchSize(0)
            .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
            .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

    Flux<ReceiverRecord<String, byte[]>> kafkaFlux = KafkaReceiver.create(options).receive();

    kafkaFlux
        .buffer(batchSize)
        .doOnNext(records -> { 
            kafkaProducer.sendMessage(records);
            records.forEach(record -> record.receiverOffset().acknowledge());
        })
        .doOnError(error -> log.error("Error receiving event, will retry", error)) 
        .onErrorResume(e -> {
            Throwable ex = e.getCause();
            log.error("Retries exhausted for ", ex);
            return Mono.empty();
        }) 
        .subscribe();
}

:: Producer Class ::

public void sendMessage(List<ReceiverRecord<String, byte[]>> records) {
    String topic = kafkaConfiguration.getOutputTopic();

    // Build the outbound flux of SenderRecords; retry/onErrorResume apply
    // to this inner flux, not to the send itself.
    Flux<SenderRecord<String, byte[], String>> outbound = Flux.fromIterable(
            records.stream()
                .map(record -> SenderRecord.create(
                        new ProducerRecord<>(topic, record.key(), record.value()),
                        UUID.randomUUID().toString()))
                .collect(Collectors.toList()))
        .retry(1)
        .onErrorResume(ex -> Flux.empty());

    sender.send(outbound)
        .doOnError(e -> log.error("Failed to send message", e))
        .subscribe();
}

Possible Solution

If we can identify what is holding on to the memory and preventing its release, it can be fixed.
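One direction worth checking (not confirmed as the root cause): in the reproduction code, sendMessage subscribes to sender.send(...) on its own, so the consumer pipeline never sees backpressure from the producer and keeps buffering batches while sends are still in flight. A minimal sketch of a backpressure-aware variant, assuming the same sender, kafkaFlux, and batchSize as above plus a hypothetical outputTopic variable, and using the receiver offset as correlation metadata so each record is acknowledged only after its send result arrives:

```java
// Sketch only, not a verified fix: ties the producer send into the consumer
// pipeline so demand propagates upstream and sent byte[] batches become
// eligible for garbage collection. Assumes `sender` is a
// KafkaSender<String, byte[]> and `outputTopic` names the target topic.
kafkaFlux
    .buffer(batchSize)
    .concatMap(records ->                        // at most one batch in flight
        sender.send(Flux.fromIterable(records.stream()
                .map(r -> SenderRecord.create(
                        new ProducerRecord<>(outputTopic, r.key(), r.value()),
                        r.receiverOffset()))     // offset as correlation metadata
                .collect(Collectors.toList())))
            // acknowledge each record only once its SenderResult arrives
            .doOnNext(result -> result.correlationMetadata().acknowledge()))
    .doOnError(error -> log.error("Error in pipeline", error))
    .subscribe();
```

With concatMap the receiver requests the next batch only after the previous batch has been fully sent and acknowledged, which bounds the number of byte[] arrays referenced at any one time.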

Your Environment

  • Reactor version(s) used: Reactor-Kafka: 1.3.12
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version): openjdk 11.0.15 2022-04-19
  • OS and version (eg uname -a): Linux 34624abc07f2 5.10.104-linuxkit #1 SMP PREEMPT Thu Mar 17 17:05:54 UTC 2022 x86_64 Toybox

For more details: https://stackoverflow.com/questions/74055236/getting-oom-while-processing-records-in-batches-with-manual-offset-acknowledgeme

@reactorbot added the ❓need-triage label Oct 17, 2022