Skip to content

Commit

Permalink
[FLINK-8298][tests] Properly shutdown MockEnvironment to release reso…
Browse files Browse the repository at this point in the history
…urces

This closes apache#5193.
  • Loading branch information
pnowojski authored and tzulitai committed Jan 6, 2018
1 parent e8d1aa5 commit 091a370
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,18 @@
import java.util.Map;
import java.util.concurrent.Future;

import static org.apache.flink.util.Preconditions.checkState;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockEnvironment implements Environment {
/**
* IMPORTANT! Remember to close environment after usage!
*/
public class MockEnvironment implements Environment, AutoCloseable {

private final TaskInfo taskInfo;

Expand Down Expand Up @@ -376,4 +380,17 @@ public void declineCheckpoint(long checkpointId, Throwable cause) {
public void failExternally(Throwable cause) {
throw new UnsupportedOperationException("MockEnvironment does not support external task failure.");
}

@Override
public void close() {
// close() method should be idempotent and calling memManager.verifyEmpty() will throw after it was shutdown.
if (!memManager.isShutdown()) {
checkState(memManager.verifyEmpty(), "Memory Manager managed memory was not completely freed.");
}

memManager.shutdown();
ioManager.shutdown();

checkState(ioManager.isProperlyShutDown(), "IO Manager has not properly shut down.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
Expand All @@ -33,12 +32,13 @@
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.Record;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Assert;

import java.util.List;

Expand Down Expand Up @@ -145,19 +145,7 @@ public MemoryManager getMemoryManager() {
}

@After
public void shutdownIOManager() throws Exception {
this.mockEnv.getIOManager().shutdown();
Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
}

@After
public void shutdownMemoryManager() throws Exception {
if (this.memorySize > 0) {
MemoryManager memMan = getMemoryManager();
if (memMan != null) {
Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
memMan.shutdown();
}
}
public void shutdown() {
mockEnv.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
Expand Down Expand Up @@ -63,26 +64,29 @@ private void testFormatLifecycle(final boolean midCancel) throws Exception {

final LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
final InputFormatSourceFunction<Integer> reader = new InputFormatSourceFunction<>(format, TypeInformation.of(Integer.class));
reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits));

Assert.assertTrue(!format.isConfigured);
Assert.assertTrue(!format.isInputFormatOpen);
Assert.assertTrue(!format.isSplitOpen);
try (MockEnvironment environment = new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16)) {
reader.setRuntimeContext(new MockRuntimeContext(format, noOfSplits, environment));

reader.open(new Configuration());
Assert.assertTrue(format.isConfigured);
Assert.assertTrue(!format.isConfigured);
Assert.assertTrue(!format.isInputFormatOpen);
Assert.assertTrue(!format.isSplitOpen);

TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
reader.run(ctx);
reader.open(new Configuration());
Assert.assertTrue(format.isConfigured);

int splitsSeen = ctx.getSplitsSeen();
Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);
TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, cancelAt);
reader.run(ctx);

// we have exhausted the splits so the
// format and splits should be closed by now
int splitsSeen = ctx.getSplitsSeen();
Assert.assertTrue(midCancel ? splitsSeen == cancelAt : splitsSeen == noOfSplits);

Assert.assertTrue(!format.isSplitOpen);
Assert.assertTrue(!format.isInputFormatOpen);
// we have exhausted the splits so the
// format and splits should be closed by now

Assert.assertTrue(!format.isSplitOpen);
Assert.assertTrue(!format.isInputFormatOpen);
}
}

private static class LifeCycleTestInputFormat extends RichInputFormat<Integer, InputSplit> {
Expand Down Expand Up @@ -255,10 +259,9 @@ private static class MockRuntimeContext extends StreamingRuntimeContext {
private final LifeCycleTestInputFormat format;
private InputSplit[] inputSplits;

private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits) {

private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits, Environment environment) {
super(new MockStreamOperator(),
new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
environment,
Collections.<String, Accumulator<?, ?>>emptyMap());

this.noOfSplits = noOfSplits;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,25 +147,30 @@ public void invoke(String value) throws Exception {
StreamMap<Integer, Integer> headOperator =
streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());

StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
createMockTask(streamConfig, chainedVertex.getName());
try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);

OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);

headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());

for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
}
}
}

headOperator.processElement(new StreamRecord<>(1));
headOperator.processElement(new StreamRecord<>(2));
headOperator.processElement(new StreamRecord<>(3));
headOperator.processElement(new StreamRecord<>(1));
headOperator.processElement(new StreamRecord<>(2));
headOperator.processElement(new StreamRecord<>(3));

assertThat(sink1Results, contains("First: 1", "First: 2", "First: 3"));
assertThat(sink2Results, contains("Second: 1", "Second: 2", "Second: 3"));
}
}

assertThat(sink1Results, contains("First: 1", "First: 2", "First: 3"));
assertThat(sink2Results, contains("Second: 1", "Second: 2", "Second: 3"));
private MockEnvironment createMockEnvironment(String taskName) {
return new MockEnvironment(taskName, 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
}

@Test
Expand Down Expand Up @@ -287,38 +292,40 @@ public void invoke(String value) throws Exception {
StreamMap<Integer, Integer> headOperator =
streamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());

StreamTask<Integer, StreamMap<Integer, Integer>> mockTask =
createMockTask(streamConfig, chainedVertex.getName());
try (MockEnvironment environment = createMockEnvironment(chainedVertex.getName())) {
StreamTask<Integer, StreamMap<Integer, Integer>> mockTask = createMockTask(streamConfig, environment);

OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);
OperatorChain<Integer, StreamMap<Integer, Integer>> operatorChain = new OperatorChain<>(mockTask);

headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());
headOperator.setup(mockTask, streamConfig, operatorChain.getChainEntryPoint());

for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.open();
}
}
}

headOperator.processElement(new StreamRecord<>(1));
headOperator.processElement(new StreamRecord<>(2));
headOperator.processElement(new StreamRecord<>(3));
headOperator.processElement(new StreamRecord<>(1));
headOperator.processElement(new StreamRecord<>(2));
headOperator.processElement(new StreamRecord<>(3));

assertThat(sink1Results, contains("First 1: 1"));
assertThat(sink2Results, contains("First 2: 1"));
assertThat(sink3Results, contains("Second: 2", "Second: 3"));
assertThat(sink1Results, contains("First 1: 1"));
assertThat(sink2Results, contains("First 2: 1"));
assertThat(sink3Results, contains("Second: 2", "Second: 3"));
}
}

private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(StreamConfig streamConfig, String taskName) {
private <IN, OT extends StreamOperator<IN>> StreamTask<IN, OT> createMockTask(
StreamConfig streamConfig,
Environment environment) {
final Object checkpointLock = new Object();
final Environment env = new MockEnvironment(taskName, 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);

@SuppressWarnings("unchecked")
StreamTask<IN, OT> mockTask = mock(StreamTask.class);
when(mockTask.getName()).thenReturn("Mock Task");
when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
when(mockTask.getConfiguration()).thenReturn(streamConfig);
when(mockTask.getEnvironment()).thenReturn(env);
when(mockTask.getEnvironment()).thenReturn(environment);
when(mockTask.getExecutionConfig()).thenReturn(new ExecutionConfig().enableObjectReuse());

return mockTask;
Expand Down
Loading

0 comments on commit 091a370

Please sign in to comment.