Skip to content

Commit

Permalink
[hotfix] Speed up MiniClusterITCase
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Oct 18, 2018
1 parent d316c51 commit 700377a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand Down Expand Up @@ -53,10 +52,10 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -67,7 +66,7 @@ public class MiniClusterITCase extends TestLogger {

@Test
public void runJobWithSingleRpcService() throws Exception {
final int parallelism = 123;
final int parallelism = 23;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand All @@ -85,7 +84,7 @@ public void runJobWithSingleRpcService() throws Exception {

@Test
public void runJobWithMultipleRpcServices() throws Exception {
final int parallelism = 123;
final int parallelism = 23;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand Down Expand Up @@ -326,7 +325,7 @@ public void testSchedulingAllAtOnce() throws Exception {

@Test
public void testJobWithAFailingSenderVertex() throws Exception {
final int parallelism = 100;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand Down Expand Up @@ -364,7 +363,7 @@ public void testJobWithAFailingSenderVertex() throws Exception {

@Test
public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {
final int parallelism = 100;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand Down Expand Up @@ -405,7 +404,7 @@ public void testJobWithAnOccasionallyFailingSenderVertex() throws Exception {

@Test
public void testJobWithAFailingReceiverVertex() throws Exception {
final int parallelism = 200;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand All @@ -421,9 +420,6 @@ public void testJobWithAFailingReceiverVertex() throws Exception {
sender.setInvokableClass(Sender.class);
sender.setParallelism(parallelism);

// set failing senders
SometimesExceptionSender.configFailingSenders(parallelism);

final JobVertex receiver = new JobVertex("Receiver");
receiver.setInvokableClass(ExceptionReceiver.class);
receiver.setParallelism(parallelism);
Expand All @@ -446,7 +442,7 @@ public void testJobWithAFailingReceiverVertex() throws Exception {

@Test
public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception {
final int parallelism = 200;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand Down Expand Up @@ -484,7 +480,7 @@ public void testJobWithAllVerticesFailingDuringInstantiation() throws Exception

@Test
public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception {
final int parallelism = 200;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand Down Expand Up @@ -525,7 +521,7 @@ public void testJobWithSomeVerticesFailingDuringInstantiation() throws Exception

@Test
public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
final int parallelism = 31;
final int parallelism = 11;

final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setNumTaskManagers(1)
Expand All @@ -541,7 +537,7 @@ public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
source.setInvokableClass(WaitingNoOpInvokable.class);
source.setParallelism(parallelism);

final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 500L);
final WaitOnFinalizeJobVertex sink = new WaitOnFinalizeJobVertex("Sink", 20L);
sink.setInvokableClass(NoOpInvokable.class);
sink.setParallelism(parallelism);

Expand All @@ -555,10 +551,9 @@ public void testCallFinalizeOnMasterBeforeJobCompletes() throws Exception {
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
(JobSubmissionResult ignored) -> miniCluster.requestJobResult(jobGraph.getJobID()));

sink.latch.await();

assertFalse(jobResultFuture.isDone());
jobResultFuture.get().toJobExecutionResult(getClass().getClassLoader());

assertTrue(sink.finalizedOnMaster.get());
}
}

Expand Down Expand Up @@ -591,21 +586,22 @@ private static JobGraph getSimpleJob(int parallelism) throws IOException {

private static class WaitOnFinalizeJobVertex extends JobVertex {

private OneShotLatch latch;
private static final long serialVersionUID = -1179547322468530299L;

private final AtomicBoolean finalizedOnMaster = new AtomicBoolean(false);

private final long waitingTime;

WaitOnFinalizeJobVertex(String name, long waitingTime) {
super(name);

this.latch = new OneShotLatch();
this.waitingTime = waitingTime;
}

@Override
public void finalizeOnMaster(ClassLoader loader) throws Exception {
Thread.sleep(waitingTime);
latch.trigger();
finalizedOnMaster.set(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

import java.util.HashSet;
import java.util.Random;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that sometimes fails.
Expand All @@ -32,18 +32,12 @@ public class SometimesExceptionSender extends AbstractInvokable {

private static Set<Integer> failingSenders;

private static final Random RANDOM = new Random();

public SometimesExceptionSender(Environment environment) {
super(environment);
}

static void configFailingSenders(int numOfTasks) {
failingSenders = new HashSet<>();

while (failingSenders.size() < 10) {
failingSenders.add(RANDOM.nextInt(numOfTasks));
}
static void configFailingSenders(int numTasks) {
failingSenders = Collections.singleton(ThreadLocalRandom.current().nextInt(numTasks));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;

import java.util.HashSet;
import java.util.Collections;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/**
* {@link org.apache.flink.runtime.testtasks.BlockingNoOpInvokable} that sometimes fails on constructor.
Expand All @@ -43,10 +44,6 @@ public SometimesInstantiationErrorSender(Environment environment) {
}

static void configFailingSenders(int numOfTasks) {
failingSenders = new HashSet<>();

while (failingSenders.size() < 10) {
failingSenders.add(RANDOM.nextInt(numOfTasks));
}
failingSenders = Collections.singleton(ThreadLocalRandom.current().nextInt(numOfTasks));
}
}

0 comments on commit 700377a

Please sign in to comment.