diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 86821a5dd2f6c..56c64aef9eded 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -24,6 +24,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -32,8 +34,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java index 7296269694505..d4feac177f2c4 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkSecuredITCase.java @@ -25,8 +25,8 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.modules.HadoopModule; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.util.SecureTestEnvironment; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.test.util.TestingSecurityContext; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 3fd687397a16e..9a11e7f483418 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -31,8 +32,7 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; @@ -70,7 +70,7 @@ public class KafkaShortRetentionTestBase implements Serializable { private static Properties standardProps; @ClassRule - public static MiniClusterResource flink = new MiniClusterResource( + public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java index 05307acfe59a3..1a20d7e82adf4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java @@ -24,10 +24,10 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.metrics.jmx.JMXReporter; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; @@ -81,7 +81,7 @@ public abstract class KafkaTestBase extends TestLogger { protected static Properties standardProps; @ClassRule - public static MiniClusterResource flink = new MiniClusterResource( + public static MiniClusterWithClientResource flink = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getFlinkConfiguration()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java index ab8d6b99b4e4f..d569866185255 100644 --- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.test.examples.windowing; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.examples.windowing.TopSpeedWindowing; import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.FileUtils; import org.apache.flink.util.TestLogger; @@ -41,7 +41,7 @@ public class TopSpeedWindowingExampleITCase extends TestLogger { public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @ClassRule - public static MiniClusterResource miniClusterResource = new MiniClusterResource( + public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java index 4c8440df93313..79ffbe0c624d0 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java @@ -22,10 +22,10 @@ import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; @@ -71,7 +71,7 @@ public class DistributedCacheDfsTest extends TestLogger { public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(1) diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala index 194b657ec6265..5ef129ad7d34f 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/util/FlinkTestBase.scala @@ -18,7 +18,8 @@ package org.apache.flink.ml.util -import org.apache.flink.test.util.{MiniClusterResource, MiniClusterResourceConfiguration} +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration +import org.apache.flink.test.util.MiniClusterWithClientResource import org.scalatest.{BeforeAndAfter, Suite} /** Mixin to start and stop a LocalFlinkMiniCluster automatically for Scala based tests. @@ -50,11 +51,11 @@ import org.scalatest.{BeforeAndAfter, Suite} trait FlinkTestBase extends BeforeAndAfter { that: Suite => - var cluster: Option[MiniClusterResource] = None + var cluster: Option[MiniClusterWithClientResource] = None val parallelism = 4 before { - val cl = new MiniClusterResource( + val cl = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(parallelism) diff --git a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java index eb8b56a4dc4e4..0ec76054633bb 100644 --- a/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java +++ b/flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.client.config.Environment; import org.apache.flink.table.client.config.entries.ViewEntry; @@ -40,8 +41,7 @@ import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.client.gateway.TypedResult; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.types.Row; import org.apache.flink.util.TestLogger; @@ -83,7 +83,7 @@ public class LocalExecutorITCase extends TestLogger { public static TemporaryFolder tempFolder = new TemporaryFolder(); @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 179cf9c6de89c..915c02b644d8f 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -37,8 +37,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -62,7 +62,7 @@ public class JMXJobManagerMetricTest extends TestLogger { @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberSlotsPerTaskManager(1) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java index ff84775f16207..9755b52e2f17d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateFsBackendITCase.java @@ -27,8 +27,8 @@ import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.curator.test.TestingServer; import org.junit.AfterClass; @@ -55,7 +55,7 @@ public class HAQueryableStateFsBackendITCase extends AbstractQueryableStateTestB private static TestingServer zkServer; - private static MiniClusterResource miniClusterResource; + private static MiniClusterWithClientResource miniClusterResource; @Override protected AbstractStateBackend createStateBackend() throws Exception { @@ -68,7 +68,7 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this - miniClusterResource = new MiniClusterResource( + miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java index 828123ed2c956..94baaed37458c 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java @@ -27,8 +27,8 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.curator.test.TestingServer; import org.junit.AfterClass; @@ -55,7 +55,7 @@ public class HAQueryableStateRocksDBBackendITCase extends AbstractQueryableState private static TestingServer zkServer; - private static MiniClusterResource miniClusterResource; + private static MiniClusterWithClientResource miniClusterResource; @Override protected AbstractStateBackend createStateBackend() throws Exception { @@ -68,7 +68,7 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this - miniClusterResource = new MiniClusterResource( + miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java index edf0b6224e883..9116304ebf5a5 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -52,7 +52,7 @@ public class NonHAQueryableStateFsBackendITCase extends AbstractQueryableStateTe public TemporaryFolder temporaryFolder = new TemporaryFolder(); @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java index b9fd4534db1a0..ce3e665fce619 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.queryablestate.client.QueryableStateClient; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -52,7 +52,7 @@ public class NonHAQueryableStateRocksDBBackendITCase extends AbstractQueryableSt public TemporaryFolder temporaryFolder = new TemporaryFolder(); @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfig()) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java index 3ae830d061877..7a287cf85be78 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java @@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.StoppableTask; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.TestLogger; @@ -79,7 +79,7 @@ public class WebFrontendITCase extends TestLogger { private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration(); @ClassRule - public static final MiniClusterResource CLUSTER = new MiniClusterResource( + public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(CLUSTER_CONFIGURATION) .setNumberTaskManagers(NUM_TASK_MANAGERS) @@ -117,7 +117,7 @@ public void tearDown() { @Test public void getFrontPage() { try { - String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/index.html"); + String fromHTTP = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/index.html"); String text = "Apache Flink Dashboard"; assertTrue("Startpage should contain " + text, fromHTTP.contains(text)); } catch (Exception e) { @@ -126,10 +126,14 @@ public void getFrontPage() { } } + private int getRestPort() { + return CLUSTER.getRestAddres().getPort(); + } + @Test public void testResponseHeaders() throws Exception { // check headers for successful json response - URL taskManagersUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers"); + URL taskManagersUrl = new URL("http://localhost:" + getRestPort() + "/taskmanagers"); HttpURLConnection taskManagerConnection = (HttpURLConnection) taskManagersUrl.openConnection(); taskManagerConnection.setConnectTimeout(100000); taskManagerConnection.connect(); @@ -145,7 +149,7 @@ public void testResponseHeaders() throws Exception { Assert.assertEquals("application/json; charset=UTF-8", taskManagerConnection.getContentType()); // check headers in case of an error - URL notFoundJobUrl = new URL("http://localhost:" + CLUSTER.getWebUIPort() + "/jobs/dontexist"); + URL notFoundJobUrl = new URL("http://localhost:" + getRestPort() + "/jobs/dontexist"); HttpURLConnection notFoundJobConnection = (HttpURLConnection) notFoundJobUrl.openConnection(); notFoundJobConnection.setConnectTimeout(100000); notFoundJobConnection.connect(); @@ -161,7 +165,7 @@ public void testResponseHeaders() throws Exception { @Test public void getNumberOfTaskManagers() { try { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode response = mapper.readTree(json); @@ -177,7 +181,7 @@ public void getNumberOfTaskManagers() { @Test public void getTaskmanagers() throws Exception { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode parsed = mapper.readTree(json); @@ -197,18 +201,18 @@ public void getLogAndStdoutFiles() throws Exception { WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION); FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); - String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/log"); + String logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/log"); assertTrue(logs.contains("job manager log")); FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); - logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/stdout"); + logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/stdout"); assertTrue(logs.contains("job manager out")); } @Test public void getTaskManagerLogAndStdoutFiles() { try { - String json = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/"); + String json = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/"); ObjectMapper mapper = new ObjectMapper(); JsonNode parsed = mapper.readTree(json); @@ -220,11 +224,11 @@ public void getTaskManagerLogAndStdoutFiles() { //we check for job manager log files, since no separate taskmanager logs exist FileUtils.writeStringToFile(logFiles.logFile, "job manager log"); - String logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/log"); + String logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + id + "/log"); assertTrue(logs.contains("job manager log")); FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out"); - logs = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/taskmanagers/" + id + "/stdout"); + logs = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + id + "/stdout"); assertTrue(logs.contains("job manager out")); } catch (Exception e) { e.printStackTrace(); @@ -235,7 +239,7 @@ public void getTaskManagerLogAndStdoutFiles() { @Test public void getConfiguration() { try { - String config = TestBaseUtils.getFromHTTP("http://localhost:" + CLUSTER.getWebUIPort() + "/jobmanager/config"); + String config = TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/config"); Map conf = WebMonitorUtils.fromKeyValueJsonArray(config); assertEquals( @@ -275,7 +279,7 @@ public void testStop() throws Exception { final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); final Deadline deadline = testTimeout.fromNow(); - try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { + try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) { // stop the job client.sendPatchRequest("/jobs/" + jid + "/?mode=stop", deadline.timeLeft()); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft()); @@ -291,7 +295,7 @@ public void testStop() throws Exception { } // ensure we can access job details when its finished (FLINK-4011) - try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { + try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) { FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS); client.sendGetRequest("/jobs/" + jid + "/config", timeout); HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout); @@ -334,7 +338,7 @@ public void testStopYarn() throws Exception { final FiniteDuration testTimeout = new FiniteDuration(2, TimeUnit.MINUTES); final Deadline deadline = testTimeout.fromNow(); - try (HttpTestClient client = new HttpTestClient("localhost", CLUSTER.getWebUIPort())) { + try (HttpTestClient client = new HttpTestClient("localhost", getRestPort())) { // Request the file from the web server client.sendGetRequest("/jobs/" + jid + "/yarn-stop", deadline.timeLeft()); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java index a2138e14383b3..5fc3ff5c06790 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java @@ -26,9 +26,10 @@ import org.apache.flink.runtime.rest.RestClientConfiguration; import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; import org.junit.ClassRule; import org.junit.Test; @@ -43,7 +44,7 @@ /** * Tests for the {@link JarRunHandler}. */ -public class JarRunHandlerTest { +public class JarRunHandlerTest extends TestLogger { @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java index 18dd76e05b0a7..fce2b6dcc31d0 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java @@ -26,9 +26,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -63,7 +63,7 @@ public class HistoryServerTest extends TestLogger { @ClassRule public static final TemporaryFolder TMP = new TemporaryFolder(); - private MiniClusterResource cluster; + private MiniClusterWithClientResource cluster; private File jmDirectory; private File hsDirectory; @@ -75,7 +75,7 @@ public void setUp() throws Exception { Configuration clusterConfig = new Configuration(); clusterConfig.setString(JobManagerOptions.ARCHIVE_DIR, jmDirectory.toURI().toString()); - cluster = new MiniClusterResource( + cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(clusterConfig) .setNumberTaskManagers(1) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index f6689fe72d28b..f32b8398c02a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -31,13 +31,12 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; /** @@ -52,30 +51,20 @@ public class PartialConsumePipelinedResultTest extends TestLogger { private static final int NUMBER_OF_NETWORK_BUFFERS = 128; - private static MiniCluster flink; + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFlinkConfiguration()) + .setNumberTaskManagers(NUMBER_OF_TMS) + .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) + .build()); - @BeforeClass - public static void setUp() throws Exception { + private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, NUMBER_OF_NETWORK_BUFFERS); - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .setNumTaskManagers(NUMBER_OF_TMS) - .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) - .build(); - - flink = new MiniCluster(miniClusterConfiguration); - - flink.start(); - } - - @AfterClass - public static void tearDown() throws Exception { - if (flink != null) { - flink.close(); - } + return config; } /** @@ -111,7 +100,7 @@ public void testPartialConsumePipelinedResultReceiver() throws Exception { sender.setSlotSharingGroup(slotSharingGroup); receiver.setSlotSharingGroup(slotSharingGroup); - flink.executeJobBlocking(jobGraph); + MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph); } // --------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 7782a8e2feb3b..9b16e86c42254 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.reader.RecordReader; @@ -30,14 +29,13 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.util.BitSet; @@ -56,30 +54,19 @@ public class SlotCountExceedingParallelismTest extends TestLogger { public static final String JOB_NAME = "SlotCountExceedingParallelismTest (no slot sharing, blocking results)"; - private static MiniCluster flink; + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFlinkConfiguration()) + .setNumberTaskManagers(NUMBER_OF_TMS) + .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) + .build()); - @BeforeClass - public static void setUp() throws Exception { + private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.setInteger(RestOptions.PORT, 0); config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .setNumTaskManagers(NUMBER_OF_TMS) - .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) - .build(); - - flink = new MiniCluster(miniClusterConfiguration); - - flink.start(); - } - - @AfterClass - public static void tearDown() throws Exception { - if (flink != null) { - flink.close(); - } + return config; } @Test @@ -106,7 +93,7 @@ public void testNoSlotSharingAndBlockingResultBoth() throws Exception { // --------------------------------------------------------------------------------------------- private void submitJobGraphAndWait(final JobGraph jobGraph) throws JobExecutionException, InterruptedException { - flink.executeJobBlocking(jobGraph); + MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph); } private JobGraph createTestJobGraph( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java index eab4eabd6aaa1..f382f0410068f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java @@ -20,7 +20,6 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -29,16 +28,15 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.SlotCountExceedingParallelismTest; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.util.List; @@ -55,30 +53,19 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger { private static final int NUMBER_OF_SLOTS_PER_TM = 2; private static final int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM; - private static MiniCluster flink; + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFlinkConfiguration()) + .setNumberTaskManagers(NUMBER_OF_TMS) + .setNumberSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) + .build()); - @BeforeClass - public static void setUp() throws Exception { + private static Configuration getFlinkConfiguration() { final Configuration config = new Configuration(); - config.setInteger(RestOptions.PORT, 0); config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT()); - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .setNumTaskManagers(NUMBER_OF_TMS) - .setNumSlotsPerTaskManager(NUMBER_OF_SLOTS_PER_TM) - .build(); - - flink = new MiniCluster(miniClusterConfiguration); - - flink.start(); - } - - @AfterClass - public static void tearDown() throws Exception { - if (flink != null) { - flink.close(); - } + return config; } /** @@ -137,7 +124,7 @@ public void testMixedPipelinedAndBlockingResults() throws Exception { pipelinedReceiver, blockingReceiver); - flink.executeJobBlocking(jobGraph); + MINI_CLUSTER_RESOURCE.getMiniCluster().executeJobBlocking(jobGraph); } // --------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index 561b81beb0284..14aecc700ad2b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -35,11 +35,13 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.types.LongValue; import org.apache.flink.util.TestLogger; +import org.junit.ClassRule; import org.junit.Test; import java.time.Duration; @@ -61,6 +63,19 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { private static volatile Thread ASYNC_PRODUCER_THREAD; private static volatile Thread ASYNC_CONSUMER_THREAD; + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getFlinkConfiguration()) + .build()); + + private static Configuration getFlinkConfiguration() { + Configuration config = new Configuration(); + config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096"); + config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9); + return config; + } + /** * Tests that a task waiting on an async producer/consumer that is stuck * in a blocking buffer request can be properly cancelled. @@ -73,105 +88,92 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { public void testCancelAsyncProducerAndConsumer() throws Exception { Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2)); - // Cluster - Configuration config = new Configuration(); - config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096"); - config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9); + // Job with async producer and consumer + JobVertex producer = new JobVertex("AsyncProducer"); + producer.setParallelism(1); + producer.setInvokableClass(AsyncProducer.class); - MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .setNumTaskManagers(1) - .setNumSlotsPerTaskManager(1) - .build(); - - try (MiniCluster flink = new MiniCluster(miniClusterConfiguration)) { - flink.start(); - - // Job with async producer and consumer - JobVertex producer = new JobVertex("AsyncProducer"); - producer.setParallelism(1); - producer.setInvokableClass(AsyncProducer.class); - - JobVertex consumer = new JobVertex("AsyncConsumer"); - consumer.setParallelism(1); - consumer.setInvokableClass(AsyncConsumer.class); - consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - - SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID()); - producer.setSlotSharingGroup(slot); - consumer.setSlotSharingGroup(slot); - - JobGraph jobGraph = new JobGraph(producer, consumer); - - // Submit job and wait until running - flink.runDetached(jobGraph); - - FutureUtils.retrySuccesfulWithDelay( - () -> flink.getJobStatus(jobGraph.getJobID()), - Time.milliseconds(10), - deadline, - status -> status == JobStatus.RUNNING, - TestingUtils.defaultScheduledExecutor() - ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); - - boolean producerBlocked = false; - for (int i = 0; i < 50; i++) { - Thread thread = ASYNC_PRODUCER_THREAD; - - if (thread != null && thread.isAlive()) { - StackTraceElement[] stackTrace = thread.getStackTrace(); - producerBlocked = isInBlockingBufferRequest(stackTrace); - } + JobVertex consumer = new JobVertex("AsyncConsumer"); + consumer.setParallelism(1); + consumer.setInvokableClass(AsyncConsumer.class); + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - if (producerBlocked) { - break; - } else { - // Retry - Thread.sleep(500L); - } - } + SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID()); + producer.setSlotSharingGroup(slot); + consumer.setSlotSharingGroup(slot); - // Verify that async producer is in blocking request - assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked); + JobGraph jobGraph = new JobGraph(producer, consumer); - boolean consumerWaiting = false; - for (int i = 0; i < 50; i++) { - Thread thread = ASYNC_CONSUMER_THREAD; + final MiniCluster flink = MINI_CLUSTER_RESOURCE.getMiniCluster(); - if (thread != null && thread.isAlive()) { - consumerWaiting = thread.getState() == Thread.State.WAITING; - } + // Submit job and wait until running + flink.runDetached(jobGraph); - if (consumerWaiting) { - break; - } else { - // Retry - Thread.sleep(500L); - } + FutureUtils.retrySuccesfulWithDelay( + () -> flink.getJobStatus(jobGraph.getJobID()), + Time.milliseconds(10), + deadline, + status -> status == JobStatus.RUNNING, + TestingUtils.defaultScheduledExecutor() + ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + boolean producerBlocked = false; + for (int i = 0; i < 50; i++) { + Thread thread = ASYNC_PRODUCER_THREAD; + + if (thread != null && thread.isAlive()) { + StackTraceElement[] stackTrace = thread.getStackTrace(); + producerBlocked = isInBlockingBufferRequest(stackTrace); } - // Verify that async consumer is in blocking request - assertTrue("Consumer thread is not blocked.", consumerWaiting); + if (producerBlocked) { + break; + } else { + // Retry + Thread.sleep(500L); + } + } - flink.cancelJob(jobGraph.getJobID()) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + // Verify that async producer is in blocking request + assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked); - // wait until the job is canceled - FutureUtils.retrySuccesfulWithDelay( - () -> flink.getJobStatus(jobGraph.getJobID()), - Time.milliseconds(10), - deadline, - status -> status == JobStatus.CANCELED, - TestingUtils.defaultScheduledExecutor() - ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + boolean consumerWaiting = false; + for (int i = 0; i < 50; i++) { + Thread thread = ASYNC_CONSUMER_THREAD; - // Verify the expected Exceptions - assertNotNull(ASYNC_PRODUCER_EXCEPTION); - assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass()); + if (thread != null && thread.isAlive()) { + consumerWaiting = thread.getState() == Thread.State.WAITING; + } - assertNotNull(ASYNC_CONSUMER_EXCEPTION); - assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass()); + if (consumerWaiting) { + break; + } else { + // Retry + Thread.sleep(500L); + } } + + // Verify that async consumer is in blocking request + assertTrue("Consumer thread is not blocked.", consumerWaiting); + + flink.cancelJob(jobGraph.getJobID()) + .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + // wait until the job is canceled + FutureUtils.retrySuccesfulWithDelay( + () -> flink.getJobStatus(jobGraph.getJobID()), + Time.milliseconds(10), + deadline, + status -> status == JobStatus.CANCELED, + TestingUtils.defaultScheduledExecutor() + ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + + // Verify the expected Exceptions + assertNotNull(ASYNC_PRODUCER_EXCEPTION); + assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass()); + + assertNotNull(ASYNC_CONSUMER_EXCEPTION); + assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass()); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java new file mode 100644 index 0000000000000..5cd1a50176d26 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResource.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * Resource which starts a {@link MiniCluster} for testing purposes. + */ +public class MiniClusterResource extends ExternalResource { + + private static final String DEFAULT_MANAGED_MEMORY_SIZE = "80m"; + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; + + private MiniCluster miniCluster = null; + + private int numberSlots = -1; + + private UnmodifiableConfiguration restClusterClientConfig; + + public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration); + } + + public int getNumberSlots() { + return numberSlots; + } + + public MiniCluster getMiniCluster() { + return miniCluster; + } + + public UnmodifiableConfiguration getClientConfiguration() { + return restClusterClientConfig; + } + + public URI getRestAddres() { + return miniCluster.getRestAddress(); + } + + @Override + public void before() throws Exception { + temporaryFolder.create(); + + startMiniCluster(); + + numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers(); + } + + @Override + public void after() { + temporaryFolder.delete(); + + Exception exception = null; + + if (miniCluster != null) { + final CompletableFuture terminationFuture = miniCluster.closeAsync(); + + try { + terminationFuture.get( + miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(), + TimeUnit.MILLISECONDS); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + miniCluster = null; + } + + if (exception != null) { + log.warn("Could not properly shut down the MiniClusterResource.", exception); + } + } + + private void startMiniCluster() throws Exception { + final Configuration configuration = new Configuration(miniClusterResourceConfiguration.getConfiguration()); + configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); + + // we need to set this since a lot of test expect this because TestBaseUtils.startCluster() + // enabled this by default + if (!configuration.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) { + configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); + } + + if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) { + configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, DEFAULT_MANAGED_MEMORY_SIZE); + } + + // set rest and rpc port to 0 to avoid clashes with concurrent MiniClusters + configuration.setInteger(JobManagerOptions.PORT, 0); + configuration.setInteger(RestOptions.PORT, 0); + + final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() + .setConfiguration(configuration) + .setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers()) + .setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()) + .build(); + + miniCluster = new MiniCluster(miniClusterConfiguration); + + miniCluster.start(); + + final URI restAddress = miniCluster.getRestAddress(); + createClientConfiguration(restAddress); + } + + private void createClientConfiguration(URI restAddress) { + Configuration restClientConfig = new Configuration(); + restClientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost()); + restClientConfig.setInteger(RestOptions.PORT, restAddress.getPort()); + this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java new file mode 100644 index 0000000000000..d54996d0fa095 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/MiniClusterResourceConfiguration.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testutils; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.util.Preconditions; + +/** + * Mini cluster resource configuration object. + */ +public class MiniClusterResourceConfiguration { + + private final UnmodifiableConfiguration configuration; + + private final int numberTaskManagers; + + private final int numberSlotsPerTaskManager; + + private final Time shutdownTimeout; + + private final RpcServiceSharing rpcServiceSharing; + + protected MiniClusterResourceConfiguration( + Configuration configuration, + int numberTaskManagers, + int numberSlotsPerTaskManager, + Time shutdownTimeout, + RpcServiceSharing rpcServiceSharing) { + this.configuration = new UnmodifiableConfiguration(Preconditions.checkNotNull(configuration)); + this.numberTaskManagers = numberTaskManagers; + this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; + this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout); + this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); + } + + public Configuration getConfiguration() { + return configuration; + } + + public int getNumberTaskManagers() { + return numberTaskManagers; + } + + public int getNumberSlotsPerTaskManager() { + return numberSlotsPerTaskManager; + } + + public Time getShutdownTimeout() { + return shutdownTimeout; + } + + public RpcServiceSharing getRpcServiceSharing() { + return rpcServiceSharing; + } + + /** + * Builder for {@link MiniClusterResourceConfiguration}. + */ + public static final class Builder { + + private Configuration configuration = new Configuration(); + private int numberTaskManagers = 1; + private int numberSlotsPerTaskManager = 1; + private Time shutdownTimeout = AkkaUtils.getTimeoutAsTime(configuration); + + private RpcServiceSharing rpcServiceSharing = RpcServiceSharing.SHARED; + + public Builder setConfiguration(Configuration configuration) { + this.configuration = configuration; + return this; + } + + public Builder setNumberTaskManagers(int numberTaskManagers) { + this.numberTaskManagers = numberTaskManagers; + return this; + } + + public Builder setNumberSlotsPerTaskManager(int numberSlotsPerTaskManager) { + this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; + return this; + } + + public Builder setShutdownTimeout(Time shutdownTimeout) { + this.shutdownTimeout = shutdownTimeout; + return this; + } + + public Builder setRpcServiceSharing(RpcServiceSharing rpcServiceSharing) { + this.rpcServiceSharing = rpcServiceSharing; + return this; + } + + public MiniClusterResourceConfiguration build() { + return new MiniClusterResourceConfiguration(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing); + } + } +} diff --git a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala index 731bbf6b288c3..337e4fb9be9e6 100644 --- a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala +++ b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala @@ -20,9 +20,11 @@ package org.apache.flink.api.scala import java.io._ -import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions} +import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.clusterframework.BootstrapTools -import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} +import org.apache.flink.runtime.minicluster.MiniCluster +import org.apache.flink.runtime.testutils.{MiniClusterResource, MiniClusterResourceConfiguration} +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration import org.apache.flink.util.TestLogger import org.junit._ import org.junit.rules.TemporaryFolder @@ -274,16 +276,15 @@ class ScalaShellITCase extends TestLogger { val dir = temporaryFolder.newFolder() BootstrapTools.writeConfiguration(configuration, new File(dir, "flink-conf.yaml")) - val args = cluster match { - case Some(_) => - Array( - "remote", - hostname, - Integer.toString(port), - "--configDir", - dir.getAbsolutePath) - case None => throw new IllegalStateException("Cluster has not been started.") - } + val port: Int = clusterResource.getRestAddres.getPort + val hostname : String = clusterResource.getRestAddres.getHost + + val args = Array( + "remote", + hostname, + Integer.toString(port), + "--configDir", + dir.getAbsolutePath) //start scala shell with initialized // buffered reader for testing @@ -313,35 +314,19 @@ object ScalaShellITCase { val configuration = new Configuration() var cluster: Option[MiniCluster] = None - var port: Int = _ - var hostname : String = _ val parallelism: Int = 4 - @BeforeClass - def beforeAll(): Unit = { - // set to different than default so not to interfere with ScalaShellLocalStartupITCase - configuration.setInteger(RestOptions.PORT, 8082) - val miniConfig = new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumSlotsPerTaskManager(parallelism) - .build() - - val miniCluster = new MiniCluster(miniConfig) - miniCluster.start() - port = miniCluster.getRestAddress.getPort - hostname = miniCluster.getRestAddress.getHost - - cluster = Some(miniCluster) - } + val _clusterResource = new MiniClusterResource(new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(parallelism) + .build()) + + @ClassRule + def clusterResource = _clusterResource @AfterClass def afterAll(): Unit = { // The Scala interpreter somehow changes the class loader. Therefore, we have to reset it Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader) - - cluster.foreach { - miniCluster => miniCluster.close() - } } /** @@ -358,45 +343,44 @@ object ScalaShellITCase { val oldOut = System.out System.setOut(new PrintStream(baos)) - cluster match { - case Some(_) => - val repl = externalJars match { - case Some(ej) => new FlinkILoop( - hostname, - port, - configuration, - Option(Array(ej)), - in, new PrintWriter(out)) - - case None => new FlinkILoop( - hostname, - port, - configuration, - in, new PrintWriter(out)) - } + val port: Int = clusterResource.getRestAddres.getPort + val hostname : String = clusterResource.getRestAddres.getHost - repl.settings = new Settings() + val repl = externalJars match { + case Some(ej) => new FlinkILoop( + hostname, + port, + configuration, + Option(Array(ej)), + in, new PrintWriter(out)) - // enable this line to use scala in intellij - repl.settings.usejavacp.value = true + case None => new FlinkILoop( + hostname, + port, + configuration, + in, new PrintWriter(out)) + } - externalJars match { - case Some(ej) => repl.settings.classpath.value = ej - case None => - } + repl.settings = new Settings() + + // enable this line to use scala in intellij + repl.settings.usejavacp.value = true + + externalJars match { + case Some(ej) => repl.settings.classpath.value = ej + case None => + } - repl.process(repl.settings) + repl.process(repl.settings) - repl.closeInterpreter() + repl.closeInterpreter() - System.setOut(oldOut) + System.setOut(oldOut) - baos.flush() + baos.flush() - val stdout = baos.toString + val stdout = baos.toString - out.toString + stdout - case _ => throw new IllegalStateException("The cluster has not been started.") - } + out.toString + stdout } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java index 60ee66f42465a..d9a257c9cb54c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/RemoteStreamExecutionEnvironmentTest.java @@ -21,14 +21,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.util.Iterator; @@ -38,30 +38,12 @@ */ public class RemoteStreamExecutionEnvironmentTest extends TestLogger { - private static MiniCluster flink; - - @BeforeClass - public static void setUp() throws Exception { - final Configuration config = new Configuration(); - config.setInteger(RestOptions.PORT, 0); - - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(config) - .setNumTaskManagers(1) - .setNumSlotsPerTaskManager(1) - .build(); - - flink = new MiniCluster(miniClusterConfiguration); - - flink.start(); - } - - @AfterClass - public static void tearDown() throws Exception { - if (flink != null) { - flink.close(); - } - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setNumberSlotsPerTaskManager(1) + .build()); /** * Verifies that the port passed to the RemoteStreamEnvironment is used for connecting to the cluster. @@ -71,9 +53,10 @@ public void testPortForwarding() throws Exception { final Configuration clientConfiguration = new Configuration(); clientConfiguration.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0); + final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( - flink.getRestAddress().getHost(), - flink.getRestAddress().getPort(), + miniCluster.getRestAddress().getHost(), + miniCluster.getRestAddress().getPort(), clientConfiguration); final DataStream resultStream = env.fromElements(1) diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml b/flink-test-utils-parent/flink-test-utils/pom.xml index 6a8e01700b266..6c32a436babc2 100644 --- a/flink-test-utils-parent/flink-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-test-utils/pom.xml @@ -50,6 +50,14 @@ under the License. compile + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test-jar + compile + + org.apache.flink flink-clients_${scala.binary.version} diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 0b7a3b3f93f29..3ac2104b7b2b9 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.util; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.FileUtils; import org.junit.ClassRule; @@ -58,7 +59,7 @@ public abstract class AbstractTestBase extends TestBaseUtils { private static final int DEFAULT_PARALLELISM = 4; @ClassRule - public static MiniClusterResource miniClusterResource = new MiniClusterResource( + public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 9140bb4eff557..cbd36e4eddead 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -18,166 +18,16 @@ package org.apache.flink.test.util; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.MiniClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; -import org.apache.flink.configuration.TaskManagerOptions; -import org.apache.flink.configuration.UnmodifiableConfiguration; -import org.apache.flink.runtime.minicluster.JobExecutorService; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.streaming.util.TestStreamEnvironment; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; - -import org.junit.rules.ExternalResource; -import org.junit.rules.TemporaryFolder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - /** - * Starts a Flink mini cluster as a resource and registers the respective - * ExecutionEnvironment and StreamExecutionEnvironment. + * Mirror of the {@link MiniClusterWithClientResource} to avoid breaking + * changes when splitting up the original MiniClusterResource implementation + * with FLINK-10637. + * + * @deprecated This class should be replaced with {@link MiniClusterWithClientResource}. */ -public class MiniClusterResource extends ExternalResource { - - private static final Logger LOG = LoggerFactory.getLogger(MiniClusterResource.class); - - private final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - private final MiniClusterResourceConfiguration miniClusterResourceConfiguration; - - private JobExecutorService jobExecutorService; - - private ClusterClient clusterClient; - - private Configuration restClusterClientConfig; - - private int numberSlots = -1; - - private TestEnvironment executionEnvironment; - - private int webUIPort = -1; - - public MiniClusterResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { - this.miniClusterResourceConfiguration = Preconditions.checkNotNull(miniClusterResourceConfiguration); - } - - public int getNumberSlots() { - return numberSlots; - } - - public ClusterClient getClusterClient() { - return clusterClient; - } - - public Configuration getClientConfiguration() { - return restClusterClientConfig; - } - - public TestEnvironment getTestEnvironment() { - return executionEnvironment; - } - - public int getWebUIPort() { - return webUIPort; - } - - @Override - public void before() throws Exception { - temporaryFolder.create(); - - startMiniCluster(); - - numberSlots = miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() * miniClusterResourceConfiguration.getNumberTaskManagers(); - - executionEnvironment = new TestEnvironment(jobExecutorService, numberSlots, false); - executionEnvironment.setAsContext(); - TestStreamEnvironment.setAsContext(jobExecutorService, numberSlots); - } - - @Override - public void after() { - temporaryFolder.delete(); - - TestStreamEnvironment.unsetAsContext(); - TestEnvironment.unsetAsContext(); - - Exception exception = null; - - if (clusterClient != null) { - try { - clusterClient.shutdown(); - } catch (Exception e) { - exception = e; - } - } - - clusterClient = null; - - if (jobExecutorService != null) { - final CompletableFuture terminationFuture = jobExecutorService.closeAsync(); - - try { - terminationFuture.get( - miniClusterResourceConfiguration.getShutdownTimeout().toMilliseconds(), - TimeUnit.MILLISECONDS); - } catch (Exception e) { - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - - jobExecutorService = null; - } - - if (exception != null) { - LOG.warn("Could not properly shut down the MiniClusterResource.", exception); - } - } - - private void startMiniCluster() throws Exception { - final Configuration configuration = miniClusterResourceConfiguration.getConfiguration(); - configuration.setString(CoreOptions.TMP_DIRS, temporaryFolder.newFolder().getAbsolutePath()); - - // we need to set this since a lot of test expect this because TestBaseUtils.startCluster() - // enabled this by default - if (!configuration.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) { - configuration.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); - } - - if (!configuration.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) { - configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, TestBaseUtils.TASK_MANAGER_MEMORY_SIZE); - } - - // set rest port to 0 to avoid clashes with concurrent MiniClusters - configuration.setInteger(RestOptions.PORT, 0); - - final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumTaskManagers(miniClusterResourceConfiguration.getNumberTaskManagers()) - .setNumSlotsPerTaskManager(miniClusterResourceConfiguration.getNumberSlotsPerTaskManager()) - .build(); - - final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration); - - miniCluster.start(); - - // update the port of the rest endpoint - configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); - - jobExecutorService = miniCluster; - clusterClient = new MiniClusterClient(configuration, miniCluster); - - Configuration restClientConfig = new Configuration(); - restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost()); - restClientConfig.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort()); - this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); - - webUIPort = miniCluster.getRestAddress().getPort(); +@Deprecated +public class MiniClusterResource extends MiniClusterWithClientResource { + public MiniClusterResource(MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + super(miniClusterResourceConfiguration); } } diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java index bd521a2f2020d..8b5167a4df776 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java @@ -22,58 +22,27 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.RpcServiceSharing; -import org.apache.flink.util.Preconditions; /** - * Mini cluster resource configuration object. + * Mirror of {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration} which has been + * introduced to avoid breaking changes with FLINK-10637. + * + * @deprecated This class should be replaced with {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration}. */ -public class MiniClusterResourceConfiguration { - - private final Configuration configuration; - - private final int numberTaskManagers; - - private final int numberSlotsPerTaskManager; - - private final Time shutdownTimeout; - - private final RpcServiceSharing rpcServiceSharing; +@Deprecated +public class MiniClusterResourceConfiguration extends org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration { MiniClusterResourceConfiguration( - Configuration configuration, - int numberTaskManagers, - int numberSlotsPerTaskManager, - Time shutdownTimeout, - RpcServiceSharing rpcServiceSharing) { - this.configuration = Preconditions.checkNotNull(configuration); - this.numberTaskManagers = numberTaskManagers; - this.numberSlotsPerTaskManager = numberSlotsPerTaskManager; - this.shutdownTimeout = Preconditions.checkNotNull(shutdownTimeout); - this.rpcServiceSharing = Preconditions.checkNotNull(rpcServiceSharing); - } - - public Configuration getConfiguration() { - return configuration; - } - - public int getNumberTaskManagers() { - return numberTaskManagers; - } - - public int getNumberSlotsPerTaskManager() { - return numberSlotsPerTaskManager; - } - - public Time getShutdownTimeout() { - return shutdownTimeout; - } - - public RpcServiceSharing getRpcServiceSharing() { - return rpcServiceSharing; + Configuration configuration, + int numberTaskManagers, + int numberSlotsPerTaskManager, + Time shutdownTimeout, + RpcServiceSharing rpcServiceSharing) { + super(configuration, numberTaskManagers, numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing); } /** - * Builder for {@link MiniClusterResourceConfiguration}. + * Builder for {@link org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration}. */ public static final class Builder { diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java new file mode 100644 index 0000000000000..594959b915674 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterWithClientResource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.util; + +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.util.TestStreamEnvironment; + +/** + * Starts a Flink mini cluster as a resource and registers the respective + * ExecutionEnvironment and StreamExecutionEnvironment. + */ +public class MiniClusterWithClientResource extends MiniClusterResource { + + private ClusterClient clusterClient; + + private TestEnvironment executionEnvironment; + + public MiniClusterWithClientResource(final MiniClusterResourceConfiguration miniClusterResourceConfiguration) { + super(miniClusterResourceConfiguration); + } + + public ClusterClient getClusterClient() { + return clusterClient; + } + + public TestEnvironment getTestEnvironment() { + return executionEnvironment; + } + + @Override + public void before() throws Exception { + super.before(); + + clusterClient = createMiniClusterClient(); + + executionEnvironment = new TestEnvironment(getMiniCluster(), getNumberSlots(), false); + executionEnvironment.setAsContext(); + TestStreamEnvironment.setAsContext(getMiniCluster(), getNumberSlots()); + } + + @Override + public void after() { + TestStreamEnvironment.unsetAsContext(); + TestEnvironment.unsetAsContext(); + + Exception exception = null; + + if (clusterClient != null) { + try { + clusterClient.shutdown(); + } catch (Exception e) { + exception = e; + } + } + + clusterClient = null; + + super.after(); + + if (exception != null) { + log.warn("Could not properly shut down the MiniClusterWithClientResource.", exception); + } + } + + private MiniClusterClient createMiniClusterClient() { + return new MiniClusterClient(getClientConfiguration(), getMiniCluster()); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java index bd70cadda0f1d..e890216e7e9ac 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsITCase.java @@ -23,11 +23,11 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.Before; @@ -56,7 +56,7 @@ public class JobManagerMetricsITCase extends TestLogger { private CheckedThread jobExecuteThread; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(1) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java index 6d9a7b03703fd..b875d97f578b4 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/metrics/SystemResourcesMetricsITCase.java @@ -22,8 +22,9 @@ import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.AbstractReporter; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.util.TestLogger; import org.junit.ClassRule; import org.junit.Test; @@ -43,7 +44,7 @@ /** * Integration tests for proper initialization of the system resource metrics. */ -public class SystemResourcesMetricsITCase { +public class SystemResourcesMetricsITCase extends TestLogger { @ClassRule public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java index 1ac75311d7fe4..1b8ac05f388d3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorErrorITCase.java @@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -50,7 +50,7 @@ public class AccumulatorErrorITCase extends TestLogger { private static final String INCOMPATIBLE_ACCUMULATORS_NAME = "incompatible-accumulators"; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(3) diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index 2c12e441313af..0ead861adbaf7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -39,10 +39,10 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -84,7 +84,7 @@ public class AccumulatorLiveITCase extends TestLogger { } @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(1) diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java index a56089186b2d6..0d29568c2a045 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancelingTestBase.java @@ -32,8 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.testingUtils.TestingUtils; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -56,7 +56,7 @@ public abstract class CancelingTestBase extends TestLogger { // -------------------------------------------------------------------------------------------- @ClassRule - public static final MiniClusterResource CLUSTER = new MiniClusterResource( + public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 5dc2aa0e87f05..061d7e0580cd8 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction; @@ -34,8 +35,7 @@ import org.apache.flink.test.checkpointing.utils.FailingSource; import org.apache.flink.test.checkpointing.utils.IntType; import org.apache.flink.test.checkpointing.utils.ValidatingSink; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -58,7 +58,7 @@ public class EventTimeAllWindowCheckpointingITCase extends TestLogger { private static final int PARALLELISM = 4; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index 4fa90206a756c..1c04d270b9c83 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -45,8 +46,7 @@ import org.apache.flink.test.checkpointing.utils.FailingSource; import org.apache.flink.test.checkpointing.utils.IntType; import org.apache.flink.test.checkpointing.utils.ValidatingSink; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -91,7 +91,7 @@ public class EventTimeWindowCheckpointingITCase extends TestLogger { private TestingServer zkServer; - public MiniClusterResource miniClusterResource; + public MiniClusterWithClientResource miniClusterResource; @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); @@ -117,8 +117,8 @@ protected StateBackendEnum getStateBackend() { return this.stateBackendEnum; } - protected final MiniClusterResource getMiniClusterResource() { - return new MiniClusterResource( + protected final MiniClusterWithClientResource getMiniClusterResource() { + return new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfigurationSafe()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java index 989024cffb7f6..957cd2b48d4c2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java @@ -31,13 +31,13 @@ import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.ClassRule; @@ -79,7 +79,7 @@ public class KeyedStateCheckpointingITCase extends TestLogger { // ------------------------------------------------------------------------ @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(NUM_TASK_MANAGERS) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 3a3eadaea5bbf..8f788cbfbcd0c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -51,8 +52,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -105,7 +105,7 @@ enum OperatorCheckpointMethod { NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED } - private static MiniClusterResource cluster; + private static MiniClusterWithClientResource cluster; @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -127,7 +127,7 @@ public void setup() throws Exception { config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - cluster = new MiniClusterResource( + cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java index 24f408de9a764..0635f239c567e 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java @@ -30,14 +30,14 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.test.state.ManualWindowSpeedITCase; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.apache.curator.test.TestingServer; @@ -263,7 +263,7 @@ private void testExternalizedCheckpoints( config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); } - MiniClusterResource cluster = new MiniClusterResource( + MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(NUM_TASK_MANAGERS) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index a806f02698650..2cd2bbb60e926 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.IterativeStream; @@ -51,8 +52,7 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -142,7 +142,7 @@ private String submitJobAndGetVerifiedSavepoint(MiniClusterResourceFactory clust final JobID jobId = jobGraph.getJobID(); StatefulCounter.resetForTest(parallelism); - MiniClusterResource cluster = clusterFactory.get(); + MiniClusterWithClientResource cluster = clusterFactory.get(); cluster.before(); ClusterClient client = cluster.getClusterClient(); @@ -184,7 +184,7 @@ private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceF final JobID jobId = jobGraph.getJobID(); StatefulCounter.resetForTest(parallelism); - MiniClusterResource cluster = clusterFactory.get(); + MiniClusterWithClientResource cluster = clusterFactory.get(); cluster.before(); ClusterClient client = cluster.getClusterClient(); @@ -230,7 +230,7 @@ public void testTriggerSavepointForNonExistingJob() throws Exception { final Configuration config = new Configuration(); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - final MiniClusterResource cluster = new MiniClusterResource( + final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) @@ -261,7 +261,7 @@ public void testTriggerSavepointWithCheckpointingDisabled() throws Exception { final Configuration config = new Configuration(); - final MiniClusterResource cluster = new MiniClusterResource( + final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) @@ -305,7 +305,7 @@ public void testSubmitWithUnknownSavepointPath() throws Exception { final Configuration config = new Configuration(); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - MiniClusterResource cluster = new MiniClusterResource( + MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) @@ -373,7 +373,7 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception { LOG.info("Flink configuration: " + config + "."); // Start Flink - MiniClusterResource cluster = new MiniClusterResource( + MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) @@ -417,7 +417,7 @@ public void testCanRestoreWithModifiedStatelessOperators() throws Exception { // create a new TestingCluster to make sure we start with completely // new resources - cluster = new MiniClusterResource( + cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) @@ -653,7 +653,7 @@ public Integer map(Integer value) throws Exception { config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0); config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString()); - MiniClusterResource cluster = new MiniClusterResource( + MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(1) @@ -789,8 +789,8 @@ private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManag this.config = config; } - MiniClusterResource get() { - return new MiniClusterResource( + MiniClusterWithClientResource get() { + return new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(numTaskManagers) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index 0ed9d6b2195ae..0ace56f5db6e3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -19,9 +19,9 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.TestLogger; @@ -41,7 +41,7 @@ public abstract class StreamFaultToleranceTestBase extends TestLogger { protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(NUM_TASK_MANAGERS) .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java index b0e2967d4b19a..8905ecdd386d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; @@ -34,8 +35,7 @@ import org.apache.flink.test.checkpointing.utils.FailingSource; import org.apache.flink.test.checkpointing.utils.IntType; import org.apache.flink.test.checkpointing.utils.ValidatingSink; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -71,7 +71,7 @@ public WindowCheckpointingITCase(TimeCharacteristic timeCharacteristic) { private static final int PARALLELISM = 4; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java index 642af40aaf234..7c00de7f43684 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java @@ -44,12 +44,12 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; @@ -104,7 +104,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestLogger { private static TestingServer zkServer; - private static MiniClusterResource miniClusterResource; + private static MiniClusterWithClientResource miniClusterResource; private static OneShotLatch waitForCheckpointLatch = new OneShotLatch(); private static OneShotLatch failInCheckpointLatch = new OneShotLatch(); @@ -131,7 +131,7 @@ public static void setup() throws Exception { // we have to manage this manually because we have to create the ZooKeeper server // ahead of this - miniClusterResource = new MiniClusterResource( + miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(NUM_TMS) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java index a5267b59e6846..34c20d6376091 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java @@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.OptionalFailure; @@ -72,7 +72,7 @@ public static void before() { public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); @Rule - public final MiniClusterResource miniClusterResource; + public final MiniClusterWithClientResource miniClusterResource; private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class); private static final Deadline DEADLINE = new FiniteDuration(5, TimeUnit.MINUTES).fromNow(); @@ -88,7 +88,7 @@ protected static String getResourceFilename(String filename) { } protected SavepointMigrationTestBase() throws Exception { - miniClusterResource = new MiniClusterResource( + miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(1) diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java index 4851e54c2ee43..23503d7e370c7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java @@ -30,8 +30,8 @@ import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.testdata.KMeansData; import org.apache.flink.test.util.SuccessException; @@ -44,6 +44,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -90,18 +91,18 @@ public class ClassLoaderITCase extends TestLogger { private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar"; - private static final TemporaryFolder FOLDER = new TemporaryFolder(); + @ClassRule + public static final TemporaryFolder FOLDER = new TemporaryFolder(); @Rule public ExpectedException expectedException = ExpectedException.none(); - private static MiniCluster testCluster; + private static MiniClusterResource miniClusterResource = null; private static final int parallelism = 4; @BeforeClass public static void setUp() throws Exception { - FOLDER.create(); Configuration config = new Configuration(); @@ -117,26 +118,25 @@ public static void setUp() throws Exception { // required as we otherwise run out of memory config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "80m"); - testCluster = new MiniCluster( - new MiniClusterConfiguration.Builder() - .setNumTaskManagers(2) - .setNumSlotsPerTaskManager(2) + miniClusterResource = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) .setConfiguration(config) - .build() - ); - testCluster.start(); + .build()); + + miniClusterResource.before(); } @AfterClass - public static void tearDownClass() throws Exception { - if (testCluster != null) { - testCluster.close(); + public static void tearDownClass() { + if (miniClusterResource != null) { + miniClusterResource.after(); } - FOLDER.delete(); } @After - public void tearDown() throws Exception { + public void tearDown() { TestStreamEnvironment.unsetAsContext(); TestEnvironment.unsetAsContext(); } @@ -147,7 +147,7 @@ public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, Pro PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); TestEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)), Collections.emptyList()); @@ -160,7 +160,7 @@ public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOExceptio PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)); TestStreamEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)), Collections.emptyList()); @@ -174,7 +174,7 @@ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, Pr PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE)); TestEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.emptyList(), Collections.singleton(classpath)); @@ -188,7 +188,7 @@ public void testStreamingClassloaderJobWithCustomClassLoader() throws IOExceptio PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE)); TestStreamEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)), Collections.emptyList()); @@ -203,7 +203,7 @@ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throw PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE)); TestStreamEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)), Collections.emptyList()); @@ -242,7 +242,7 @@ public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvo }); TestEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(KMEANS_JAR_PATH)), Collections.emptyList()); @@ -255,7 +255,7 @@ public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, Progr PackagedProgram userCodeTypeProg = new PackagedProgram(new File(USERCODETYPE_JAR_PATH)); TestEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(USERCODETYPE_JAR_PATH)), Collections.emptyList()); @@ -276,7 +276,7 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx }); TestStreamEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)), Collections.emptyList()); @@ -292,7 +292,7 @@ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOEx */ @Test public void testDisposeSavepointWithCustomKvState() throws Exception { - ClusterClient clusterClient = new MiniClusterClient(new Configuration(), testCluster); + ClusterClient clusterClient = new MiniClusterClient(new Configuration(), miniClusterResource.getMiniCluster()); Deadline deadline = new FiniteDuration(100, TimeUnit.SECONDS).fromNow(); @@ -309,7 +309,7 @@ public void testDisposeSavepointWithCustomKvState() throws Exception { }); TestStreamEnvironment.setAsContext( - testCluster, + miniClusterResource.getMiniCluster(), parallelism, Collections.singleton(new Path(CUSTOM_KV_STATE_JAR_PATH)), Collections.emptyList() diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java index 56df46e49ea5c..fff77e777d281 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java @@ -29,8 +29,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java index 9a96297933ab1..3cd7674037c97 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -55,7 +55,7 @@ public class JobSubmissionFailsITCase extends TestLogger { private static final int NUM_SLOTS = 20; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(NUM_TM) diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 131b6a00eebb1..73e4ae946a67f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -22,12 +22,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import static org.junit.Assert.fail; @@ -45,7 +45,7 @@ public static void main(String[] args) throws Exception { final int slotsPerTaskManager = 80; final int parallelism = taskManagers * slotsPerTaskManager; - MiniClusterResource cluster = null; + MiniClusterWithClientResource cluster = null; try { Configuration config = new Configuration(); @@ -55,7 +55,7 @@ public static void main(String[] args) throws Exception { config.setInteger("taskmanager.net.server.numThreads", 1); config.setInteger("taskmanager.net.client.numThreads", 1); - cluster = new MiniClusterResource( + cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(taskManagers) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java index f62ccf7374d14..03376ece20078 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java @@ -26,8 +26,8 @@ import org.apache.flink.api.java.io.LocalCollectionOutputFormat; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -53,7 +53,7 @@ public class AutoParallelismITCase extends TestLogger { private static final int PARALLELISM = NUM_TM * SLOTS_PER_TM; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(NUM_TM) .setNumberSlotsPerTaskManager(SLOTS_PER_TM) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index 07d146d54cf30..e9b0d7533208a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -27,8 +27,8 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.types.Value; import org.apache.flink.util.TestLogger; @@ -50,7 +50,7 @@ public class CustomSerializationITCase extends TestLogger { private static final int PARLLELISM = 5; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(1) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 6b3371dd4ffe3..77f3d4981c6c5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -28,8 +28,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -55,7 +55,7 @@ public class MiscellaneousIssuesITCase extends TestLogger { @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(3) diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java index ca6cb14a757d9..d53c052dc9008 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java @@ -32,8 +32,8 @@ import org.apache.flink.examples.java.graph.ConnectedComponents; import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.ClassRule; @@ -52,7 +52,7 @@ public class SuccessAfterNetworkBuffersFailureITCase extends TestLogger { private static final int PARALLELISM = 16; @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java index a20a0d0c9221e..1683dfc369fb4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/CustomDistributionITCase.java @@ -29,9 +29,9 @@ import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.operators.util.CollectionDataSets; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; @@ -49,7 +49,7 @@ public class CustomDistributionITCase extends TestLogger { @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(8) diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java index 451108b862763..b8b897db3c17b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/RemoteEnvironmentITCase.java @@ -24,16 +24,13 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; -import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.ClassRule; import org.junit.Test; import java.io.IOException; @@ -52,44 +49,13 @@ public class RemoteEnvironmentITCase extends TestLogger { private static final int USER_DOP = 2; - private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms"; - private static final String VALID_STARTUP_TIMEOUT = "100 s"; - private static Configuration configuration; - - private static AutoCloseableAsync resource; - - private static String hostname; - - private static int port; - - @BeforeClass - public static void setupCluster() throws Exception { - configuration = new Configuration(); - - configuration.setInteger(WebOptions.PORT, 0); - final MiniCluster miniCluster = new MiniCluster( - new MiniClusterConfiguration.Builder() - .setConfiguration(configuration) - .setNumSlotsPerTaskManager(TM_SLOTS) - .build()); - - miniCluster.start(); - - final URI uri = miniCluster.getRestAddress(); - hostname = uri.getHost(); - port = uri.getPort(); - - configuration.setInteger(WebOptions.PORT, port); - - resource = miniCluster; - } - - @AfterClass - public static void tearDownCluster() throws Exception { - resource.close(); - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setNumberSlotsPerTaskManager(TM_SLOTS) + .build()); /** * Ensure that the program parallelism can be set even if the configuration is supplied. @@ -99,6 +65,10 @@ public void testUserSpecificParallelism() throws Exception { Configuration config = new Configuration(); config.setString(AkkaOptions.STARTUP_TIMEOUT, VALID_STARTUP_TIMEOUT); + final URI restAddress = MINI_CLUSTER_RESOURCE.getMiniCluster().getRestAddress(); + final String hostname = restAddress.getHost(); + final int port = restAddress.getPort(); + final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( hostname, port, diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java index d3247f22c782a..6783df0a85b0a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java @@ -20,8 +20,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.ClassRule; @@ -31,7 +31,7 @@ public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase { @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(2) .setNumberSlotsPerTaskManager(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java index 0ccb3fe14f806..c4f3e12d8fb31 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java @@ -20,8 +20,8 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.junit.ClassRule; @@ -31,7 +31,7 @@ public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase { @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index c1775573765b2..5f40115b5a15b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -26,9 +26,9 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.test.util.TestBaseUtils; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; @@ -59,7 +59,7 @@ public class IPv6HostnamesITCase extends TestLogger { @Rule - public final MiniClusterResource miniClusterResource = new MiniClusterResource( + public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java index 418e9476e3849..5be9363d25120 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java @@ -20,10 +20,10 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.AssumptionViolatedException; @@ -47,7 +47,7 @@ public class NettyEpollITCase extends TestLogger { @Test public void testNettyEpoll() throws Exception { - MiniClusterResource cluster = trySetUpCluster(); + MiniClusterWithClientResource cluster = trySetUpCluster(); try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(NUM_TASK_MANAGERS); @@ -70,11 +70,11 @@ public Integer getKey(Integer value) throws Exception { } } - private MiniClusterResource trySetUpCluster() throws Exception { + private MiniClusterWithClientResource trySetUpCluster() throws Exception { try { Configuration config = new Configuration(); config.setString(TRANSPORT_TYPE, "epoll"); - MiniClusterResource cluster = new MiniClusterResource( + MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(config) .setNumberTaskManagers(NUM_TASK_MANAGERS) diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index 96cb3973b0662..ad1de11992baa 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -32,8 +32,8 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -234,7 +234,7 @@ public void testThroughput() throws Exception { final int numTaskManagers = parallelism / numSlotsPerTaskManager; - final MiniClusterResource cluster = new MiniClusterResource( + final MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(numTaskManagers) .setNumberSlotsPerTaskManager(numSlotsPerTaskManager) @@ -258,7 +258,7 @@ public void testThroughput() throws Exception { } private void testProgram( - final MiniClusterResource cluster, + final MiniClusterWithClientResource cluster, final int dataVolumeGb, final boolean useForwarder, final boolean isSlowSender, diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java index 7eebde8602821..3db0f62f8292a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java @@ -30,11 +30,11 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; @@ -71,7 +71,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger { public final TemporaryFolder tmpFolder = new TemporaryFolder(); @ClassRule - public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberTaskManagers(NUM_TMS) .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java index bef3ebdb94872..cf7233594cb81 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BigUserProgramJobSubmitITCase.java @@ -20,22 +20,18 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.rest.RestClusterClient; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.minicluster.MiniCluster; -import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; +import org.junit.ClassRule; import org.junit.Test; -import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -54,42 +50,9 @@ public class BigUserProgramJobSubmitITCase extends TestLogger { // The mini cluster that is shared across tests // ------------------------------------------------------------------------ - private static final MiniCluster CLUSTER; - private static final RestClusterClient CLIENT; - - static { - try { - MiniClusterConfiguration clusterConfiguration = new MiniClusterConfiguration.Builder() - .setNumTaskManagers(1) - .setNumSlotsPerTaskManager(1) - .build(); - CLUSTER = new MiniCluster(clusterConfiguration); - CLUSTER.start(); - - URI restAddress = CLUSTER.getRestAddress(); - - final Configuration clientConfig = new Configuration(); - clientConfig.setString(JobManagerOptions.ADDRESS, restAddress.getHost()); - clientConfig.setInteger(RestOptions.PORT, restAddress.getPort()); - - CLIENT = new RestClusterClient<>( - clientConfig, - StandaloneClusterId.getInstance()); - - } catch (Exception e) { - throw new AssertionError("Could not setup cluster.", e); - } - } - - // ------------------------------------------------------------------------ - // Cluster setup & teardown - // ------------------------------------------------------------------------ - - @AfterClass - public static void teardown() throws Exception { - CLIENT.shutdown(); - CLUSTER.close(); - } + @ClassRule + public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder().build()); private final Random rnd = new Random(); @@ -122,17 +85,26 @@ public String map(Integer value) throws Exception { }).addSink(resultSink); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); - CLIENT.setDetached(false); - CLIENT.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader()); - List expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0"); + final RestClusterClient restClusterClient = new RestClusterClient<>( + MINI_CLUSTER_RESOURCE.getClientConfiguration(), + StandaloneClusterId.getInstance()); - List result = CollectingSink.result; + try { + restClusterClient.setDetached(false); + restClusterClient.submitJob(jobGraph, BigUserProgramJobSubmitITCase.class.getClassLoader()); + + List expected = Arrays.asList("x 1 0", "x 3 0", "x 5 0"); - Collections.sort(expected); - Collections.sort(result); + List result = CollectingSink.result; - assertEquals(expected, result); + Collections.sort(expected); + Collections.sort(result); + + assertEquals(expected, result); + } finally { + restClusterClient.shutdown(); + } } private static class CollectingSink implements SinkFunction { diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 8793b1ff0d1e9..07f88c6be1d75 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -45,8 +46,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.test.util.MiniClusterResource; -import org.apache.flink.test.util.MiniClusterResourceConfiguration; +import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; import org.junit.Assert; @@ -78,7 +78,7 @@ public class TimestampITCase extends TestLogger { static MultiShotLatch latch; @ClassRule - public static final MiniClusterResource CLUSTER = new MiniClusterResource( + public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(getConfiguration()) .setNumberTaskManagers(NUM_TASK_MANAGERS)