Skip to content

Commit

Permalink
[FLINK-8703][tests] Migrate tests to MiniClusterResource (batch apache#2
Browse files Browse the repository at this point in the history
)

This closes apache#5542.
  • Loading branch information
zentol committed Feb 26, 2018
1 parent 0ae7364 commit 3e056b3
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

package org.apache.flink.ml.util

import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.test.util.{TestBaseUtils, TestEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.test.util.MiniClusterResource
import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
import org.scalatest.{BeforeAndAfter, Suite}

/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
Expand Down Expand Up @@ -51,27 +52,21 @@ import org.scalatest.{BeforeAndAfter, Suite}
trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>

var cluster: Option[LocalFlinkMiniCluster] = None
var cluster: Option[MiniClusterResource] = None
val parallelism = 4

before {
val cl = TestBaseUtils.startCluster(
1,
parallelism,
false,
false,
true)

val clusterEnvironment = new TestEnvironment(cl, parallelism, false)
clusterEnvironment.setAsContext()
val cl = new MiniClusterResource(
new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
)

cl.before()

cluster = Some(cl)
}

after {
cluster.foreach(c => TestBaseUtils.stopCluster(c, TestBaseUtils.DEFAULT_TIMEOUT))

TestEnvironment.unsetAsContext()
cluster.foreach(c => c.after())
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,32 @@

package org.apache.flink.streaming.api.scala

import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.test.util.TestBaseUtils

import org.apache.flink.configuration.Configuration
import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration
import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
import org.junit.{After, Before}

import org.scalatest.junit.JUnitSuiteLike

trait ScalaStreamingMultipleProgramsTestBase
extends TestBaseUtils
with JUnitSuiteLike {

val parallelism = 4
var cluster: Option[LocalFlinkMiniCluster] = None
var cluster: Option[MiniClusterResource] = None

@Before
def beforeAll(): Unit = {
val cluster = Some(
TestBaseUtils.startCluster(
1,
parallelism,
false,
false,
true
)
val cl = new MiniClusterResource(
new MiniClusterResourceConfiguration(new Configuration(), 1, parallelism)
)

TestStreamEnvironment.setAsContext(cluster.get, parallelism)
cl.before()

cluster = Some(cl)
}

@After
def afterAll(): Unit = {
TestStreamEnvironment.unsetAsContext()
cluster.foreach {
TestBaseUtils.stopCluster(_, TestBaseUtils.DEFAULT_TIMEOUT)
}
cluster.foreach { c => c.after() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,32 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;

import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -65,9 +62,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
Expand All @@ -91,62 +85,57 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
private static final int MAX_MEM_STATE_SIZE = 20 * 1024 * 1024;
private static final int PARALLELISM = 4;

private static LocalFlinkMiniCluster cluster;
private TestingServer zkServer;

private static TestStreamEnvironment env;

private static TestingServer zkServer;

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@ClassRule
public static TemporaryFolder tempFolder = new TemporaryFolder();

@Rule
public TestName name = new TestName();

private StateBackendEnum stateBackendEnum;
protected AbstractStateBackend stateBackend;
private AbstractStateBackend stateBackend;

AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) {
this.stateBackendEnum = stateBackendEnum;
}
@Rule
public final MiniClusterResource miniClusterResource = getMiniClusterResource();

enum StateBackendEnum {
MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
}

@Before
public void startTestCluster() throws Exception {
protected abstract StateBackendEnum getStateBackend();

protected final MiniClusterResource getMiniClusterResource() {
return new MiniClusterResource(
new MiniClusterResource.MiniClusterResourceConfiguration(
getConfigurationSafe(),
2,
PARALLELISM / 2));
}

private Configuration getConfigurationSafe() {
try {
return getConfiguration();
} catch (Exception e) {
throw new AssertionError("Could not initialize test.", e);
}
}

private Configuration getConfiguration() throws Exception {

// print a message when starting a test method to avoid Travis' <tt>"Maven produced no
// output for xxx seconds."</tt> messages
System.out.println(
"Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");

// Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
StateBackendEnum stateBackendEnum = getStateBackend();
if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
zkServer = new TestingServer();
zkServer.start();
}

Configuration config = createClusterConfig();

// purposefully delay in the executor to tease out races
final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
config,
new Executor() {
@Override
public void execute(Runnable command) {
executor.schedule(command, 500, MILLISECONDS);
}
});

cluster = new LocalFlinkMiniCluster(config, haServices, false);
cluster.start();

env = new TestStreamEnvironment(cluster, PARALLELISM);
env.getConfig().setUseSnapshotCompression(true);

switch (stateBackendEnum) {
case MEM:
this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
Expand Down Expand Up @@ -190,6 +179,7 @@ public void execute(Runnable command) {
default:
throw new IllegalStateException("No backend selected.");
}
return config;
}

protected Configuration createClusterConfig() throws IOException {
Expand All @@ -198,8 +188,6 @@ protected Configuration createClusterConfig() throws IOException {
final File haDir = temporaryFolder.newFolder();

Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 48L);
// the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
Expand All @@ -215,11 +203,6 @@ protected Configuration createClusterConfig() throws IOException {

@After
public void stopTestCluster() throws IOException {
if (cluster != null) {
cluster.stop();
cluster = null;
}

if (zkServer != null) {
zkServer.stop();
zkServer = null;
Expand All @@ -241,12 +224,14 @@ public void testTumblingTimeWindow() {
FailingSource.reset();

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);

env
.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
Expand Down Expand Up @@ -310,13 +295,15 @@ public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
FailingSource.reset();

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setMaxParallelism(maxParallelism);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);

env
.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
Expand Down Expand Up @@ -376,6 +363,7 @@ public void testSlidingTimeWindow() {
FailingSource.reset();

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(2 * PARALLELISM);
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Expand Down Expand Up @@ -438,12 +426,14 @@ public void testPreAggregatedTumblingTimeWindow() {
FailingSource.reset();

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);

env
.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
Expand Down Expand Up @@ -507,12 +497,14 @@ public void testPreAggregatedSlidingTimeWindow() {
FailingSource.reset();

try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(PARALLELISM);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
env.getConfig().disableSysoutLogging();
env.setStateBackend(this.stateBackend);
env.getConfig().setUseSnapshotCompression(true);

env
.addSource(new FailingSource(numKeys, numElementsPerKey, numElementsPerKey / 3))
Expand Down
Loading

0 comments on commit 3e056b3

Please sign in to comment.