
Add JVM shutdown hook to KafkaReceiver to close KafkaConsumer #247

Open
SujanKumarMitra opened this issue Oct 5, 2021 · 15 comments

@SujanKumarMitra

When the JVM receives a SIGINT,
the KafkaReceiver does NOT close the underlying KafkaConsumer, which can lead to resource leaks.
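For reference, a minimal sketch of the scenario (broker address, topic, and group are illustrative assumptions, not taken from a real setup):

import java.util.Collections;
import java.util.Map;

import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ShutdownLeakRepro {

    public static void main(String[] args) {
        // Illustrative connection properties.
        Map<String, Object> props = Map.of(
                "bootstrap.servers", "localhost:9092",
                "group.id", "demo-group",
                "key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer",
                "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ReceiverOptions<Integer, String> options = ReceiverOptions.<Integer, String>create(props)
                .subscription(Collections.singleton("demo-topic"));

        // No shutdown hook is registered anywhere here: on SIGINT the JVM
        // exits without the underlying KafkaConsumer ever being closed.
        KafkaReceiver.create(options)
                .receive()
                .doOnNext(record -> record.receiverOffset().acknowledge())
                .blockLast(); // keeps main alive until the JVM is signalled
    }
}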

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Oct 5, 2021
@simonbasle simonbasle added status/need-decision This needs team attention and discussion type/enhancement A general enhancement and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Oct 5, 2021
@simonbasle simonbasle added this to the 1.3.x Backlog milestone Oct 5, 2021
@simonbasle
Member

Is that a typical arrangement, @garyrussell @artembilan?
If the kafka-client (KafkaConsumer) doesn't install such a hook and puts the responsibility on the user, doesn't it make sense that the same would be required for KafkaSender (and its #close() method)?

@artembilan
Contributor

Well, the Spring for Apache Kafka project fully relies on the ApplicationContext lifecycle.
So, whatever calls its close() will initiate the underlying KafkaConsumer.close().

Doesn't look like Spring adds such a hook by default: https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-factory-shutdown.
But at least there is such an API for end-user consideration.

Looks like Spring Boot has its own opinion: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/SpringApplication.java#L203.

But anyway: if we expose something like registerShutdownHook() on KafkaReceiver and KafkaSender, that should be enough for end-user requirements.
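
Until something like that exists, a sketch of wiring the hook by hand with Runtime.addShutdownHook (method and thread names are illustrative; 'options' is assumed to be configured elsewhere):

import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

static Disposable subscribeWithShutdownHook(ReceiverOptions<Integer, String> options) {
    Disposable subscription = KafkaReceiver.create(options)
            .receive()
            .subscribe(record -> record.receiverOffset().acknowledge());
    // Disposing the subscription terminates the receive() flux, which in turn
    // schedules the consumer close (it can still race JVM exit, which is part
    // of why a built-in registerShutdownHook() is being discussed here).
    Runtime.getRuntime()
            .addShutdownHook(new Thread(subscription::dispose, "kafka-receiver-shutdown"));
    return subscription;
}

Roughly this body is what a built-in registerShutdownHook() could wrap.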

@SujanKumarMitra
Author

SujanKumarMitra commented Oct 5, 2021

I am using Reactor Kafka in a Spring Boot WebFlux environment.

So I am expecting that Spring-managed KafkaReceiver beans should be closed on SIGINT, like it happens for JDBC or MongoDB connections.

But Spring only closes KafkaSender connections. Maybe if KafkaReceiver implemented the Closeable interface or exposed a close() method, Spring's shutdown hook would kick in.

@artembilan
Contributor

OK, I see. Right: neither KafkaReceiver nor its impl has a close() API, unlike KafkaSender.

At the same time I see that ConsumerHandler has one, but it does not delegate to its Consumer.close().

@garyrussell
Contributor

garyrussell commented Oct 5, 2021

ConsumerHandler invokes consumerEventLoop.stop() which schedules a CloseEvent which closes the consumer.

It looks to me like handler.close() will be invoked when the receiver.receive() flux completes; it is passed into usingWhen as the asyncCleanUp argument.

What am I missing? Isn't it the application's responsibility to complete the flux?
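
For readers following along, a toy illustration of the usingWhen shape described above (not the library's actual code; the real wiring passes the consumer handler and handler.close() in these positions):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Stand-in resource: acquire, use as a flux, clean up asynchronously on terminate.
Flux<String> records = Flux.usingWhen(
        Mono.fromCallable(() -> "handler"),          // acquire the resource
        handler -> Flux.just(handler + "-record"),   // use it: the inbound flux
        handler -> Mono.fromRunnable(() ->           // asyncCleanUp when the flux terminates
                System.out.println("closing " + handler)));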

@SujanKumarMitra
Author

SujanKumarMitra commented Oct 6, 2021

I have a question: when exactly does the KafkaReceiver.receive() flux complete? I think it runs indefinitely unless someone calls dispose() on the Disposable returned by Flux.subscribe().

My proposal is to
expose a close() method on KafkaReceiver which completes the KafkaReceiver.receive() flux and closes the KafkaConsumer as well.

Also, KafkaSender has a close() but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.

@garyrussell
Contributor

Your application bean could implement AutoCloseable and call dispose().

Also, KafkaSender has a close() but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.

Agreed.
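
A minimal sketch of the bean approach suggested above (class and field names are illustrative):

import reactor.core.Disposable;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

// Application-side bean: owning the Disposable and exposing close() lets any
// container that honours AutoCloseable tear the subscription down on shutdown.
public class ReceiverBean implements AutoCloseable {

    private final Disposable subscription;

    public ReceiverBean(ReceiverOptions<Integer, String> options) {
        this.subscription = KafkaReceiver.create(options)
                .receive()
                .subscribe(record -> record.receiverOffset().acknowledge());
    }

    @Override
    public void close() {
        subscription.dispose(); // terminates receive(), which closes the consumer
    }
}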

@SujanKumarMitra
Author

SujanKumarMitra commented Oct 6, 2021

Your application bean could implement AutoCloseable and call dispose().

Yes, currently my component class implements DisposableBean and, in the destroy() method, I call dispose().
But the problem is that the subscribers of KafkaReceiver.receive() get an error signal of CancellationException when dispose() is called. That's why I am proposing this:

My proposal is to
expose a close() method on KafkaReceiver which completes the KafkaReceiver.receive() flux and closes the KafkaConsumer as well.
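
In the meantime, one way to avoid the CancellationException downstream is to gate the flux with takeUntilOther and complete a sink on shutdown; a sketch, with all names illustrative and 'options' assumed configured elsewhere:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

Sinks.Empty<Void> shutdown = Sinks.empty();

// Completing 'shutdown' ends the stream with onComplete for downstream
// subscribers, while the receiver upstream is cancelled and cleaned up.
Flux<ReceiverRecord<Integer, String>> records = KafkaReceiver.create(options)
        .receive()
        .takeUntilOther(shutdown.asMono());

records.subscribe(record -> record.receiverOffset().acknowledge());

// Later, e.g. from DisposableBean#destroy():
shutdown.tryEmitEmpty();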

@simonbasle
Member

Also, KafkaSender has a close() but it doesn't extend Closeable or AutoCloseable. It would be better if it extended AutoCloseable so that it can be used with try-with-resources.

Isn't there an inherent risk here, because the class is reactive and thus non-blocking?
A try-with-resources block would only make sense if you block inside it, which is generally discouraged.
Worse, if you don't block but still use the KafkaSender inside a try-with-resources, it will get closed too early.

I would be wary of such a change.

@simonbasle
Member

(exposing a close() method where it is missing makes perfect sense though)

@SujanKumarMitra
Author

The return type of KafkaSender.close() is void, not Mono<Void>. So it must be a blocking operation, right?

Regarding try-with-resources, imagine the following piece of code:

try (KafkaSender sender = KafkaSender.create(senderOptions)) {
    sender.send(recordsPublisher)
          .subscribe();
}

When the close() call is made, does the KafkaSender finish sending the pending messages, or does it discard them and close the KafkaProducer?

@simonbasle
Member

simonbasle commented Oct 7, 2021

In that particular case, it potentially wouldn't even get a chance to deal with the recordsPublisher at all. I'm not sure of the exact behavior, but I would expect the KafkaSender to terminate that subscription with an onError stating that the KafkaSender has been closed. Since .subscribe() is being used (without an error handler), the default behavior of Reactor would then be to log this onError.

If the close() happens fast enough, it could trigger before the send, and so from the perspective of the KafkaSender there would be no pending messages (since at this point the recordsPublisher wouldn't even be subscribed, or it could have been cancelled).
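
A non-blocking alternative to try-with-resources is to tie the close to the termination of the send pipeline, for example with doFinally; a sketch, assuming senderOptions and recordsPublisher are defined elsewhere:

import reactor.kafka.sender.KafkaSender;

// Close the sender only once the send flux terminates (complete, error or
// cancel), instead of when exiting a try-with-resources block.
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
sender.send(recordsPublisher)
        .doFinally(signal -> sender.close())
        .subscribe();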

@aritrachatterjee15

Hi! I am using reactor-kafka with Spring Boot and running into a similar issue. I am calling dispose() on shutdown and this eventually calls ConsumerEventLoop#stop(). However, this isn't closing the underlying KafkaConsumer; there is a call to wakeup().

Is this expected? I attached a debugger to see if close() is invoked on the consumer, but it doesn't look like it is getting called. Since this consumer is wrapped by KafkaReceiver, I don't see how Spring could automatically handle its lifecycle either...

@garyrussell
Contributor

@aritrachatterjee15 stop() schedules a CloseEvent which performs the close().

There is a test here:

/**
 * Tests that a consumer is created when the inbound flux is subscribed to and
 * closed when the flux terminates.
 */
@Test
public void consumerLifecycle() {
    sendMessages(topic, 0, 1);
    receiverOptions = receiverOptions.subscription(Collections.singleton(topic));
    DefaultKafkaReceiver<Integer, String> receiver = new DefaultKafkaReceiver<>(consumerFactory, receiverOptions);
    assertEquals(0, consumerFactory.consumersInUse().size());
    Flux<ReceiverRecord<Integer, String>> flux = receiver.receive();
    assertEquals(0, consumerFactory.consumersInUse().size());
    Disposable c = flux.subscribe();
    TestUtils.waitUntil("Consumer not created using factory", null,
            f -> f.consumersInUse().size() > 0, consumerFactory, Duration.ofMillis(500));
    assertEquals(Arrays.asList(consumer), consumerFactory.consumersInUse());
    assertFalse("Consumer closed", consumer.closed());
    c.dispose();
    await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
            assertTrue("Consumer closed", consumer.closed()));
}

@hkarim

hkarim commented May 13, 2023

Using reactor-kafka 1.3.18 in a Scala setting: basically converting the eventual consumer Flux into a plain Publisher and letting the streaming happen through FS2, a streaming library.

Getting hold of the Disposable is not feasible here, since I am not subscribing to the upstream, just passing it along as a plain Publisher.

When a cancellation signal is received, the stream will use the Subscription instance to cancel, which I can verify since I see the "kafka consumer canceled" log message. Still, the KafkaReceiver doesn't stop. So, does KafkaReceiver honor the cancel signal at all?

Here is a snippet for reference:

  val receiver =
    KafkaReceiver
      .create(receiverOptions(group, topics: _*))

  val upstream =
    receiver
      .receiveAutoAck()
      .concatMap(identity(_))
      .retryWhen(Retry.backoff(retryMax, retryDelay).transientErrors(true))
      .repeat

  val stream: Stream[IO, ConsumerRecord[String, Array[Byte]]] =
    fs2.interop.flow
      .fromPublisher[IO](FlowAdapters.toFlowPublisher(upstream), bufferSize)
      .evalFilter(safeAccept)

  if (asyncSize == 1)
    stream.evalMap(safeOnRecord).compile.drain.cancelable {
      IO.blocking {
        logger.info("kafka consumer canceled")
      }
    }
  else
    stream.parEvalMap(asyncSize)(safeOnRecord).compile.drain.cancelable {
      IO.blocking {
        logger.info("kafka consumer canceled")
      }
    }
