From 700377a86155061679122ad33e595eb079bb1ca6 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Thu, 18 Oct 2018 14:53:16 +0200 Subject: [PATCH] [hotfix] Speed up MiniClusterITCase --- .../minicluster/MiniClusterITCase.java | 36 +++++++++---------- .../minicluster/SometimesExceptionSender.java | 14 +++----- .../SometimesInstantiationErrorSender.java | 9 ++--- 3 files changed, 23 insertions(+), 36 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index c005de95d6254..ef6b4420b6786 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -53,10 +52,10 @@ import java.io.IOException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -67,7 +66,7 @@ public class MiniClusterITCase extends TestLogger { @Test public void runJobWithSingleRpcService() throws Exception { - final int parallelism = 123; + final int parallelism = 23; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -85,7 +84,7 @@ public void runJobWithSingleRpcService() throws Exception { @Test public void runJobWithMultipleRpcServices() throws Exception { - final int parallelism = 123; + final int parallelism = 23; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -326,7 +325,7 @@ public void testSchedulingAllAtOnce() throws Exception { @Test public void testJobWithAFailingSenderVertex() throws Exception { - final int parallelism = 100; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -364,7 +363,7 @@ public void testJobWithAFailingSenderVertex() throws Exception { @Test public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception { - final int parallelism = 100; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -405,7 +404,7 @@ public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception { @Test public void testJobWithAFailingReceiverVertex() throws Exception { - final int parallelism = 200; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -421,9 +420,6 @@ public void testJobWithAFailingReceiverVertex() throws Exception { sender.setInvokableClass(Sender.class); sender.setParallelism(parallelism); - // set failing senders - SometimesExceptionSender.configFailingSenders(parallelism); - final JobVertex receiver = new JobVertex("Receiver"); receiver.setInvokableClass(ExceptionReceiver.class); receiver.setParallelism(parallelism); @@ -446,7 +442,7 @@ public void testJobWithAFailingReceiverVertex() throws Exception { @Test public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception { - final int parallelism = 200; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -484,7 +480,7 @@ public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception @Test public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception { - final int parallelism = 200; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -525,7 +521,7 @@ public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception @Test public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception { - final int parallelism = 31; + final int parallelism = 11; final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .setNumTaskManagers(1) @@ -541,7 +537,7 @@ public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception { source.setInvokableClass(WaitingNoOpInvokable.class); source.setParallelism(parallelism); - final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 500L); + final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 20L); sink.setInvokableClass(NoOpInvokable.class); sink.setParallelism(parallelism); @@ -555,10 +551,9 @@ public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception { final CompletableFuture jobResultFuture = submissionFuture.thenCompose( (JobSubmissionResult ignored) -> miniCluster.requestJobResult(jobGraph.getJobID())); - sink.latch.await(); - - assertFalse(jobResultFuture.isDone()); jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader()); + + assertTrue(sink.finalizedOnMaster.get()); } } @@ -591,21 +586,22 @@ private static JobGraph getSimpleJob(int parallelism) throws IOException { private static class WaitOnFinalizeJobVertex extends JobVertex { - private OneShotLatch latch; + private static final long serialVersionUID = -1179547322468530299L; + + private final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false); private final long waitingTime; WaitOnFinalizeJobVertex(String name, long waitingTime) { super(name); - this.latch = new OneShotLatch(); this.waitingTime = waitingTime; } @Override public void finalizeOnMaster(ClassLoader loader) throws Exception { Thread.sleep(waitingTime); - latch.trigger(); + finalizedOnMaster.set(true); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java index 6ed3c4df304a2..3cb7bd25cca56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesExceptionSender.java @@ -21,9 +21,9 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import java.util.HashSet; -import java.util.Random; +import java.util.Collections; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; /** * {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that sometimes fails. @@ -32,18 +32,12 @@ public class SometimesExceptionSender extends AbstractInvokable { private static Set failingSenders; - private static final Random RANDOM = new Random(); - public SometimesExceptionSender(Environment environment) { super(environment); } - static void configFailingSenders(int numOfTasks) { - failingSenders = new HashSet<>(); - - while (failingSenders.size() < 10) { - failingSenders.add(RANDOM.nextInt(numOfTasks)); - } + static void configFailingSenders(int numTasks) { + failingSenders = Collections.singleton(ThreadLocalRandom.current().nextInt(numTasks)); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java index d632da3a5c6de..a3519060d227c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/SometimesInstantiationErrorSender.java @@ -21,9 +21,10 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; -import java.util.HashSet; +import java.util.Collections; import java.util.Random; import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; /** * {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that sometimes fails on constructor. @@ -43,10 +44,6 @@ public SometimesInstantiationErrorSender(Environment environment) { } static void configFailingSenders(int numOfTasks) { - failingSenders = new HashSet<>(); - - while (failingSenders.size() < 10) { - failingSenders.add(RANDOM.nextInt(numOfTasks)); - } + failingSenders = Collections.singleton(ThreadLocalRandom.current().nextInt(numOfTasks)); } }