-
Notifications
You must be signed in to change notification settings - Fork 40.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Reactor Kafka #29080
Comments
This comment has been minimized.
This comment has been minimized.
@snicoll @simonbasle @garyrussell any help with this? Can someone describe what features are currently missing on this subject and how should they be implemented (what functionalities should be added and where), so it'll be easier for contributors to help with this? |
This comment has been minimized.
This comment has been minimized.
@lynch19 reactor-kafka has two fundamental properties objects Each with a number of properties. In addition, creation of these takes a There are several levels of Boot auto configuration that would be useful, with the MVP being the auto-configuration of these two beans using application properties. |
I'm using the reactive kafka in my applications. The Reactor kafka provides everything you need to work. Or does it means support in the form of annotations? @ReactiveKafkaListener(topic = "smth", autoCommit=true)
public Mono<Void> consumer(DTO dto) {
log.info("received dto: " + dto);
return service.call(dto).then();
} |
No; not at all; this is just about configuring the sender and receiver options via Boot properties. Such a mechanism would belong in Spring for Apache Kafka (spring-kafka), but there are currently no plans to do so. |
I would suggest something like We can leverage the existing |
This has been requested previously (#18751) but was declined as the Reactor team advised us against adding support. IIRC, this was due to the status of Reactor's Kafka support at the time and things may well have moved on since then. @simonbasle, what's you take on this now please? Has this moved on sufficiently in the last couple of years that this is now worth considering again? |
@wilkinsona with the broad changes in 1.3.x late 2020, reactor-kafka has been made more stable and maintainable. thanks to @garyrussell and @artembilan, the project is more actively maintained. thus I would defer to gary and artem regarding that decision, but it definitely looks better than back in 2019 |
I concur; it is in much better shape now, thanks to some significant work by Sergie back then. Also, due to community requests, the Spring Cloud Stream team are likely to incorporate it in the next major release and basic auto configuration of the sender and receiver properties will make things easier there too. |
@garyrussell what will it exactly demand? Only
Do you mean that we should use the existing |
@almogtavor There would be no As I said above, I'd say the MVP would be a For example @Bean
ReceiverOptions receiverOptions(KafkaProperties kp, ReactiveKafkaProperties rkp) {
ReceiverOptions opts = ReceiverOptions.create(kp.buildConsumerProperties());
/// apply rkps.getReceiver() properties here
return opts;
} Calling options customizers (if configured) before returning might be a nice addition, but not really a requirement because the user can further customize the properties where used to create a receiver/sender - since the options are immutable, the base options can be altered in different ways for each usage. A object is created each time a property is added. |
@garyrussell Seems great. Is there any need for using |
|
Notice that there are not a lot of common parameters between the two (
|
I am not talking about those properties, I am specifically talking about the Ideally, that part of |
Agree about the common super class. Is this a thing you'd want later on or at the very first PR? |
@garyrussell I've raised a PR. |
As a continuation of the discussion in the PR...
I believe we want to auto-configure only the @garyrussell @artembilan @almogtavor is this your understanding as well?
The properties would be the 1st class properties offered by the Receiver/Sender options as well as the ability to specify the normal consumer/producer properties currently available in KafkaProperties.Consumer/Producer. It does sound like we may only want to offer a subset of those. And there is also a question of "overlapping" properties to figure out. |
As a continuation from this comment on the PR...
@garyrussell are you suggesting we should drop some of the properties in the current PR? I think the number of primitive properties on the ReceiverOptions (the ones chosen in the associated PR) are not too overwhelming. The more complicated props can be adjusted as needed in the options customizer that will be added shortly after.
Yes, this would be nice. |
That was my proposal for the MVP above, yes: For the "overlapping" properties (for spring-kafka), the hierarchy is as follows (e.g. for the consumer properties).
i.e. with:
the last one would win.
I am not sure what you mean; I am suggesting using the Producer and Consumer (from For complex types, for example, users can grab the auto configured Given that the default implementation of @Autowired
ReceiverOptions ro;
...
ReceiverOptions one = ro.addAssignListener(...).subscription(List.of("topic1"));
ReceiverOptions two = ro.addAssignListener(...).subscription(List.of("topic2")); one and two will be different objects. |
I agree w/ the property hierarchy and precedence as you outlined above. The "overlapping" ones I was referring to are the ones that are named similar but may or may not be the same thing (eg.
I am suggesting the same thing. My comment around the "drop some of the properties" I was referring to the 1st class boot properties in the current proposal. I was not sure if you were suggesting there were currently too many of them and we should pick the high priority ones. It sounds like that is not what you are saying though. Sorry for the confusion.
Are you suggesting we do not add the Sender/ReceiverOptionCustomizers or that we can use this technique until we do add the customizers? |
On second thought, I suppose customizers would still make sense - e.g. for an app that creates multiple receivers but wants to add the same assignment listener to them all. |
So I dug into each ReactorKafka TL;DRThe only consumer properties that needs to be considered as duplicate/overlapping at this point are:
They are not the same exact property but could be used to control the same underlying concept. More details below. DetailsReceiverOptions propertiesignore (we are not surfacing these - customizer only)
Unique to RK only
Auto-commit relatedSo these were the ones that seemed likely overlapping
Unique to RK onlyWhile these are commit related, I believe these have no equivalent in KP
Needs more diggingI think this is RKP only but need to dig a bit more to see what KP options are available for out-of-order-commits
|
This "overlap" is by name only; unfortunate, but true. The latter two control whether the kafka-clients automatically commits the offsets on a schedule. The first one is the interval used by reactor kafka to automatically commit offsets. Users should not enable both mechanisms; in spring-kafka, we disable reactor-kafka also disables it - see
|
We should consider removing those properties from |
Agreed @garyrussell - that is what I was alluding to here as well with these comments:
If they are not desired in spring-kafka either, it seems like a great thing to remove (as you suggested). |
So I dug into each ReactorKafka TL;DRThese are much more straight forward than the consumer/receiver properties. Properties unique to RKP
ignore (not surfacing via props - customizer only)
|
Based on the above analysis of the KafkaProperties and the Sender/ReceiverOptions of ReactorKafka, here is a suggested list of properties as well as a suggested "layout" of them. I think we can use this as a starting point of how to map them out. spring:
kafka:
reactor:
# common properties (just like KafkaProperties)
bootstrap-servers: localhost:9093
ssl: ...
security: ...
properties: ...
client.id: fooDemoApp
# Producer class - pass to Kafka producer props (aka SenderOptions.properties)
producer:
key-serializer: org.apache.kafka.common.serialization.LongSerializer
value-serializer: com.example.FooSerializer
buffer-memory: 32MB # used in tandem w/ max-in-flight
...
# SenderOptions props
sender:
close-timeout: 5m
max-in-flight: 256
stop-on-error: false
# Consumer class - pass to Kafka consumer props (aka ReceiverOptions.properties)
consumer:
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
value-deserializer: com.example.FooDeserializer
...
# DELETE enable-auto-commit:
# DELETE auto-commit-interval:
# ReceiverOptions props
receiver:
subscribe-topics:
subscribe-pattern:
poll-timeout:
close-timeout:
commit-interval:
commit-batch-size:
max-commit-attempts:
commit-retry-interval:
max-deferred-commits:
atmost-once-commit-ahead-size: This assumes that no properties will be used for ReactorKafka directly under
A benefit of not sharing the KP producer/consumer props (the ones under |
I would vote for keeping them separate. |
@onobc @garyrussell Any concrete decision taken? @onobc do you suggest implementing one |
Hi @almogtavor , Due to competing priorities we have not had a chance to come to an agreement on the approach. More than likely we will close this PR and will revisit in a subsequent proposal. I will be sure we tag you when that happens to keep you in the loop. Thank you for this initial contribution. |
@onobc Don't you think we can meanwhile lean on the current PR since it delivers the issue, and just migrate paths to the ideal solution when the time comes? The current PR doesn't include changes to Spring-Kafka's autoconfigurations, so that'll only be an additional feature. It just feels like it's going to take lots of time for some pretty semantic changes. After all, merging an initial and basic version will still help to prettify the usage of Reactor Kafka with Spring Boot. Wdyt? |
@almogtavor Unfortunately, I don't think we should do that. Until we know exactly what direction we want to take, merging something may set us off in the wrong direction. If we don't have a chance to course correct before 3.0 is released later this year, backwards compatibility will then limit our options. We want to avoid that by giving ourselves enough time to maximise our chances of getting it right first time round. |
@wilkinsona makes sense. |
will the methods annotated @ReactiveKafkaListener supports @RetryableTopic ? |
There is no such thing as Also |
Hi @garyrussell any plan when we'll be getting Thanks |
There are no such plans; an evaluation resulted in there not being much value add that spring-kafka can provide over reactor-kafka. |
So, If I want to implement reactor-kafka then I have only below referral doc as of now? |
Yes. |
Why do you think there is no value in that? Isn't the difference similar to the one between WebMVC and WebFlux (i.e. blocking vs non-blocking?) |
That is not a useful comparison. If you feel spring-kafka can provide some useful value add over reactor-kafka, please start a discussion with your thoughts over there. https://github.com/spring-projects/spring-kafka/discussions Let's not continue here. I am open to ideas, but nobody has provided a compelling reason to do anything so far. |
Replied here spring-projects/spring-kafka#2351 (reply in thread) |
@wilkinsona @garyrussell any reason for this to not get solved for so long? There are already 3 working PRs on the subject |
@almogtavor Thanks for your patience. That there are 3 working PRs is largely why nothing's happened. We know we want to do something here but as of yet we haven't managed to identify exactly what we want to do. We have a number of competing priorities at the moment and adding support for Reactor Kafka hasn't yet made it to the top of the list. Unfortunately, that's unlikely to change until we're able to carve out a block of time to work on this and figure out exactly what needs to be done. |
As said in reactor/reactor-kafka#100 (comment) native support for Reactor Kafka would be nice to lots of users. Spring Boot currently supports WebFlux, but auto-configuration and more capabilities aren't supported yet with Reactor Kafka. As others said in different issues, I also think the connection between Reactor Kafka and Spring Boot should be more documented and supported since it's the most basic usage of Reactor Kafka. There is currently no official way of using Reactor Kafka with Spring Boot, which is pretty odd.
The text was updated successfully, but these errors were encountered: