You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
After I pause my partition at offset X, stop processing it, then some time latter seek to X and resume partition, I expect Flux to return records from this offset.
Actual Behavior
Nothing is returned
Steps to Reproduce
I'm trying to implement @RetryableTopic behavior, but with reactor-kafka.
Here is my abstract class (if you don't want to read it, I've included logs in the end), that tries to do that (sorry if this code looks bad to you)
After that kafka fetcher polls 9 records I think (based on highWatermark/lastStableOffset):
2023-01-31 14:04:33.907 DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 1 for partition test-topic-retry-1-0 returned fetch data PartitionData(partitionIndex=0, errorCode=0, highWatermark=10, lastStableOffset=10, logStartOffset=0, divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1), currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1), snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null, preferredReadReplica=-1, records=MemoryRecords(size=2097, buffer=java.nio.HeapByteBuffer[pos=0 lim=2097 cap=2100]))
And that's it, no processing is done after that, records gets into test-topic-retry-1-0, kafka fetcher polls them (based on such logs):
DEBUG 16996 --- [ka-test-group-1] o.a.k.c.consumer.internals.Fetcher : [Consumer clientId=consumer-test-group-1, groupId=test-group] Fetch READ_UNCOMMITTED at offset 10 for partition test-topic-retry-1-0 returned fetch data
But my processing is not happening (Flux is empty for that partition).
Sorry if this was a long read. Am I doing something wrong?
Many thanks.
Your Environment
Reactor version(s) used: 1.3.13
Kafka-clients: 3.1.2
JVM version (java -version): temurin-1.8.0_345
OS and version (eg uname -a): Windows 10
Kafka: kafka_2.13-3.3.1
The text was updated successfully, but these errors were encountered:
This is way too much code/complexity for reporting a problem.
I suggest you strip it down to a simple reproducer with a single topic and the bare minimum code, so we can more easily follow the logic and the problem you are seeing.
Expected Behavior
After I pause my partition at offset X, stop processing it, then some time latter seek to X and resume partition, I expect Flux to return records from this offset.
Actual Behavior
Nothing is returned
Steps to Reproduce
I'm trying to implement @RetryableTopic behavior, but with reactor-kafka.
Here is my abstract class (if you don't want to read it, I've included logs in the end), that tries to do that (sorry if this code looks bad to you)
Then I have test implementation of this class:
And also test itself:
Logs
In the logs I see that record 5 gets into test-topic-retry-1-0 with offset 0
Then kafka Fetcher polls it:
Then it starts processing:
Then pausing partition (because execution time has not come yet):
Then partition is seeked to paused record and resumed:
Then kafka fetcher polls it again:
Then it starts processing, fails with error and gets into next retry-topic (till now everything is okay)
After some time record 20 gets into test-topic-retry-1-0 with offset 1:
Fetcher polls it and it starts processing and pauses partition:
While this partition was paused, some more records gets into it:
Then partition is seeked to paused record (record 20 with offset 1) and resumed:
After that kafka fetcher polls 9 records I think (based on highWatermark/lastStableOffset):
And that's it, no processing is done after that, records gets into test-topic-retry-1-0, kafka fetcher polls them (based on such logs):
But my processing is not happening (Flux is empty for that partition).
Sorry if this was a long read. Am I doing something wrong?
Many thanks.
Your Environment
java -version
): temurin-1.8.0_345uname -a
): Windows 10The text was updated successfully, but these errors were encountered: