
KafkaProducer#close() seems to leak Kafka resources. #341

Open
basking2 opened this issue Apr 23, 2023 · 9 comments
Labels
❓need-triage This issue needs triage, hasn't been looked at by a team member yet

Comments

@basking2

I create a KafkaSender and give it a Flux that produces records. In the doFinally() of the Flux I close the sender with .close().

I observe that the Kafka library logs that it prefers the close(Duration) call to be used, but it closes the underlying object anyway.

I further observe that, as I create and close many senders, my heap usage grows and eventually results in an OOM (after ~12 hours of operation).

Using jmap I see the number of Kafka Node objects constantly growing. They do seem to be collected eventually, but the memory is then consumed by more generic objects such as byte arrays.

When I use only one sender for the whole application, I do not observe a leak.

Expected Behavior

Calling .close() will reliably make all resources collectable by the GC and OOMs will not happen.

Actual Behavior

Over 12 hours, the heap grows to 1.5 GB and the JVM eventually exits with an out-of-heap-space error.

Steps to Reproduce

In a loop, create one sender, send records to Kafka, and close the sender in the Flux's doFinally() callback.

Repeat this and Kafka seems to leak resources; a minimal sketch of the pattern follows.
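
The following sketch illustrates the loop described above. The broker address, topic name, serializers, and iteration counts are illustrative assumptions, not details from the report:

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.core.publisher.Flux;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;
    import reactor.kafka.sender.SenderRecord;

    public class LeakRepro {

        public static void main(String[] args) {
            SenderOptions<Integer, String> options = SenderOptions.create(Map.<String, Object>of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));

            for (int i = 0; i < 1_000; i++) {
                // A fresh sender per iteration -- the pattern under discussion.
                KafkaSender<Integer, String> sender = KafkaSender.create(options);
                sender.send(Flux.range(0, 100)
                                .map(n -> SenderRecord.create(
                                        new ProducerRecord<>("demo-topic", n, "message-" + n), n)))
                      // Closing from doFinally() runs on the producer's own network thread.
                      .doFinally(signal -> sender.close())
                      .blockLast();
            }
        }
    }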

Possible Solution

Do we need to use close(Duration) or call flush() first?

Your Environment

  • Reactor version(s) used:
    • io.projectreactor.kafka:reactor-kafka:jar:1.3.16:compile
    • io.projectreactor:reactor-core:jar:3.4.13:compile
    • org.apache.kafka:kafka-clients:jar:3.1.2:compile
  • JVM 11
  • OS is eclipse-temurin:11 run under Docker compose.
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Apr 23, 2023
@artembilan
Contributor

The DefaultKafkaSender has this logic:

    public void close() {
        if (!hasProducer.getAndSet(false)) {
            return;
        }
        producerMono.doOnNext(producer -> producer.close(senderOptions.closeTimeout()))
                    .block();
        if (senderOptions.isTransactional()) {
            senderOptions.scheduler().dispose();
        }
        scheduler.dispose();
    }

So we really do call close(Duration); it uses Long.MAX_VALUE as the default timeout.
But I don't think that is relevant to the problem.

It would be great to have a simple sample from you that reproduces this.

@artembilan artembilan added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Apr 24, 2023
basking2 added a commit to basking2/reactor-kafka-341 that referenced this issue Apr 24, 2023
@basking2
Author

@artembilan, I've packaged up a program that exhibits the behavior (observable in VisualVM) here.

Thank you!

@artembilan
Contributor

May we see a memory report showing where you think the leak is?
I'm not sure that I'm going to run your program for 12 hours...

Maybe this behavior can be reproduced with just a plain KafkaProducer?
Then it is outside this project's scope and would have to be reported against Apache Kafka itself.

@basking2
Author

@artembilan The sample code takes about 5 minutes to fail with a 300m heap limit. Attached is a VisualVM screenshot.

[VisualVM screenshot: reactor-kafka-341]

@artembilan
Contributor

OK. When I move that sender.close() out of the doFinally() and instead make it the last line of that loop iteration, I don't see any memory problems in VisualVM.

So it looks like the doFinally() is performed on a producer thread, which means we cannot call close() while we are still sending.
See the respective logs:

[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] INFO [Producer clientId=producer-223] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] WARN [Producer clientId=producer-223] Overriding close timeout 9223372036854775807 ms to 0 ms in order to prevent useless blocking due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.
[2023-04-24 15:13:43,039] [kafka-producer-network-thread | producer-223] [org.apache.kafka.clients.producer.KafkaProducer] INFO [Producer clientId=producer-223] Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.

So, according to this info from the Kafka client, the doFinally() is called from the mentioned "producer call-back".
And even though the rest of the output looks OK, according to your memory report the producer is really not closed.

Or, better said, it is deadlocked:
we use the producerMono property in the send() operation, and then we call close(), which block()s on that same producerMono, from within that send() operation.

Technically it is always better to use a single Kafka producer for the whole application, but if you cannot, you must close it only when you really are done with it.
And, yeah, it turns out that this doFinally() on the Flux<SenderResult<T>> send() result is not the way to go.

You need to revise your logic to call sender.close() from some other place; a sketch of one safe arrangement follows.
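
For illustration only (reusing the hypothetical options and record names from the earlier sketch, not code from this issue), one safe arrangement is to wait for the send pipeline to complete and then close on the calling thread:

    // Sketch: close on the calling thread after the send pipeline completes,
    // rather than inside doFinally() (which runs on a kafka-producer-network-thread).
    KafkaSender<Integer, String> sender = KafkaSender.create(options);
    sender.send(Flux.range(0, 100)
                    .map(n -> SenderRecord.create(
                            new ProducerRecord<>("demo-topic", n, "message-" + n), n)))
          .blockLast();   // wait for all SenderResults before closing
    sender.close();       // safe: no longer on a producer callback thread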

@basking2
Author

@artembilan, thanks for the great writeup. We already revised our logic once the leak was attributed to the sender. I opened this issue because this seems like a natural and supported use case when, in fact, it's not. I had seen the logs you shared, but they didn't warn of leaking memory or resources.

I would love to warn future users not to try what I did, but I don't have a great idea of how to do that.

Again, thanks!

@artembilan
Contributor

Yeah... I see your point.
We will discuss with @garyrussell what we can do on the matter (e.g. throwing an exception) when he comes back from PTO next week.

@artembilan artembilan added ❓need-triage This issue needs triage, hasn't been looked at by a team member yet and removed for/user-attention This issue needs user attention (feedback, rework, etc...) labels Apr 25, 2023
@garyrussell
Contributor

Just curious why you want to close each time; the KafkaProducer javadocs recommend a single producer per app (although when using transactions, you will likely need more if concurrency is needed).

But, yes, it would be nice if we could detect it.
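
As an illustration of that recommendation (a hypothetical sketch, not code from this issue), an application-scoped sender can be created once and closed only at shutdown:

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;

    // Hypothetical holder for a single application-wide sender.
    public final class SenderHolder {

        private static final KafkaSender<Integer, String> SENDER =
                KafkaSender.create(SenderOptions.<Integer, String>create(Map.<String, Object>of(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class,
                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class)));

        private SenderHolder() {
        }

        public static KafkaSender<Integer, String> sender() {
            return SENDER;
        }

        public static void shutdown() {
            SENDER.close(); // called exactly once, e.g. from a JVM shutdown hook
        }
    }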

@basking2
Author

basking2 commented May 3, 2023

@garyrussell - Ephemeral senders are not necessary for my use case. This issue exists because close() seemed to fail to deliver on its promise and caused OOM crashes in early versions of our solution.
