Will this re-create a new kafka consumer every time there is an error #124

Open
dariodariodario opened this issue Feb 4, 2020 · 0 comments

Comments

@dariodariodario

Hello, I'm new to Reactor, and I want to use it to consume from Kafka (which, on the other hand, I know well).

I want to set up a pipeline that retries on error. I read in the docs that an error in Reactor is a terminal event, so even with retry and backoff the engine resubscribes to the Flux. I use the following to consume from a topic, based on the thread-per-partition example:

        Scheduler scheduler = Schedulers.newBoundedElastic(
                processingThreads,
                100,
                "KafkaProcessingPool",
                60);
        this.receiverRecordFlux
                .groupBy(r -> r.partition() % processingThreads)
                .flatMap(groupedFlux -> {
                    var consumer = makeConsumer();
                    return groupedFlux.publishOn(scheduler)
                            .map(r -> {
                                consumer.accept(r);
                                return r.receiverOffset();
                            })
                            .concatMap(ReceiverOffset::commit);
                })
                .retryBackoff(Long.MAX_VALUE, Duration.ofMillis(25), Duration.ofSeconds(1))
                .doOnError(err -> LOG.warn("Error while consuming, will retry", err))
                .retry()
                .subscribe();

Does this result in the consumer being re-created every time there is an error? That sounds crazy to me.
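
To make the concern concrete, here is a minimal plain-Java sketch of "retry means subscribe again". It does not use Reactor or reactor-kafka and says nothing definitive about how either library behaves. The class `ResubscribeModel`, its `retry` helper, and the `makeConsumer()` stand-in are hypothetical names introduced only for illustration. The assumption being modelled is that anything built inside the per-subscription part of the pipeline is built again on each attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of "retry = run the subscription logic again".
// This is an illustration only; it is not Reactor's implementation.
class ResubscribeModel {

    // Counts how many times the per-subscription setup ran.
    static int consumersCreated = 0;

    // Hypothetical stand-in for makeConsumer() in the snippet above.
    static Object makeConsumer() {
        consumersCreated++;
        return new Object();
    }

    // Calls the pipeline factory until one attempt finishes without throwing,
    // modelling a retry operator that subscribes to its source again.
    static <T> T retry(Supplier<T> pipeline, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return pipeline.get(); // each call models a fresh subscription
            } catch (RuntimeException e) {
                last = e; // remember the error and try again
            }
        }
        throw last;
    }

    // Simulates two failing attempts followed by a successful one.
    static List<String> run() {
        consumersCreated = 0;
        int[] attempts = {0};
        return retry(() -> {
            Object consumer = makeConsumer(); // built inside the pipeline, so rebuilt per attempt
            List<String> processed = new ArrayList<>();
            processed.add("record-1");
            if (++attempts[0] < 3) {
                throw new IllegalStateException("simulated processing error");
            }
            processed.add("record-2");
            return processed;
        }, 5);
    }

    public static void main(String[] args) {
        List<String> result = run();
        System.out.println(result + " consumersCreated=" + consumersCreated);
    }
}
```

In this model the setup runs once per attempt, so two failures followed by a success create three consumers. Moving the `makeConsumer()` call outside the lambda passed to `retry` would create it only once, which is the distinction the question is asking about.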
