Concurrency set to > 1 still has only one reactor thread consuming from multiple partitions #332

Open
patpatpat123 opened this issue Mar 29, 2023 · 6 comments
Labels
type/enhancement A general enhancement
Milestone
Backlog

Comments

@patpatpat123

patpatpat123 commented Mar 29, 2023

Hello Reactor Kafka team,

Apologies for troubling you with this issue.

I have a topic with 4 partitions and a single consumer running on a multi-core machine.

Using Spring Kafka (I understand this is a different project; I am only using it for comparison), setting this:
factory.setConcurrency(4);
I see one thread per partition.

Using Spring Cloud Stream (again, a different project), setting:
spring.cloud.stream.bindings.input.consumer.concurrency=4
I also see one thread per partition.

I tried setting either of these with Reactor Kafka but, unfortunately, I still see only one thread consuming from all partitions, even on a multi-core machine.

I would have expected one Reactor thread (one core) per partition, hence this issue.

Thank you in advance for your help.

@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 29, 2023
@garyrussell
Contributor

You either have to create multiple KafkaReceivers or add a publishOn(someScheduler) element to the pipeline. If you do the latter, you may need to enable out-of-order offset commits, depending on how you are committing offsets.
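A minimal sketch of the publishOn option, assuming String keys and values, a Reactor Kafka 1.3.x release recent enough to provide `ReceiverOptions.maxDeferredCommits` (the setting that enables out-of-order commits), and hypothetical names (`PublishOnPerPartition`, `options`, `consume`, `process`) chosen only for illustration. Records are grouped by partition so ordering within each partition is preserved, and each partition group is handed to a worker of a shared scheduler with `publishOn`. The processing sits inside the inner flux because the outer `flatMap` serializes its output signals, so work placed after it would not run in parallel.

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

// Hypothetical example class; names and settings are illustrative only.
final class PublishOnPerPartition {

    static ReceiverOptions<String, String> options(String bootstrap, String groupId, String topic) {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap,
                ConsumerConfig.GROUP_ID_CONFIG, groupId,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.<String, String>create(props)
                // A value > 0 enables out-of-order commits: offsets acknowledged out of
                // order are deferred until the earlier offsets are acknowledged as well.
                // 100 is an arbitrary cap on the number of deferred commits.
                .maxDeferredCommits(100)
                .subscription(List.of(topic));
    }

    static Flux<String> consume(ReceiverOptions<String, String> options, Scheduler scheduler) {
        return KafkaReceiver.create(options)
                .receive()
                // One group per partition preserves ordering within each partition...
                .groupBy(ReceiverRecord::partition)
                // ...and publishOn hands each group to a worker of the shared scheduler.
                .flatMap(partition -> partition
                        .publishOn(scheduler)
                        .map(record -> {
                            String result = process(record);
                            record.receiverOffset().acknowledge();
                            return result;
                        }));
    }

    // Placeholder for real work; reports which thread handled which partition.
    static String process(ReceiverRecord<String, String> record) {
        return Thread.currentThread().getName() + " p" + record.partition() + ": " + record.value();
    }
}
```

For example, with `Scheduler scheduler = Schedulers.newParallel("kafka-worker", 4)`, subscribing to `consume(options("localhost:9092", "demo-group", "demo-topic"), scheduler)` should report `kafka-worker` thread names instead of a single consumer thread. Polling still happens on the receiver's one consumer thread; only the processing is spread out, and `flatMap`'s default concurrency (256) must be at least the number of partitions.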

@garyrussell garyrussell added for/stackoverflow Questions are best asked on SO or Gitter and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Mar 29, 2023
@garyrussell
Contributor

In future, please ask questions on Stack Overflow; GitHub issues are for reporting bugs or requesting new features.

@garyrussell garyrussell added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Mar 29, 2023
@patpatpat123
Author

Thank you @garyrussell.

Your explanations are very clear.
Would you allow me to turn this into an enhancement (feature) request? When Reactor Kafka is used to consume from multiple partitions, it would be helpful to have an out-of-the-box mechanism that handles those partitions with multiple Reactor threads / cores.

Ideally, this would not require manually creating multiple KafkaReceiver instances (scaling up on the user side) or experimenting with publishOn and a dedicated Scheduler.

I know it is not ideal to keep citing other projects, but Spring Kafka and Spring Cloud Stream both offer simple, friendly configuration to make this happen without any code change.

As a more distant example, Reactor Netty provides an out-of-the-box way to scale to one Reactor thread / core per incoming request.

Mirroring this behavior across the Reactor stack, it would be great if Reactor Kafka took care of this automatically when messages come from multiple partitions.

If you believe this enhancement request does not make sense, please feel free to close it. I am just opening an idea for discussion.

Thank you again.

@garyrussell
Contributor

Also, note that there is a reactive kafka binder for spring-cloud-stream (based on this project); it honors the concurrency property by creating multiple receivers and merging their fluxes.

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_concurrency

@patpatpat123
Author

patpatpat123 commented Mar 29, 2023

Agreed. I was using the Spring Cloud Stream Reactive Kafka binder mentioned in my first post.

However, that requires pulling in the whole Spring Cloud Stream ecosystem just to benefit from this feature.

For a regular reactive Kafka application, this project is simple, small, fast, and efficient enough.
Would it be possible to have this feature as part of Reactor Kafka, without having to adopt the entire (and, to be honest, much heavier) Spring Cloud Stream?

I am totally OK with the answer "just use Spring Cloud Stream" and closing this, but I thought having this feature directly in the lighter Reactor Kafka would be useful.

@garyrussell garyrussell added type/enhancement A general enhancement and removed for/stackoverflow Questions are best asked on SO or Gitter for/user-attention This issue needs user attention (feedback, rework, etc...) labels Mar 29, 2023
@garyrussell
Contributor

It would be pretty easy to write a simple wrapper around multiple receivers.

Contributions are always welcome.
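A sketch of such a wrapper around multiple receivers, using hypothetical names (`ConcurrentKafkaReceiver`, `stringOptions`, `receive(pipeline)`) that are not part of the Reactor Kafka API, and assuming String deserializers for the convenience factory. It creates `concurrency` receivers from the same `ReceiverOptions`; because they share a `group.id`, Kafka's consumer-group protocol spreads the subscribed partitions across their consumers, which is the "multiple receivers, merged fluxes" approach described for the reactive binder. The per-record pipeline is applied to each receiver's flux before merging, because `Flux.merge` serializes `onNext` signals and work placed after the merge would not run concurrently.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

// Hypothetical helper: NOT part of the Reactor Kafka API.
final class ConcurrentKafkaReceiver<K, V> {

    private final List<KafkaReceiver<K, V>> receivers;

    ConcurrentKafkaReceiver(ReceiverOptions<K, V> options, int concurrency) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be >= 1");
        }
        // All receivers share the options (and therefore the group.id), so the
        // consumer group spreads the subscribed partitions across their consumers.
        this.receivers = IntStream.range(0, concurrency)
                .mapToObj(i -> KafkaReceiver.create(options))
                .collect(Collectors.toList());
    }

    int concurrency() {
        return receivers.size();
    }

    // Applies the pipeline to each receiver separately, then merges the results.
    // Flux.merge serializes onNext signals, so processing placed after the merge
    // would no longer run concurrently.
    <R> Flux<R> receive(Function<Flux<ReceiverRecord<K, V>>, Flux<R>> pipeline) {
        List<Flux<R>> perReceiver = receivers.stream()
                .map(receiver -> pipeline.apply(receiver.receive()))
                .collect(Collectors.toList());
        return Flux.merge(perReceiver);
    }

    // Hypothetical convenience factory for String keys and values.
    static ReceiverOptions<String, String> stringOptions(String bootstrap, String groupId, String topic) {
        Map<String, Object> props = Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap,
                ConsumerConfig.GROUP_ID_CONFIG, groupId,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.<String, String>create(props).subscription(List.of(topic));
    }
}
```

Usage might look like `new ConcurrentKafkaReceiver<>(ConcurrentKafkaReceiver.stringOptions("localhost:9092", "demo-group", "demo-topic"), 4).receive(flux -> flux.map(r -> { r.receiverOffset().acknowledge(); return r.value(); })).subscribe(System.out::println);`. Each receiver owns its own Kafka consumer, so no out-of-order commit handling is needed here; receivers beyond the partition count simply get no assignment and stay idle.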

@garyrussell garyrussell added this to the Backlog milestone Mar 29, 2023