Skip to content

Commit

Permalink
[FLINK-20114][connector/kafka] SourceOperatorStreamTask should check …
Browse files Browse the repository at this point in the history
…the committed offset first before using OffsetResetStrategy.

This is necessary to keep the same behavior as the legacy FlinkKafkaConsumer.
  • Loading branch information
lindong28 authored and becketqin committed Mar 29, 2021
1 parent b831b85 commit 8de1784
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ public Map<TopicPartition, Long> getPartitionOffsets(
}
}
if (!toLookup.isEmpty()) {
// First check the committed offsets.
Map<TopicPartition, Long> committedOffsets =
partitionOffsetsRetriever.committedOffsets(toLookup);
offsets.putAll(committedOffsets);
toLookup.removeAll(committedOffsets.keySet());

switch (offsetResetStrategy) {
case EARLIEST:
offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@
/** Unit tests for {@link OffsetsInitializer}. */
public class OffsetsInitializerTest {
private static final String TOPIC = "topic";
private static final String TOPIC2 = "topic2";
private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever;

@BeforeClass
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
KafkaSourceTestEnv.setupTopic(TOPIC, true, true);
KafkaSourceTestEnv.setupTopic(TOPIC2, false, false);
retriever =
new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl(
KafkaSourceTestEnv.getConsumer(),
Expand Down Expand Up @@ -116,19 +118,28 @@ public void testSpecificOffsetsInitializer() {
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, OffsetAndMetadata> committedOffsets =
KafkaSourceTestEnv.getCommittedOffsets(partitions);
committedOffsets.forEach((tp, oam) -> specifiedOffsets.put(tp, oam.offset()));
partitions.forEach(tp -> specifiedOffsets.put(tp, (long) tp.partition()));
// Remove the specified offsets for partition 0.
TopicPartition missingPartition = new TopicPartition(TOPIC, 0);
specifiedOffsets.remove(missingPartition);
TopicPartition partitionSetToCommitted = new TopicPartition(TOPIC, 0);
specifiedOffsets.remove(partitionSetToCommitted);
OffsetsInitializer initializer = OffsetsInitializer.offsets(specifiedOffsets);

assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
// The partition without committed offset should fallback to offset reset strategy.
TopicPartition partitionSetToEarliest = new TopicPartition(TOPIC2, 0);
partitions.add(partitionSetToEarliest);

Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
for (TopicPartition tp : partitions) {
Long offset = offsets.get(tp);
long expectedOffset =
tp.equals(missingPartition) ? 0L : committedOffsets.get(tp).offset();
long expectedOffset;
if (tp.equals(partitionSetToCommitted)) {
expectedOffset = committedOffsets.get(tp).offset();
} else if (tp.equals(partitionSetToEarliest)) {
expectedOffset = 0L;
} else {
expectedOffset = specifiedOffsets.get(tp);
}
assertEquals(
String.format("%s has incorrect offset.", tp), expectedOffset, (long) offset);
}
Expand Down

0 comments on commit 8de1784

Please sign in to comment.