Skip to content

Commit

Permalink
[hotfix] Extend MockEnvironment to provide better testing tools
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Aug 21, 2019
1 parent 8935ebf commit 1a46d8d
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@
* IMPORTANT! Remember to close environment after usage!
*/
public class MockEnvironment implements Environment, AutoCloseable {

private final TaskInfo taskInfo;

private final ExecutionConfig executionConfig;

private final MemoryManager memManager;
Expand Down Expand Up @@ -107,9 +107,9 @@ public class MockEnvironment implements Environment, AutoCloseable {

private final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

private Optional<Class<Throwable>> expectedExternalFailureCause = Optional.empty();
private Optional<Class<? extends Throwable>> expectedExternalFailureCause = Optional.empty();

private Optional<Throwable> actualExternalFailureCause = Optional.empty();
private Optional<? extends Throwable> actualExternalFailureCause = Optional.empty();

private final TaskMetricGroup taskMetricGroup;

Expand Down Expand Up @@ -164,7 +164,6 @@ protected MockEnvironment(
this.taskMetricGroup = taskMetricGroup;
}


public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
try {
final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
Expand Down Expand Up @@ -346,11 +345,11 @@ public void close() throws Exception {
ioManager.close();
}

public void setExpectedExternalFailureCause(Class<Throwable> expectedThrowableClass) {
public void setExpectedExternalFailureCause(Class<? extends Throwable> expectedThrowableClass) {
this.expectedExternalFailureCause = Optional.of(expectedThrowableClass);
}

public Optional<Throwable> getActualExternalFailureCause() {
public Optional<? extends Throwable> getActualExternalFailureCause() {
return actualExternalFailureCause;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -195,13 +200,12 @@ public void handleAsyncException() throws Throwable {
streamConfig.setOperatorID(new OperatorID());

try (MockEnvironment mockEnvironment =
new MockEnvironmentBuilder()
.setTaskName("Test Task")
.setMemorySize(32L * 1024L)
.setInputSplitProvider(new MockInputSplitProvider())
.setBufferSize(1)
.setTaskConfiguration(taskConfiguration)
.build()) {
new MockEnvironmentBuilder()
.setTaskName("Test Task")
.setMemorySize(32L * 1024L)
.setBufferSize(1)
.setTaskConfiguration(taskConfiguration)
.build()) {

RunningTask<StreamTask<Void, BlockingCloseStreamOperator>> task = runTask(() -> new NoOpStreamTask<>(mockEnvironment));

Expand All @@ -210,16 +214,18 @@ public void handleAsyncException() throws Throwable {
// check that the StreamTask is not yet in isRunning == false
assertTrue(task.streamTask.isRunning());


// generate an error report and expect it to be caught by the Environment
mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
mockEnvironment.setExpectedExternalFailureCause(AsynchronousException.class);
task.streamTask.handleAsyncException("EXPECTED_ERROR MESSAGE", expectedException);

// expect an AsynchronousException containing the supplied error details
Optional<Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
AsynchronousException cause = (AsynchronousException)actualExternalFailureCause.get();
assertEquals(cause.getMessage(), "EXPECTED_ERROR MESSAGE");
assertEquals(cause.getCause(), expectedException);
Optional<? extends Throwable> actualExternalFailureCause = mockEnvironment.getActualExternalFailureCause();
final Throwable actualException = actualExternalFailureCause
.orElseThrow(() -> new AssertionError("Expected exceptional completion"));

assertThat(actualException, instanceOf(AsynchronousException.class));
assertThat(actualException.getMessage(), is("EXPECTED_ERROR MESSAGE"));
assertThat(actualException.getCause(), is(expectedException));
}
}

Expand Down

0 comments on commit 1a46d8d

Please sign in to comment.