You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Scenario is the following - we have 1 instance of an Application (Pod 1) consuming messages from 2 partitions. We start up a second instance of the Application (Pod 2). This causes the consumers to rebalance, but before that happens, we want to make sure that all the processed messages from Pod 1 successfully complete and get commited. Async commits do not work, because they do now commit before rebalancing (they will try to commit after, once rebalance is complete and will fail, because partitions no longer belong to Pod 1).
I expect to be able to make Synchronous commits in the Revocation Listener.
Actual Behavior
Calling Synchronous commit receiverOffset().commit().block() in the Revocation Listener will throw an error - java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactive-kafka-*. This is caused because single and parallel Schedulers do not support blocking calls.
reactor.kafka.receiver.internals.DefaultKafkaReceiver#createConsumerFlux contains scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get()); which creates a single Scheduler.
Scenario is the following - we have 1 instance of an Application (Pod 1) consuming messages from 2 partitions. We start up a second instance of the Application (Pod 2). This causes the consumers to rebalance, but before that happens, we want to make sure that all the processed messages from Pod 1 successfully complete and get commited. Async commits do not work, because they do now commit before rebalancing (they will try to commit after, once rebalance is complete and will fail, because partitions no longer belong to Pod 1).
https://projectreactor.io/docs/kafka/1.0.0.RELEASE/reference/#_partition_assignment_and_revocation_listeners states:
As mentioned above,
receiverOffset().commit()
is Async and will not be sufficient. We have to use Sync commit (also explained here https://stackoverflow.com/questions/49781671/kafka-rebalancing-and-listeners-pitfalls ).Expected Behavior
I expect to be able to make Synchronous commits in the Revocation Listener.
Actual Behavior
Calling Synchronous commit
receiverOffset().commit().block()
in the Revocation Listener will throw an error -java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactive-kafka-*
. This is caused because single and parallel Schedulers do not support blocking calls.reactor.kafka.receiver.internals.DefaultKafkaReceiver#createConsumerFlux
containsscheduler = Schedulers.single(receiverOptions.schedulerSupplier().get());
which creates a single Scheduler.Steps to Reproduce
Rest of the code is according to documentation - https://projectreactor.io/docs/kafka/1.0.0.RELEASE/reference/#concurrent-ordered
Possible Solution
...
Your Environment
netty
, ...):javar -version
): 11.0.2uname -a
): Windows 10 Version 1903The text was updated successfully, but these errors were encountered: