Question: Is it possible to produce events using reactor? #480
Comments
Hi and welcome to the project! The reactor module doesn't have the produce flow implemented. However, you can produce a record directly with the producer. You should consider whether you want to wait for a positive ack from the broker first or not. If you do a blocking send, you should run it on a blocking thread pool from Reactor. To make this more obvious, I could have the reactor module check the return value of the user function and either throw an exception or log a warning if a ProducerRecord is returned from your react function. Thoughts? With regard to sending, the core module does send in bulk, and waits in bulk for the send acks. However, it still blocks while doing so. See this upcoming PR for fully async, non-blocking record acks in the core module: Curious - what sort of load do you have that makes you want to use Reactor? Or is it just to be more efficient with resources? |
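A minimal sketch of the "blocking send on a separate pool" idea, using only the JDK so it runs without Kafka or Reactor on the classpath. `BlockingSend` and `BlockingSendOffload` are hypothetical names made up for illustration, not part of parallel-consumer. In a real Reactor pipeline the same idea is usually expressed with `Mono.fromCallable(...)` subscribed on `Schedulers.boundedElastic()`, with the blocking call being something like `producer.send(record).get()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a blocking send, e.g. producer.send(record).get().
@FunctionalInterface
interface BlockingSend<R, A> {
    A send(R record) throws Exception;
}

// Hypothetical helper: runs blocking sends on a dedicated pool so the
// calling (e.g. event-loop) thread is never parked waiting for the ack.
final class BlockingSendOffload implements AutoCloseable {
    private final ExecutorService blockingPool;

    BlockingSendOffload(int threads) {
        this.blockingPool = Executors.newFixedThreadPool(threads);
    }

    <R, A> CompletableFuture<A> sendOnBlockingPool(R record, BlockingSend<R, A> send) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                // Blocks a pool thread, not the caller, until the ack arrives.
                return send.send(record);
            } catch (Exception e) {
                // Surface checked exceptions through the returned future.
                throw new CompletionException(e);
            }
        }, blockingPool);
    }

    @Override
    public void close() {
        blockingPool.shutdown();
    }
}
```

The returned future completes only once the (simulated) ack is available, so anything chained on it runs after the send has been confirmed or has failed.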
Hi, |
My pleasure! :) Yup! Sounds great! And yes, that's right. You can see in the Reactor adapter that the hooks for commit don't register until the Future completes. You could probably do a PR that does basically that and lives inside PC. If you could paste your code, I can take a look to see if it lines up with what I'm imagining.
Link didn't work, but the reactor example - yeah it's not the greatest, but the item it returns is just a dummy string. It doesn't try to send a record back to PC. Am I missing something? |
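A PR is too much for me right now. Our code is written in Kotlin and obviously I cannot share it, but I converted the relevant parts to Java. For what it's worth, here is example code of what we did:
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.Mono;

/**
 * Example usage:
 *
 * parallelConsumer.react(context -> {
 *     var consumerRecord = context.getSingleRecord().getConsumerRecord();
 *     log.info("Concurrently constructing and returning RequestInfo from record: {}", consumerRecord);
 *     Map<String, String> params = UniMaps.of("recordKey", consumerRecord.key(), "payload", consumerRecord.value());
 *     Mono<List<ProducerRecord<String, String>>> originalResult =
 *             Mono.just(Arrays.asList(new ProducerRecord<String, String>("topic", "key", "some value")));
 *     // flatMap, not map: produce() itself returns a Mono
 *     return originalResult.flatMap(batchProducer::produce);
 * });
 */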
class BatchProducer<K, V> {
    Producer<K, V> producer;

    public BatchProducer(Producer<K, V> producer) {
        this.producer = producer;
    }

    // Sends every record and completes once all sends are acked (or one of them fails).
    public Mono<List<RecordMetadata>> produce(List<ProducerRecord<K, V>> messages) {
        List<CompletableFuture<RecordMetadata>> futures = messages.stream().map(message -> {
            CompletableFuture<RecordMetadata> completableFuture = new CompletableFuture<>();
            Callback kafkaCallback = createCallback(completableFuture);
            producer.send(message, kafkaCallback);
            return completableFuture;
        }).toList();
        CompletableFuture<List<RecordMetadata>> oneResult = sequence(futures);
        return Mono.fromFuture(oneResult);
    }

    // From here: https://stackoverflow.com/questions/30025428/convert-from-listcompletablefuture-to-completablefuturelist
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );
    }

    // Bridges the Kafka send callback to a CompletableFuture.
    private Callback createCallback(CompletableFuture<RecordMetadata> completableFuture) {
        return new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    completableFuture.completeExceptionally(exception);
                } else {
                    completableFuture.complete(metadata);
                }
            }
        };
    }

    public void close() {
        producer.close();
    }
}
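To make the behavior of the callback-to-future bridge and `sequence` easier to check in isolation, here is a small JDK-only sketch of the same pattern. `CallbackSender` and `FutureBridge` are hypothetical stand-ins invented for this illustration; the real code would use Kafka's `Producer` and `Callback` as above.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Hypothetical stand-in for a callback-style producer:
// send(message, callback(result, error)), where exactly one of result/error is set.
interface CallbackSender<M, R> {
    void send(M message, BiConsumer<R, Exception> callback);
}

final class FutureBridge {
    // Adapts one callback-style send into a CompletableFuture.
    static <M, R> CompletableFuture<R> sendAsFuture(CallbackSender<M, R> sender, M message) {
        CompletableFuture<R> future = new CompletableFuture<>();
        sender.send(message, (result, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(result);
            }
        });
        return future;
    }

    // Turns a list of futures into one future of a list, preserving order.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    // Sends all messages; the result fails if any single send fails.
    static <M, R> CompletableFuture<List<R>> sendAll(CallbackSender<M, R> sender, List<M> messages) {
        List<CompletableFuture<R>> futures = messages.stream()
                .map(m -> sendAsFuture(sender, m))
                .collect(Collectors.toList());
        return sequence(futures);
    }
}
```

One consequence of building on `CompletableFuture.allOf` is that a single failed send fails the combined future, so the whole batch is treated as failed even if the other records were acked. Whether that is the desired semantics depends on how retries and offset commits should behave.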
Hi again, |
TG isn't in Maven Central, but it is in a public repo, and that repo is added in the pom. It should be fine, unless your corporate proxy setup blocks other repos? It's on my list to add TG to Central, but it's not a priority.
shouldn't be needed
no
to what? |
I'm on the confluent community slack if you want to chat in real time FYI |
Hi, I am working with ReactorProcessor and I would like to produce events. I tried the following code, but it did not write any new events into the output topic. Example code:
lateinit var pConsumer: ReactorProcessor<String, JsonObject>
What am I missing?