Skip to content

Commit

Permalink
[FLINK-10637] Use MiniClusterResource for tests in flink-runtime
Browse files Browse the repository at this point in the history
Rename MiniClusterResource into MiniClusterWithClientResource and move base functionality
in the form of MiniClusterResource into flink-runtime. The new base class simply starts a
MiniCluster with random ports.

Moreover, this commit lets almost all tests in flink-runtime use the MiniClusterResource
to guarantee that there are no port conflicts between concurrently executed tests.

This closes apache#6899.
  • Loading branch information
tillrohrmann committed Oct 24, 2018
1 parent d643982 commit bf64740
Show file tree
Hide file tree
Showing 62 changed files with 815 additions and 750 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand All @@ -32,8 +34,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.Collector;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestingSecurityContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.InstantiationUtil;

import org.junit.AfterClass;
Expand Down Expand Up @@ -70,7 +70,7 @@ public class KafkaShortRetentionTestBase implements Serializable {
private static Properties standardProps;

@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.jmx.JMXReporter;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -81,7 +81,7 @@ public abstract class KafkaTestBase extends TestLogger {
protected static Properties standardProps;

@ClassRule
public static MiniClusterResource flink = new MiniClusterResource(
public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getFlinkConfiguration())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.flink.streaming.test.examples.windowing;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing;
import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;

Expand All @@ -41,7 +41,7 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
public static TemporaryFolder temporaryFolder = new TemporaryFolder();

@ClassRule
public static MiniClusterResource miniClusterResource = new MiniClusterResource(
public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -71,7 +71,7 @@ public class DistributedCacheDfsTest extends TestLogger {
public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package org.apache.flink.ml.util

import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration}
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.{BeforeAndAfter, Suite}

/** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests.
Expand Down Expand Up @@ -50,11 +51,11 @@ import org.scalatest.{BeforeAndAfter, Suite}
trait FlinkTestBase extends BeforeAndAfter {
that: Suite =>

var cluster: Option[MiniClusterResource] = None
var cluster: Option[MiniClusterWithClientResource] = None
val parallelism = 4

before {
val cl = new MiniClusterResource(
val cl = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.ViewEntry;
Expand All @@ -40,8 +41,7 @@
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.TypedResult;
import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -83,7 +83,7 @@ public class LocalExecutorITCase extends TestLogger {
public static TemporaryFolder tempFolder = new TemporaryFolder();

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
Expand All @@ -62,7 +62,7 @@
public class JMXJobManagerMetricTest extends TestLogger {

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfiguration())
.setNumberSlotsPerTaskManager(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
Expand All @@ -55,7 +55,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB

private static TestingServer zkServer;

private static MiniClusterResource miniClusterResource;
private static MiniClusterWithClientResource miniClusterResource;

@Override
protected AbstractStateBackend createStateBackend() throws Exception {
Expand All @@ -68,7 +68,7 @@ public static void setup() throws Exception {

// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
miniClusterResource = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.apache.curator.test.TestingServer;
import org.junit.AfterClass;
Expand All @@ -55,7 +55,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState

private static TestingServer zkServer;

private static MiniClusterResource miniClusterResource;
private static MiniClusterWithClientResource miniClusterResource;

@Override
protected AbstractStateBackend createStateBackend() throws Exception {
Expand All @@ -68,7 +68,7 @@ public static void setup() throws Exception {

// we have to manage this manually because we have to create the ZooKeeper server
// ahead of this
miniClusterResource = new MiniClusterResource(
miniClusterResource = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -52,7 +52,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.queryablestate.client.QueryableStateClient;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand All @@ -52,7 +52,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@ClassRule
public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(getConfig())
.setNumberTaskManagers(NUM_TMS)
Expand Down
Loading

0 comments on commit bf64740

Please sign in to comment.