Skip to content

Commit

Permalink
[FLINK-10774] [tests] Refactor Kafka tests to have consistent life cy…
Browse files Browse the repository at this point in the history
…cle verifications
  • Loading branch information
tzulitai authored and tillrohrmann committed Jan 31, 2019
1 parent a9e18fa commit 232560d
Showing 1 changed file with 28 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.junit.Test;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -468,13 +469,7 @@ public void testClosePartitionDiscovererWhenOpenThrowException() throws Exceptio

final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(failingPartitionDiscoverer);

try {
setupConsumer(consumer);
fail("Exception should be thrown in open method");
} catch (RuntimeException e) {
assertThat(ExceptionUtils.findThrowable(e, t -> t.equals(failureCause)).isPresent(), is(true));
}
consumer.close();
testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", failingPartitionDiscoverer.isClosed());
}

Expand All @@ -490,15 +485,7 @@ public void testClosePartitionDiscovererWhenCreateKafkaFetcherFails() throws Exc
testPartitionDiscoverer,
100L);

setupConsumer(consumer);

try {
consumer.run(new TestSourceContext<>());
fail("Exception should be thrown in run method");
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
}
consumer.close();
testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}

Expand All @@ -512,16 +499,7 @@ public void testClosePartitionDiscovererWhenKafkaFetcherFails() throws Exception

final DummyFlinkKafkaConsumer<String> consumer = new DummyFlinkKafkaConsumer<>(() -> mock, testPartitionDiscoverer, 100L);

setupConsumer(consumer);

try {
consumer.run(new TestSourceContext<>());
fail("Exception should be thrown in run method");
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(failureCause)).isPresent(), is(true));
}
consumer.close();
consumer.joinDiscoveryLoopThread();
testConsumerLifeCycle(consumer, failureCause);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}

Expand All @@ -531,19 +509,33 @@ public void testClosePartitionDiscovererWithCancellation() throws Exception {

final TestingFlinkKafkaConsumer<String> consumer = new TestingFlinkKafkaConsumer<>(testPartitionDiscoverer, 100L);

setupConsumer(consumer);

CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> consumer.run(new TestSourceContext<>())));

consumer.close();

consumer.joinDiscoveryLoopThread();
runFuture.get();

testConsumerLifeCycle(consumer, null);
assertTrue("partitionDiscoverer should be closed when consumer is closed", testPartitionDiscoverer.isClosed());
}

protected void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
private void testConsumerLifeCycle(
FlinkKafkaConsumerBase<String> testKafkaConsumer,
@Nullable Exception expectedException) throws Exception {

if (expectedException == null) {
setupConsumer(testKafkaConsumer);
final CompletableFuture<Void> runFuture = CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> testKafkaConsumer.run(new TestSourceContext<>())));
testKafkaConsumer.close();
runFuture.get();
} else {
try {
setupConsumer(testKafkaConsumer);
testKafkaConsumer.run(new TestSourceContext<>());

fail("Exception should have been thrown from open / run method of FlinkKafkaConsumerBase.");
} catch (Exception e) {
assertThat(ExceptionUtils.findThrowable(e, throwable -> throwable.equals(expectedException)).isPresent(), is(true));
}
testKafkaConsumer.close();
}
}

private void setupConsumer(FlinkKafkaConsumerBase<String> consumer) throws Exception {
setupConsumer(
consumer,
false,
Expand Down

0 comments on commit 232560d

Please sign in to comment.