Skip to content

Commit

Permalink
[FLINK-21399][coordination][tests] Provide enough slots for job deployment
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Feb 24, 2021
1 parent 87c3933 commit 7c286d7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,6 @@ public void testRequestNextInputSplitWithLocalFailover() throws Exception {
}

@Test
@Category(FailsWithAdaptiveScheduler.class) // FLINK-21399
public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
configuration.set(
Expand Down Expand Up @@ -1100,7 +1099,7 @@ private void runRequestNextInputSplitTest(
source.setInvokableClass(AbstractInvokable.class);

final JobGraph inputSplitJobGraph = new JobGraph(source);
jobGraph.setJobType(JobType.STREAMING);
inputSplitJobGraph.setJobType(JobType.STREAMING);

final ExecutionConfig executionConfig = new ExecutionConfig();
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0));
Expand All @@ -1119,6 +1118,10 @@ private void runRequestNextInputSplitTest(
final JobMasterGateway jobMasterGateway =
jobMaster.getSelfGateway(JobMasterGateway.class);

registerSlotsRequiredForJobExecution(jobMasterGateway, parallelism);

waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);

final JobVertexID sourceId = source.getID();

final List<AccessExecution> executions = getExecutions(jobMasterGateway, sourceId);
Expand All @@ -1139,16 +1142,14 @@ private void runRequestNextInputSplitTest(
allRequestedInputSplits,
containsInAnyOrder(allInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS)));

waitUntilAllExecutionsAreScheduled(jobMasterGateway);

// fail the first execution to trigger a failover
jobMasterGateway
.updateTaskExecutionState(
new TaskExecutionState(initialAttemptId, ExecutionState.FAILED))
.get();

// wait until the job has been recovered
waitUntilAllExecutionsAreScheduled(jobMasterGateway);
waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);

final ExecutionAttemptID restartedAttemptId =
getFirstExecution(jobMasterGateway, sourceId).getAttemptId();
Expand Down Expand Up @@ -1181,8 +1182,8 @@ private Supplier<SerializedInputSplit> getInputSplitSupplier(
return () -> getInputSplit(jobMasterGateway, jobVertexID, initialAttemptId);
}

private void waitUntilAllExecutionsAreScheduled(final JobMasterGateway jobMasterGateway)
throws Exception {
private void waitUntilAllExecutionsAreScheduledOrDeployed(
final JobMasterGateway jobMasterGateway) throws Exception {
final Duration duration = Duration.ofMillis(testingTimeout.toMilliseconds());
final Deadline deadline = Deadline.fromNow(duration);

Expand All @@ -1191,7 +1192,9 @@ private void waitUntilAllExecutionsAreScheduled(final JobMasterGateway jobMaster
getExecutions(jobMasterGateway).stream()
.allMatch(
execution ->
execution.getState() == ExecutionState.SCHEDULED),
execution.getState() == ExecutionState.SCHEDULED
|| execution.getState()
== ExecutionState.DEPLOYING),
deadline);
}

Expand Down Expand Up @@ -1977,4 +1980,11 @@ public StreamStateHandle getMetadataHandle() {
/** No-op: this implementation has an empty body and performs no disposal work. */
@Override
public void disposeStorageLocation() throws IOException {}
}

/**
 * Makes slots available to the given job master so that the job under test can be deployed.
 *
 * <p>Thin wrapper that delegates to {@code JobMasterTestUtils.registerTaskExecutorAndOfferSlots},
 * passing along this test class's {@code rpcService} and {@code testingTimeout}.
 *
 * @param jobMasterGateway gateway of the job master that the slots are offered to
 * @param numSlots number of slots handed to the utility (callers pass the job's parallelism)
 * @throws ExecutionException propagated from the delegated utility call
 * @throws InterruptedException propagated from the delegated utility call
 */
private static void registerSlotsRequiredForJobExecution(
JobMasterGateway jobMasterGateway, int numSlots)
throws ExecutionException, InterruptedException {
JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
rpcService, jobMasterGateway, numSlots, testingTimeout);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -43,7 +47,16 @@ public static void registerTaskExecutorAndOfferSlots(
throws ExecutionException, InterruptedException {

final TaskExecutorGateway taskExecutorGateway =
new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
new TestingTaskExecutorGatewayBuilder()
.setCancelTaskFunction(
executionAttemptId -> {
jobMasterGateway.updateTaskExecutionState(
new TaskExecutionState(
executionAttemptId, ExecutionState.CANCELED));
return CompletableFuture.completedFuture(Acknowledge.get());
})
.createTestingTaskExecutorGateway();

final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation =
new LocalUnresolvedTaskManagerLocation();

Expand Down

0 comments on commit 7c286d7

Please sign in to comment.