[FLINK-7732][kafka-consumer] Do not commit to Kafka Flink's sentinel offsets

This closes apache#4928.
pnowojski authored and tzulitai committed Nov 2, 2017
1 parent b7d3589 commit c61d186
Showing 5 changed files with 72 additions and 5 deletions.
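For context: until a partition's first real offset is known, Flink tracks it with a negative sentinel value (for example KafkaTopicPartitionStateSentinel.GROUP_OFFSET, defined in the diff below). If a checkpoint is snapshotted before real offsets replace those sentinels, the snapshot still contains them, and prior to this fix they could be committed to Kafka as if they were genuine offsets. A minimal sketch of the filtering idea, with simplified names (only the GROUP_OFFSET value and the sign check mirror the actual diff):

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SentinelFilterSketch {
    // Matches KafkaTopicPartitionStateSentinel.GROUP_OFFSET from the diff below.
    static final long GROUP_OFFSET = -915623761773L;

    // Real Kafka offsets are never negative, so a sign check identifies sentinels.
    static boolean isSentinel(long offset) {
        return offset < 0;
    }

    public static void main(String[] args) {
        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("topic-0", 42L);          // real offset, safe to commit
        snapshot.put("topic-1", GROUP_OFFSET); // sentinel, must not reach Kafka

        Map<String, Long> toCommit = snapshot.entrySet().stream()
            .filter(e -> !isSentinel(e.getValue()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        System.out.println(toCommit); // prints {topic-0=42}
    }
}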
Kafka08Fetcher.java
@@ -348,7 +348,7 @@ protected TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition)
     // ------------------------------------------------------------------------

     @Override
-    public void commitInternalOffsetsToKafka(
+    protected void doCommitInternalOffsetsToKafka(
             Map<KafkaTopicPartition, Long> offsets,
             @Nonnull KafkaCommitCallback commitCallback) throws Exception {

Kafka09Fetcher.java
@@ -44,6 +44,8 @@
 import java.util.Map;
 import java.util.Properties;

+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
  *
@@ -212,7 +214,7 @@ public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition)
     }

     @Override
-    public void commitInternalOffsetsToKafka(
+    protected void doCommitInternalOffsetsToKafka(
             Map<KafkaTopicPartition, Long> offsets,
             @Nonnull KafkaCommitCallback commitCallback) throws Exception {

@@ -224,6 +226,8 @@ public void commitInternalOffsetsToKafka(
         for (KafkaTopicPartitionState<TopicPartition> partition : partitions) {
             Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
             if (lastProcessedOffset != null) {
+                checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");
+
                 // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
                 // This does not affect Flink's checkpoints/saved state.
                 long offsetToCommit = lastProcessedOffset + 1;
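The +1 in the hunk above follows Kafka's commit semantics: a committed offset designates the next record to read, not the last record processed. A hedged sketch of the equivalent raw-client call (the consumer, topic, and partition here are assumed for illustration, not taken from the diff):

import java.util.Collections;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

class CommitSemanticsSketch {
    // Commits so that a restarted consumer resumes right after lastProcessedOffset.
    static void commitLastProcessed(KafkaConsumer<byte[], byte[]> consumer, long lastProcessedOffset) {
        TopicPartition tp = new TopicPartition("my-topic", 0); // hypothetical partition
        long offsetToCommit = lastProcessedOffset + 1;         // position of the next record to fetch
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offsetToCommit)));
    }
}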
AbstractFetcher.java
@@ -36,6 +36,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;

 import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -242,10 +243,25 @@ protected final List<KafkaTopicPartitionState<KPH>> subscribedPartitionStates()
      * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
      * @throws Exception This method forwards exceptions.
      */
-    public abstract void commitInternalOffsetsToKafka(
+    public final void commitInternalOffsetsToKafka(
             Map<KafkaTopicPartition, Long> offsets,
-            @Nonnull KafkaCommitCallback commitCallback) throws Exception;
+            @Nonnull KafkaCommitCallback commitCallback) throws Exception {
+        // Ignore sentinels. They might appear here if a snapshot was taken before
+        // the sentinel values were replaced with actual offsets.
+        doCommitInternalOffsetsToKafka(filterOutSentinels(offsets), commitCallback);
+    }
+
+    protected abstract void doCommitInternalOffsetsToKafka(
+            Map<KafkaTopicPartition, Long> offsets,
+            @Nonnull KafkaCommitCallback commitCallback) throws Exception;
+
+    private Map<KafkaTopicPartition, Long> filterOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
+        return offsets.entrySet()
+            .stream()
+            .filter(entry -> !KafkaTopicPartitionStateSentinel.isSentinel(entry.getValue()))
+            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+    }

     /**
      * Creates the Kafka version specific representation of the given
      * topic partition.
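A note on the shape of this change: commitInternalOffsetsToKafka is now final and sanitizes its input before delegating to the new abstract doCommitInternalOffsetsToKafka, so every Kafka-version-specific fetcher inherits the sentinel filtering instead of reimplementing it. A simplified sketch of that template-method structure (names and types are illustrative, not Flink's API):

import java.util.Map;
import java.util.stream.Collectors;

abstract class FetcherSketch {
    // Final entry point: subclasses cannot bypass the sentinel filtering.
    public final void commit(Map<String, Long> offsets) throws Exception {
        Map<String, Long> filtered = offsets.entrySet().stream()
            .filter(e -> e.getValue() >= 0) // drop negative sentinel placeholders
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        doCommit(filtered);
    }

    // Each Kafka-version-specific subclass implements only the actual commit call.
    protected abstract void doCommit(Map<String, Long> offsets) throws Exception;
}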
KafkaTopicPartitionStateSentinel.java
@@ -52,4 +52,7 @@ public class KafkaTopicPartitionStateSentinel {
      */
     public static final long GROUP_OFFSET = -915623761773L;

+    public static boolean isSentinel(long offset) {
+        return offset < 0;
+    }
 }
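Since genuine Kafka offsets are never negative while all of Flink's sentinel constants are negative longs, the sign check above is sufficient to separate the two. A quick JUnit-style sanity sketch (a hypothetical test, not part of the commit):

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class IsSentinelSanityTest {
    @Test
    public void signCheckSeparatesSentinelsFromRealOffsets() {
        // The sentinel defined in the diff above is caught by the sign check.
        assertTrue(KafkaTopicPartitionStateSentinel.isSentinel(KafkaTopicPartitionStateSentinel.GROUP_OFFSET));
        // Real Kafka offsets start at 0 and only grow.
        assertFalse(KafkaTopicPartitionStateSentinel.isSentinel(0L));
        assertFalse(KafkaTopicPartitionStateSentinel.isSentinel(42L));
    }
}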
AbstractFetcherTest.java
@@ -32,8 +32,10 @@
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;

+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -46,6 +48,42 @@
 @SuppressWarnings("serial")
 public class AbstractFetcherTest {

+    @Test
+    public void testIgnorePartitionStateSentinelInSnapshot() throws Exception {
+        final String testTopic = "test topic name";
+        Map<KafkaTopicPartition, Long> originalPartitions = new HashMap<>();
+        originalPartitions.put(new KafkaTopicPartition(testTopic, 1), KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
+        originalPartitions.put(new KafkaTopicPartition(testTopic, 2), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+        originalPartitions.put(new KafkaTopicPartition(testTopic, 3), KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
+
+        TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+        TestFetcher<Long> fetcher = new TestFetcher<>(
+            sourceContext,
+            originalPartitions,
+            null,
+            null,
+            mock(TestProcessingTimeService.class),
+            0);
+
+        synchronized (sourceContext.getCheckpointLock()) {
+            HashMap<KafkaTopicPartition, Long> currentState = fetcher.snapshotCurrentState();
+            fetcher.commitInternalOffsetsToKafka(currentState, new KafkaCommitCallback() {
+                @Override
+                public void onSuccess() {
+                }
+
+                @Override
+                public void onException(Throwable cause) {
+                    throw new RuntimeException("Callback failed", cause);
+                }
+            });
+
+            assertTrue(fetcher.getLastCommittedOffsets().isPresent());
+            assertEquals(Collections.emptyMap(), fetcher.getLastCommittedOffsets().get());
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Record emitting tests
     // ------------------------------------------------------------------------
@@ -327,6 +365,7 @@ public void testPeriodicWatermarks() throws Exception {
     // ------------------------------------------------------------------------

     private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+        protected Optional<Map<KafkaTopicPartition, Long>> lastCommittedOffsets = Optional.empty();

         protected TestFetcher(
                 SourceContext<T> sourceContext,
@@ -362,10 +401,15 @@ public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
         }

         @Override
-        public void commitInternalOffsetsToKafka(
+        protected void doCommitInternalOffsetsToKafka(
                 Map<KafkaTopicPartition, Long> offsets,
                 @Nonnull KafkaCommitCallback callback) throws Exception {
-            throw new UnsupportedOperationException();
+            lastCommittedOffsets = Optional.of(offsets);
+            callback.onSuccess();
         }
+
+        public Optional<Map<KafkaTopicPartition, Long>> getLastCommittedOffsets() {
+            return lastCommittedOffsets;
+        }
     }

