Skip to content

Commit

Permalink
[FLINK-22574] Adaptive Scheduler: Fix cancellation while in Restartin…
Browse files Browse the repository at this point in the history
…g state.

The Canceling state of Adaptive Scheduler was expecting the ExecutionGraph to be in state RUNNING when entering the state.
However, the Restarting state is cancelling the ExecutionGraph already, thus the ExecutionGraph can be in state CANCELING or CANCELED when entering the Canceling state.

Calling the ExecutionGraph.cancel() method in the Canceling state while the ExecutionGraph is already in state CANCELING or CANCELED is not a problem.

The change is guarded by a new ITCase, as this issue affects the interplay between different AS states.

This closes apache#15882
  • Loading branch information
rmetzger committed May 12, 2021
1 parent 728cddc commit 02d30ac
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ abstract class StateWithExecutionGraph implements State {
this.operatorCoordinatorHandler = operatorCoordinatorHandler;
this.kvStateHandler = new KvStateHandler(executionGraph);
this.logger = logger;
Preconditions.checkState(
executionGraph.getState() == JobStatus.RUNNING, "Assuming running execution graph");

FutureUtils.assertNoException(
executionGraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.Environment;
Expand All @@ -33,6 +35,7 @@
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
Expand All @@ -43,6 +46,7 @@

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -104,6 +108,34 @@ private JobGraph createJobGraph() {
return JobGraphTestUtils.streamingJobGraph(source, sink);
}

/**
 * Checks that a job can be cancelled while it is reported as {@link JobStatus#RESTARTING}.
 *
 * <p>A single vertex running {@link AlwaysFailingInvokable} is combined with a long fixed restart
 * delay, so the job stays in RESTARTING long enough for the test to observe that status and then
 * issue the cancellation. The test passes when the cancellation future completes without error.
 */
@Test
public void testJobCancellationWhileRestartingSucceeds() throws Exception {
    // restart delay in milliseconds; also reused as the deadline for observing RESTARTING
    final long restartDelayMillis = 10000L;

    final JobVertex failingVertex = new JobVertex("Always failing operator");
    failingVertex.setParallelism(1);
    failingVertex.setInvokableClass(AlwaysFailingInvokable.class);
    final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(failingVertex);

    // a 10 second pause between restart attempts keeps the job in RESTARTING for that long
    final ExecutionConfig restartConfig = new ExecutionConfig();
    restartConfig.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, restartDelayMillis));
    jobGraph.setExecutionConfig(restartConfig);

    final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
    miniCluster.submitJob(jobGraph).join();

    // block until the job reports RESTARTING
    final Deadline restartingDeadline =
            Deadline.fromNow(Duration.of(restartDelayMillis, ChronoUnit.MILLIS));
    CommonTestUtils.waitUntilCondition(
            () -> JobStatus.RESTARTING == miniCluster.getJobStatus(jobGraph.getJobID()).get(),
            restartingDeadline,
            5);

    // cancel while still in RESTARTING; get() surfaces any failure of the cancellation
    miniCluster.cancelJob(jobGraph.getJobID()).get();
}

@Test
public void testGlobalFailoverIfTaskFails() throws Throwable {
final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
Expand Down Expand Up @@ -160,4 +192,16 @@ private static void reset() {
hasFailed = false;
}
}

/**
 * An {@link AbstractInvokable} whose {@link #invoke()} never succeeds: every invocation throws a
 * {@link FlinkRuntimeException}.
 */
public static final class AlwaysFailingInvokable extends AbstractInvokable {

    /**
     * Creates the invokable and hands the given environment to the base class.
     *
     * @param env the environment passed through to {@link AbstractInvokable}
     */
    public AlwaysFailingInvokable(Environment env) {
        super(env);
    }

    /**
     * Fails unconditionally.
     *
     * @throws FlinkRuntimeException on every call
     */
    @Override
    public void invoke() throws Exception {
        final FlinkRuntimeException failure = new FlinkRuntimeException("Test failure.");
        throw failure;
    }
}
}

0 comments on commit 02d30ac

Please sign in to comment.