Question: Is it possible to produce events using reactor? #480

Open
Ehud-Lev-Forter opened this issue Nov 14, 2022 · 7 comments
Labels
enhancement New feature or request not-a-bug question Further information is requested

Comments

@Ehud-Lev-Forter
Contributor

Hi, I am working with ReactorProcessor and I would like to produce events. I tried the following code, but it did not write any new events to the output topic.
Example code:

lateinit var pConsumer: ReactorProcessor<String, JsonObject>

private fun createReactParallelConsumer(): ReactorProcessor<String, JsonObject> {
    val producer: Producer<String, JsonObject> = KafkaProducerBuilder.getProducer(kafkaConsumerConfig)
    val options = ParallelConsumerOptions.builder<String, JsonObject>()
        .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
        .maxConcurrency(parallelConsumerConfig.maxConcurrency)
        // .batchSize(parallelConsumerConfig.batchSize)
        .consumer(buildConsumer(kafkaConsumerConfig))
        .producer(producer)
        .build()
    return ReactorProcessor(options)
}

// Called once pConsumer has been assigned from createReactParallelConsumer()
pConsumer.react { context ->
    val event = context.singleConsumerRecord
    // do something with event
    val result = ProducerRecord<String, JsonObject>("output", event.key(),
        JsonObject(mapOf("someTest" to event.offset())))
    // Return the record, expecting it to be written to the "output" topic
    Mono.just(result)
}

What am I missing?

@astubbs
Contributor

astubbs commented Nov 17, 2022

Hi and welcome to the project! The reactor module doesn't have the produce flow implemented. However, you can produce a record directly with the producer. You should consider whether you want to wait for a positive ack from the broker first. If you do a blocking send, you should use a blocking thread pool from Reactor to do it.

To make this more obvious, I could have the reactor module check the return result of the user function, and either throw an exception or log a warning, if a ProducerRecord is returned from your React function. Thoughts?

With regards to sending, the core module does send in bulk, and waits in bulk for send acks. However, it still blocks while doing so. See this upcoming PR for fully async, non-blocking record acks in the core module:

Curious - what sort of load do you have, that you want to use Reactor? Or is it just to be more efficient with resources?

@astubbs astubbs added question Further information is requested not-a-bug enhancement New feature or request labels Nov 17, 2022
@Ehud-Lev-Forter
Contributor Author

Hi,
First of all, thanks for this great library. We are using Reactor because we write to S3 and RDS and want to do so asynchronously, without blocking, to make better use of resources.
Currently, we are implementing the producer logic ourselves. Once our processing tasks finish asynchronously (we are not blocking), we produce events with callbacks that, on completion, complete CompletableFutures.
At the end of the function we return a Mono.fromFuture(...) wrapping the CompletableFuture for those producer results. We assume that the library will only commit after the callback finishes successfully.
Does that make sense?
BTW the reason I opened this issue was that I saw the producer in the reactor example

@astubbs
Contributor

astubbs commented Nov 17, 2022

My pleasure! :)

Yup! Sounds great! And yes, that's right. You can see in the Reactor adapter, that the hooks for commit don't register until the Future completes.

You could probably do a PR to do basically that and have it inside pc.

If you could paste your code, I can take a look to see if it lines up with what I'm imagining.

> BTW the reason I opened this issue was that I saw the producer in the reactor example

Link didn't work, but the reactor example - yeah it's not the greatest, but the item it returns is just a dummy string. It doesn't try to send a record back to PC. Am I missing something?

@Ehud-Lev-Forter
Contributor Author

Ehud-Lev-Forter commented Nov 17, 2022

A PR is too much for me right now. Our code is written in Kotlin and obviously I cannot share it, but I converted the relevant parts to Java. For what it's worth, here is an example of what we did:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.Mono;

/**
 * Sends a batch of records and exposes the combined broker acks as a single Mono.
 *
 * Example usage:
 *
 * parallelConsumer.react(context -> {
 *     var consumerRecord = context.getSingleRecord().getConsumerRecord();
 *     log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
 *     Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
 *     Mono<List<ProducerRecord<String, String>>> originalResult =
 *             Mono.just(Arrays.asList(new ProducerRecord<>("topic", "key", "some value")));
 *     // flatMap rather than map: produce() returns a Mono, and the outer Mono
 *     // should only complete once that inner Mono (the send acks) has completed
 *     return originalResult.flatMap(batchProducer::produce);
 * });
 */
class BatchProducer<K, V> {
    private final Producer<K, V> producer;

    public BatchProducer(Producer<K, V> producer) {
        this.producer = producer;
    }

    // Sends every record and returns a Mono that completes once all sends are acknowledged
    public Mono<List<RecordMetadata>> produce(List<ProducerRecord<K, V>> messages) {
        List<CompletableFuture<RecordMetadata>> futures = messages.stream().map(message -> {
            CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();
            producer.send(message, createCallback(completableFuture));
            return completableFuture;
        }).collect(Collectors.toList());
        return Mono.fromFuture(sequence(futures));
    }

    // From here: https://stackoverflow.com/questions/30025428/convert-from-listcompletablefuture-to-completablefuturelist
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Bridges the Kafka send callback to a CompletableFuture
    private Callback createCallback(CompletableFuture<RecordMetadata> completableFuture) {
        return (metadata, exception) -> {
            if (exception != null) {
                completableFuture.completeExceptionally(exception);
            } else {
                completableFuture.complete(metadata);
            }
        };
    }

    public void close() {
        producer.close();
    }
}
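
To make the failure behaviour of the sequence() helper above concrete, here is a minimal JDK-only sketch. It involves no Kafka or Reactor classes: the String values are hypothetical stand-ins for RecordMetadata, and the SequenceDemo class name is made up for illustration. It shows that the combined future stays incomplete until every per-record future has completed, and that a single failed send makes the combined future complete exceptionally. Mono.fromFuture would then surface that as an error signal rather than a value. How the library reacts to that error is not covered here.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

class SequenceDemo {

    // Same shape as the sequence() helper above: one future that completes
    // only when all of the input futures have completed.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // does not block: allOf has already completed
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Hypothetical stand-ins for two per-record send acknowledgements.
        CompletableFuture<String> ack1 = new CompletableFuture<>();
        CompletableFuture<String> ack2 = new CompletableFuture<>();
        CompletableFuture<List<String>> all = sequence(List.of(ack1, ack2));

        ack1.complete("offset-0");
        // Still waiting on ack2, so the combined future is not done yet.
        System.out.println("done after first ack? " + all.isDone());

        // Simulate the broker rejecting the second record.
        ack2.completeExceptionally(new IllegalStateException("simulated send failure"));
        System.out.println("completed exceptionally? " + all.isCompletedExceptionally());

        try {
            all.join();
        } catch (CompletionException e) {
            // The original exception is available as the cause.
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```

Because one failure fails the whole batch, the successful acks in the same batch are not visible through the combined future. If per-record outcomes matter, each future would need to be inspected individually.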

@Ehud-Lev-Forter
Contributor Author

Hi again,
As a follow-up on contributing a PR to the project: I tried to download and compile it locally, and it failed. As far as I understand, the build fails because of Maven repositories that are not publicly available and jars that are not available in Maven Central.
Is there a guide on how to install locally?
Is there something I should do before running mvn clean compile?
Is there a workaround?

@astubbs
Contributor

astubbs commented Dec 5, 2022

TG isn't in maven central, but it is in a public repo, and the repo is added in the pom. Should be fine - unless your corporate proxy setup blocks other repos?

It's on my list to add TG to central, but it's not a priority.

> Is there a guide on how to install locally?

Shouldn't be needed.

> Is there something I should do before running mvn clean compile?

No.

> Is there a workaround?

To what?

@astubbs
Contributor

astubbs commented Dec 5, 2022

I'm on the confluent community slack if you want to chat in real time FYI
