diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java index 186e2d5a19906..d3335de9604b6 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/initializer/SpecifiedOffsetsInitializer.java @@ -60,6 +60,12 @@ public Map getPartitionOffsets( } } if (!toLookup.isEmpty()) { + // First check the committed offsets. + Map committedOffsets = + partitionOffsetsRetriever.committedOffsets(toLookup); + offsets.putAll(committedOffsets); + toLookup.removeAll(committedOffsets.keySet()); + switch (offsetResetStrategy) { case EARLIEST: offsets.putAll(partitionOffsetsRetriever.beginningOffsets(toLookup)); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java index 94d0c30e3f930..0e84882300ecb 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java @@ -41,12 +41,14 @@ /** Unit tests for {@link OffsetsInitializer}. */ public class OffsetsInitializerTest { private static final String TOPIC = "topic"; + private static final String TOPIC2 = "topic2"; private static KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl retriever; @BeforeClass public static void setup() throws Throwable { KafkaSourceTestEnv.setup(); KafkaSourceTestEnv.setupTopic(TOPIC, true, true); + KafkaSourceTestEnv.setupTopic(TOPIC2, false, false); retriever = new KafkaSourceEnumerator.PartitionOffsetsRetrieverImpl( KafkaSourceTestEnv.getConsumer(), @@ -116,19 +118,28 @@ public void testSpecificOffsetsInitializer() { List partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC); Map committedOffsets = KafkaSourceTestEnv.getCommittedOffsets(partitions); - committedOffsets.forEach((tp, oam) -> specifiedOffsets.put(tp, oam.offset())); + partitions.forEach(tp -> specifiedOffsets.put(tp, (long) tp.partition())); // Remove the specified offsets for partition 0. - TopicPartition missingPartition = new TopicPartition(TOPIC, 0); - specifiedOffsets.remove(missingPartition); + TopicPartition partitionSetToCommitted = new TopicPartition(TOPIC, 0); + specifiedOffsets.remove(partitionSetToCommitted); OffsetsInitializer initializer = OffsetsInitializer.offsets(specifiedOffsets); assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy()); + // The partition without committed offset should fallback to offset reset strategy. + TopicPartition partitionSetToEarliest = new TopicPartition(TOPIC2, 0); + partitions.add(partitionSetToEarliest); Map offsets = initializer.getPartitionOffsets(partitions, retriever); for (TopicPartition tp : partitions) { Long offset = offsets.get(tp); - long expectedOffset = - tp.equals(missingPartition) ? 0L : committedOffsets.get(tp).offset(); + long expectedOffset; + if (tp.equals(partitionSetToCommitted)) { + expectedOffset = committedOffsets.get(tp).offset(); + } else if (tp.equals(partitionSetToEarliest)) { + expectedOffset = 0L; + } else { + expectedOffset = specifiedOffsets.get(tp); + } assertEquals( String.format("%s has incorrect offset.", tp), expectedOffset, (long) offset); }