Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronous Commits in RevokeListener #130

Open
andreasellervee opened this issue Apr 16, 2020 · 0 comments
Open

Synchronous Commits in RevokeListener #130

andreasellervee opened this issue Apr 16, 2020 · 0 comments

Comments

@andreasellervee
Copy link

Scenario is the following - we have 1 instance of an Application (Pod 1) consuming messages from 2 partitions. We start up a second instance of the Application (Pod 2). This causes the consumers to rebalance, but before that happens, we want to make sure that all the processed messages from Pod 1 successfully complete and get commited. Async commits do not work, because they do now commit before rebalancing (they will try to commit after, once rebalance is complete and will fail, because partitions no longer belong to Pod 1).

https://projectreactor.io/docs/kafka/1.0.0.RELEASE/reference/#_partition_assignment_and_revocation_listeners states:

Revocation listeners can be used to commit processed offsets when manual commits are used.

As mentioned above, receiverOffset().commit() is Async and will not be sufficient. We have to use Sync commit (also explained here https://stackoverflow.com/questions/49781671/kafka-rebalancing-and-listeners-pitfalls ).

Expected Behavior

I expect to be able to make Synchronous commits in the Revocation Listener.

Actual Behavior

Calling Synchronous commit receiverOffset().commit().block() in the Revocation Listener will throw an error - java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactive-kafka-*. This is caused because single and parallel Schedulers do not support blocking calls.

reactor.kafka.receiver.internals.DefaultKafkaReceiver#createConsumerFlux contains scheduler = Schedulers.single(receiverOptions.schedulerSupplier().get()); which creates a single Scheduler.

Steps to Reproduce

@Test
public void repoCase() {
var receiverOptions = ReceiverOptions.create(getConsumerProperties())
	 .addRevokeListener(revokedPartitions -> {
		 latestNonCommitedEvent.receiverOffset().commit().block();
	 })
}

Rest of the code is according to documentation - https://projectreactor.io/docs/kafka/1.0.0.RELEASE/reference/#concurrent-ordered

Possible Solution

...

Your Environment

  • Reactor version(s) used:
    • Reactor Core 3.3.0.RELEASE
    • Reactor Kafka 1.2.1.RELEASE
  • Other relevant libraries versions (eg. netty, ...):
    • Spring Boot WebFlux 2.2.1.RELEASE
  • JVM version (javar -version): 11.0.2
  • OS and version (eg uname -a): Windows 10 Version 1903
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant