Skip to content

Commit

Permalink
[FLINK-3061] Properly fail Kafka Consumer if broker is not available
Browse files Browse the repository at this point in the history
This closes apache#1395
  • Loading branch information
rmetzger committed Nov 30, 2015
1 parent 3e9d33e commit 209ae6c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializ
// Connect to a broker to get the partitions
List<PartitionInfo> partitionInfos = getPartitionsForTopic(topic, props);

if (partitionInfos.size() == 0) {
throw new RuntimeException("Unable to retrieve any partitions for topic " + topic + "." +
"Please check previous log entries");
}

// get initial partitions list. The order of the partitions is important for consistent
// partition id assignment in restart cases.
this.partitions = new int[partitionInfos.size()];
Expand Down Expand Up @@ -424,7 +429,11 @@ public void run(SourceContext<T> sourceContext) throws Exception {
} finally {
if (offsetCommitter != null) {
offsetCommitter.close();
offsetCommitter.join();
try {
offsetCommitter.join();
} catch(InterruptedException ie) {
// ignore interrupt
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,35 @@ protected abstract <T> FlinkKafkaConsumer<T> getConsumer(
// select which tests to run.
// ------------------------------------------------------------------------


/**
* Test that ensures the KafkaConsumer is properly failing if the topic doesnt exist
* and a wrong broker was specified
*
* @throws Exception
*/
public void runFailOnNoBrokerTest() throws Exception {
try {
Properties properties = new Properties();

StreamExecutionEnvironment see = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
see.getConfig().disableSysoutLogging();
see.setNumberOfExecutionRetries(0);
see.setParallelism(1);

// use wrong ports for the consumers
properties.setProperty("bootstrap.servers", "localhost:80");
properties.setProperty("zookeeper.connect", "localhost:80");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer<String> source = getConsumer("doesntexist", new SimpleStringSchema(), properties);
DataStream<String> stream = see.addSource(source);
stream.print();
see.execute("No broker test");
} catch(RuntimeException re){
Assert.assertTrue("Wrong RuntimeException thrown",
re.getMessage().contains("Unable to retrieve any partitions for topic"));
}
}
/**
* Test that validates that checkpointing and checkpoint notification works properly
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public void testCheckpointing() throws Exception {
runCheckpointingTest();
}

@Test()
public void testFailOnNoBroker() throws Exception {
runFailOnNoBrokerTest();
}

@Test
public void testOffsetInZookeeper() throws Exception {
runOffsetInZookeeperValidationTest();
Expand Down

0 comments on commit 209ae6c

Please sign in to comment.