Add JVM shutdown hook to KafkaReceiver to close KafkaConsumer #247
Is that a typical arrangement, @garyrussell @artembilan?
Well, the Spring for Apache Kafka project fully relies on the […]. It doesn't look like Spring adds such a hook by default: https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#beans-factory-shutdown. Spring Boot, though, has its own opinion: https://github.com/spring-projects/spring-boot/blob/main/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/SpringApplication.java#L203. But anyway, if we expose something like […]
I am using Reactor Kafka in a Spring Boot WebFlux environment, so I am expecting that the Spring-managed […]. But Spring only closes […].
OK, I see. Right, the […]. At the same time, I see that […].
It looks to me like the […]. What am I missing? Isn't it the application's responsibility to complete the flux?
I have a question: when exactly does the […]? My proposal is […]. Also, […].
Your application bean could implement […].
Agreed.
Yes, currently my component class implements the […].
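The bean-disposal pattern described above can be sketched without Spring or Reactor on the classpath. Everything below is a stand-in: `Subscription` only mimics the shape of reactor's `Disposable`, and the local `DisposableBean` interface mirrors (but is not) `org.springframework.beans.factory.DisposableBean`:

```java
// Sketch only: stand-in types mirroring the Spring DisposableBean pattern.
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    /** Stand-in for a reactive subscription handle (like reactor's Disposable). */
    static class Subscription {
        private final AtomicBoolean disposed = new AtomicBoolean(false);
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    /** Local copy of the DisposableBean contract. */
    interface DisposableBean {
        void destroy() throws Exception;
    }

    /** A component that owns the consuming subscription. */
    static class ConsumerComponent implements DisposableBean {
        final Subscription subscription = new Subscription(); // started on init

        @Override
        public void destroy() {
            // Cancelling the subscription is what allows KafkaReceiver
            // to close its underlying KafkaConsumer cleanly.
            subscription.dispose();
        }
    }

    public static void main(String[] args) throws Exception {
        ConsumerComponent component = new ConsumerComponent();
        component.destroy(); // the container would call this on context shutdown
        System.out.println("disposed=" + component.subscription.isDisposed());
    }
}
```

In a real application the container invokes `destroy()` during context shutdown, so no extra JVM-level hook is needed as long as the context itself is closed.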
Isn't there an inherent risk here, because the class is reactive and thus non-blocking? I would be wary of such a change.
(exposing a […])
The return type of […]. Regarding try-with-resources, […]
When the […]
In that particular case, it potentially wouldn't even get a chance to deal with the […]. If the close() happens fast enough, it could trigger before the […].
Hi! I am using […]. Is this expected? I added a debugger to see if the […].
@aritrachatterjee15 There is a test here: reactor-kafka/src/test/java/reactor/kafka/receiver/internals/MockReceiverTest.java, lines 125 to 145 in f74cd96.
Using reactor-kafka 1.3.18 in a Scala setting. Basically converting the eventual consumer […]. Getting a hold of […]. When a cancellation signal is received, the stream will use the […]. Here is a snippet for reference:

```scala
val receiver =
  KafkaReceiver
    .create(receiverOptions(group, topics: _*))

val upstream =
  receiver
    .receiveAutoAck()
    .concatMap(identity(_))
    .retryWhen(Retry.backoff(retryMax, retryDelay).transientErrors(true))
    .repeat

val stream: Stream[IO, ConsumerRecord[String, Array[Byte]]] =
  fs2.interop.flow
    .fromPublisher[IO](FlowAdapters.toFlowPublisher(upstream), bufferSize)
    .evalFilter(safeAccept)

if (asyncSize == 1)
  stream.evalMap(safeOnRecord).compile.drain.cancelable {
    IO.blocking {
      logger.info("kafka consumer canceled")
    }
  }
else
  stream.parEvalMap(asyncSize)(safeOnRecord).compile.drain.cancelable {
    IO.blocking {
      logger.info("kafka consumer canceled")
    }
  }
```
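The `fromPublisher` bridge in that snippet goes through `java.util.concurrent.Flow`, which is what `FlowAdapters.toFlowPublisher` targets. As a minimal pure-JDK illustration of that plumbing, with `SubmissionPublisher` standing in for the adapted `Flux` (no Kafka, Reactor, or fs2 involved):

```java
// Pure-JDK sketch of the Flow publisher/subscriber handshake used by
// FlowAdapters.toFlowPublisher / fs2.interop.flow.fromPublisher.
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class Main {
    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE); // unbounded demand for the demo
            }
            @Override public void onNext(String item) {
                System.out.println("got " + item);
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });

        publisher.submit("record-1");
        publisher.close();   // signals onComplete once deliveries finish
        done.await();        // wait for the asynchronous delivery thread
    }
}
```

A real bridge would bound the demand (`s.request(n)`) so backpressure propagates from the fs2 stream back to the KafkaReceiver, which is what the `bufferSize` parameter above controls.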
When the JVM receives a SIGINT, the KafkaReceiver does NOT close the underlying KafkaConsumer, which can lead to resource leaks.
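What the issue asks for can be approximated today in application code by registering a shutdown hook. This is a pure-JDK sketch of the mechanics only: `FakeConsumer` is a stand-in for `KafkaConsumer`, and reactor-kafka itself registers no such hook.

```java
// Sketch: a JVM shutdown hook that closes a consumer-like resource.
public class Main {

    static class FakeConsumer implements AutoCloseable {
        @Override
        public void close() {
            System.out.println("consumer closed");
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        // Shutdown hooks run on normal exit and on SIGINT/SIGTERM,
        // but never on SIGKILL or a hard JVM crash.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::close));
        System.out.println("consuming");
        // main returns; the hook fires while the JVM shuts down
    }
}
```

Note that a hook like this runs concurrently with other shutdown hooks, so if a reactive pipeline is still draining, closing the consumer here can race with in-flight processing, which is the blocking-vs-reactive concern raised earlier in the thread.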