[FLINK-3156] Fix NPE in KafkaConsumer when reading from SOME empty topics/partitions
rmetzger committed Dec 10, 2015
1 parent 74b535d commit c4a2d60
Showing 3 changed files with 41 additions and 8 deletions.
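Background on the failure, as a minimal hypothetical sketch (not taken from the commit; the map keys and values are illustrative only): the NPE comes from auto-unboxing the null that HashMap.get() returns for a partition that never had any data consumed from it into a primitive long. The guarded variant below mirrors the change made in commitOffsets().

import java.util.HashMap;
import java.util.Map;

public class UnboxingNpeSketch {
    public static void main(String[] args) {
        Map<String, Long> toCommit = new HashMap<>();
        toCommit.put("topic-1#0", 42L); // only this partition ever saw data

        // Broken pattern: get() returns null for the empty partition and
        // auto-unboxing to a primitive long throws a NullPointerException.
        // long offset = toCommit.get("topic-2#0");

        // Guarded pattern, keeping the boxed Long and skipping missing entries:
        Long offset = toCommit.get("topic-2#0");
        if (offset == null) {
            System.out.println("no offset recorded for topic-2#0, skipping commit");
            return;
        }
        System.out.println("committing offset " + offset);
    }
}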
@@ -661,7 +661,12 @@ public void close() {
private static <T> void commitOffsets(HashMap<KafkaTopicPartition, Long> toCommit, FlinkKafkaConsumer<T> consumer) throws Exception {
Map<KafkaTopicPartition, Long> offsetsToCommit = new HashMap<>();
for (KafkaTopicPartitionLeader tp : consumer.subscribedPartitions) {
long offset = toCommit.get(tp.getTopicPartition());
Long offset = toCommit.get(tp.getTopicPartition());
if(offset == null) {
// There was no data ever consumed from this topic, that's why there is no entry
// for this topicPartition in the map.
continue;
}
Long lastCommitted = consumer.committedOffsets.get(tp.getTopicPartition());
if (lastCommitted == null) {
lastCommitted = OFFSET_NOT_SET;
@@ -120,7 +120,12 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
// ------------------------------------------------------------------------

protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
String topic, DeserializationSchema<T> deserializationSchema, Properties props);
List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props);

protected <T> FlinkKafkaConsumer<T> getConsumer(
String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
}

// ------------------------------------------------------------------------
// Suite of Tests
@@ -343,19 +348,34 @@ public void runOffsetAutocommitTest() throws Exception {
* We need to externally retry this test. We cannot let Flink's retry mechanism do it, because the Kafka producer
* does not guarantee exactly-once output. Hence a recovery would introduce duplicates that
* cause the test to fail.
*
* This test also ensures that FLINK-3156 doesn't happen again:
*
* The following situation caused an NPE in the FlinkKafkaConsumer:
*
* topic-1 <-- elements are only produced into topic1.
* topic-2
*
* Therefore, this test also consumes from an empty topic.
*
*/
@RetryOnException(times=2, exception=kafka.common.NotLeaderForPartitionException.class)
public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
final String topic = "concurrentProducerConsumerTopic_" + UUID.randomUUID().toString();
final String additionalEmptyTopic = "additionalEmptyTopic_" + UUID.randomUUID().toString();

final int parallelism = 3;
final int elementsPerPartition = 100;
final int totalElements = parallelism * elementsPerPartition;

createTestTopic(topic, parallelism, 2);
createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will remain empty all the time

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
env.setParallelism(parallelism);
env.enableCheckpointing(500);
env.setNumberOfExecutionRetries(0); // fail immediately
env.getConfig().disableSysoutLogging();

TypeInformation<Tuple2<Long, String>> longStringType = TypeInfoParser.parse("Tuple2<Long, String>");
@@ -373,14 +393,17 @@ public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
private boolean running = true;

@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) {
public void run(SourceContext<Tuple2<Long, String>> ctx) throws InterruptedException {
int cnt = getRuntimeContext().getIndexOfThisSubtask() * elementsPerPartition;
int limit = cnt + elementsPerPartition;


while (running && cnt < limit) {
ctx.collect(new Tuple2<>(1000L + cnt, "kafka-" + cnt));
cnt++;
// we delay data generation a bit so that we are sure that some checkpoints are
// triggered (for FLINK-3156)
Thread.sleep(50);
}
}

Expand All @@ -393,7 +416,10 @@ public void cancel() {

// ----------- add consumer dataflow ----------

FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topic, sourceSchema, standardProps);
List<String> topics = new ArrayList<>();
topics.add(topic);
topics.add(additionalEmptyTopic);
FlinkKafkaConsumer<Tuple2<Long, String>> source = getConsumer(topics, sourceSchema, standardProps);

DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(source).setParallelism(parallelism);

@@ -1047,8 +1073,6 @@ public void runKeyValueTest() throws Exception {
createTestTopic(topic, 1, 1);
final int ELEMENT_COUNT = 5000;



// ----------- Write some data into Kafka -------------------

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
@@ -1111,6 +1135,8 @@ public void flatMap(Tuple2<Long, PojoValue> value, Collector<Object> out) throws
});

tryExecute(env, "Read KV from Kafka");

deleteTestTopic(topic);
}

public static class PojoValue {
@@ -1121,6 +1147,7 @@ public PojoValue() {}
}



// ------------------------------------------------------------------------
// Reading writing test data sets
// ------------------------------------------------------------------------
@@ -21,14 +21,15 @@

import org.junit.Test;

import java.util.List;
import java.util.Properties;


public class KafkaITCase extends KafkaConsumerTestBase {

@Override
protected <T> FlinkKafkaConsumer<T> getConsumer(String topic, DeserializationSchema<T> deserializationSchema, Properties props) {
return new FlinkKafkaConsumer081<>(topic, deserializationSchema, props);
protected <T> FlinkKafkaConsumer<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
return new FlinkKafkaConsumer082<>(topics, deserializationSchema, props);
}

// ------------------------------------------------------------------------
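A usage sketch of the scenario the new test covers (hypothetical, not part of the commit; broker addresses, topic names, the job name, and the 0.10-era import paths are assumptions): one FlinkKafkaConsumer082 subscribed to a populated topic and an empty one, with checkpointing enabled so periodic offset commits run against both. With the fix, committing offsets for the empty topic no longer throws, because partitions without any consumed data are simply skipped.

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MultiTopicConsumerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500); // checkpoints trigger the periodic offset commits

        Properties props = new Properties();
        props.setProperty("zookeeper.connect", "localhost:2181"); // hypothetical addresses
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-3156-demo");

        // "topic-1" receives data, "topic-2" stays empty -- the situation described in the test javadoc.
        List<String> topics = Arrays.asList("topic-1", "topic-2");

        env.addSource(new FlinkKafkaConsumer082<>(topics, new SimpleStringSchema(), props))
           .print();

        env.execute("FLINK-3156 multi-topic consumer sketch");
    }
}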