diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java deleted file mode 100644 index 36dfa13fc4c35..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.executiongraph.failover; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; -import org.apache.flink.runtime.executiongraph.SchedulingUtils; -import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy; -import org.apache.flink.runtime.executiongraph.restart.RestartCallback; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; -import org.apache.flink.runtime.scheduler.ExecutionVertexVersion; -import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.util.ExceptionUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkNotNull; -import static 
org.apache.flink.util.Preconditions.checkState; - -/** - * This failover strategy uses flip1.RestartPipelinedRegionFailoverStrategy to make task failover decisions. - */ -public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy { - - private static final Logger LOG = LoggerFactory.getLogger(AdaptedRestartPipelinedRegionStrategyNG.class); - - /** The execution graph on which this FailoverStrategy works. */ - private final ExecutionGraph executionGraph; - - /** The versioner helps to maintain execution vertex versions. */ - private final ExecutionVertexVersioner executionVertexVersioner; - - /** The underlying new generation region failover strategy. */ - private RestartPipelinedRegionFailoverStrategy restartPipelinedRegionFailoverStrategy; - - public AdaptedRestartPipelinedRegionStrategyNG(final ExecutionGraph executionGraph) { - this.executionGraph = checkNotNull(executionGraph); - this.executionVertexVersioner = new ExecutionVertexVersioner(); - } - - @Override - public void onTaskFailure(final Execution taskExecution, final Throwable cause) { - if (!executionGraph.getRestartStrategy().canRestart()) { - // delegate the failure to a global fail that will check the restart strategy and not restart - LOG.info("Fail to pass the restart strategy validation in region failover. 
Fallback to fail global."); - failGlobal(cause); - return; - } - - if (!isLocalFailoverValid(executionGraph.getGlobalModVersion())) { - LOG.info("Skip current region failover as a global failover is ongoing."); - return; - } - - final ExecutionVertexID vertexID = getExecutionVertexID(taskExecution.getVertex()); - - final Set tasksToRestart = restartPipelinedRegionFailoverStrategy.getTasksNeedingRestart(vertexID, cause); - restartTasks(tasksToRestart); - } - - @VisibleForTesting - protected void restartTasks(final Set verticesToRestart) { - final long globalModVersion = executionGraph.getGlobalModVersion(); - final Set vertexVersions = new HashSet<>( - executionVertexVersioner.recordVertexModifications(verticesToRestart).values()); - - executionGraph.incrementRestarts(); - - FutureUtils.assertNoException( - cancelTasks(verticesToRestart) - .thenComposeAsync(resetAndRescheduleTasks(globalModVersion, vertexVersions), executionGraph.getJobMasterMainThreadExecutor()) - .handle(failGlobalOnError())); - } - - private Function> resetAndRescheduleTasks(final long globalModVersion, final Set vertexVersions) { - return (ignored) -> { - final RestartStrategy restartStrategy = executionGraph.getRestartStrategy(); - return restartStrategy.restart( - createResetAndRescheduleTasksCallback(globalModVersion, vertexVersions), - executionGraph.getJobMasterMainThreadExecutor() - ); - }; - } - - private RestartCallback createResetAndRescheduleTasksCallback(final long globalModVersion, final Set vertexVersions) { - return () -> { - if (!isLocalFailoverValid(globalModVersion)) { - LOG.info("Skip current region failover as a global failover is ongoing."); - return; - } - - // found out vertices which are still valid to restart. - // some vertices involved in this failover may be modified if another region - // failover happens during the cancellation stage of this failover. - // Will ignore the modified vertices as the other failover will deal with them. 
- final Set unmodifiedVertices = executionVertexVersioner - .getUnmodifiedExecutionVertices(vertexVersions) - .stream() - .map(this::getExecutionVertex) - .collect(Collectors.toSet()); - - try { - LOG.info("Finally restart {} tasks to recover from task failure.", unmodifiedVertices.size()); - - // reset tasks to CREATED state and reload state - resetTasks(unmodifiedVertices, globalModVersion); - - // re-schedule tasks - rescheduleTasks(unmodifiedVertices, globalModVersion); - } catch (GlobalModVersionMismatch e) { - throw new IllegalStateException( - "Bug: ExecutionGraph was concurrently modified outside of main thread", e); - } catch (Exception e) { - throw new CompletionException(e); - } - }; - } - - private BiFunction failGlobalOnError() { - return (Object ignored, Throwable t) -> { - if (t != null) { - LOG.info("Unexpected error happens in region failover. Fail globally.", t); - failGlobal(t); - } - return null; - }; - } - - @VisibleForTesting - protected CompletableFuture cancelTasks(final Set vertices) { - final List> cancelFutures = vertices.stream() - .map(this::cancelExecutionVertex) - .collect(Collectors.toList()); - - return FutureUtils.combineAll(cancelFutures); - } - - private void resetTasks(final Set vertices, final long globalModVersion) throws Exception { - final Set colGroups = new HashSet<>(); - final long restartTimestamp = System.currentTimeMillis(); - - for (ExecutionVertex ev : vertices) { - CoLocationGroup cgroup = ev.getJobVertex().getCoLocationGroup(); - if (cgroup != null && !colGroups.contains(cgroup)){ - cgroup.resetConstraints(); - colGroups.add(cgroup); - } - - ev.resetForNewExecution(restartTimestamp, globalModVersion); - } - - // if there is checkpointed state, reload it into the executions - if (executionGraph.getCheckpointCoordinator() != null) { - // abort pending checkpoints to - // i) enable new checkpoint triggering without waiting for last checkpoint expired. - // ii) ensure the EXACTLY_ONCE semantics if needed. 
- executionGraph.getCheckpointCoordinator().abortPendingCheckpoints( - new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION)); - - final Map involvedExecutionJobVertices = - getInvolvedExecutionJobVertices(vertices); - executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState( - involvedExecutionJobVertices, false, true); - } - } - - private void rescheduleTasks(final Set vertices, final long globalModVersion) throws Exception { - - // sort vertices topologically - // this is to reduce the possibility that downstream tasks get launched earlier, - // which may cause lots of partition state checks in EAGER mode - final List sortedVertices = sortVerticesTopologically(vertices); - - final CompletableFuture newSchedulingFuture = SchedulingUtils.schedule( - executionGraph.getScheduleMode(), - sortedVertices, - executionGraph); - - // if no global failover is triggered in the scheduling process, - // register a failure handling callback to the scheduling - if (isLocalFailoverValid(globalModVersion)) { - newSchedulingFuture.whenComplete( - (Void ignored, Throwable throwable) -> { - if (throwable != null) { - final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable); - - if (!(strippedThrowable instanceof CancellationException)) { - // only fail if the scheduling future was not canceled - failGlobal(strippedThrowable); - } - } - }); - } - } - - private boolean isLocalFailoverValid(final long globalModVersion) { - // local failover is valid only if the job is still RUNNING and - // no global failover happens since the given globalModVersion is recorded - return executionGraph.getState() == JobStatus.RUNNING && - executionGraph.getGlobalModVersion() == globalModVersion; - } - - private CompletableFuture cancelExecutionVertex(final ExecutionVertexID executionVertexId) { - return getExecutionVertex(executionVertexId).cancel(); - } - - private Map getInvolvedExecutionJobVertices( - final Set executionVertices) { - - Map 
tasks = new HashMap<>(); - for (ExecutionVertex executionVertex : executionVertices) { - JobVertexID jobvertexId = executionVertex.getJobvertexId(); - ExecutionJobVertex jobVertex = executionVertex.getJobVertex(); - tasks.putIfAbsent(jobvertexId, jobVertex); - } - return tasks; - } - - private void failGlobal(final Throwable cause) { - executionGraph.failGlobal(cause); - } - - private ExecutionVertex getExecutionVertex(final ExecutionVertexID vertexID) { - return executionGraph.getAllVertices() - .get(vertexID.getJobVertexId()) - .getTaskVertices()[vertexID.getSubtaskIndex()]; - } - - private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) { - return vertex.getID(); - } - - private List sortVerticesTopologically(final Set vertices) { - // org execution vertex by jobVertexId - final Map> verticesMap = new HashMap<>(); - for (ExecutionVertex vertex : vertices) { - verticesMap.computeIfAbsent(vertex.getJobvertexId(), id -> new ArrayList<>()).add(vertex); - } - - // sort in jobVertex topological order - final List sortedVertices = new ArrayList<>(vertices.size()); - for (ExecutionJobVertex jobVertex : executionGraph.getVerticesTopologically()) { - sortedVertices.addAll(verticesMap.getOrDefault(jobVertex.getJobVertexId(), Collections.emptyList())); - } - return sortedVertices; - } - - @Override - public void notifyNewVertices(final List newJobVerticesTopological) { - // build the underlying new generation failover strategy when the executionGraph vertices are all added, - // otherwise the failover topology will not be correctly built. - // currently it's safe to add it here, as this method is invoked only once in production code. 
- checkState(restartPipelinedRegionFailoverStrategy == null, "notifyNewVertices() must be called only once"); - this.restartPipelinedRegionFailoverStrategy = new RestartPipelinedRegionFailoverStrategy( - executionGraph.getSchedulingTopology(), - executionGraph.getResultPartitionAvailabilityChecker()); - } - - @Override - public String getStrategyName() { - return "New Pipelined Region Failover"; - } - - // ------------------------------------------------------------------------ - // factory - // ------------------------------------------------------------------------ - - /** - * Factory that instantiates the AdaptedRestartPipelinedRegionStrategyNG. - */ - public static class Factory implements FailoverStrategy.Factory { - - @Override - public FailoverStrategy create(final ExecutionGraph executionGraph) { - return new AdaptedRestartPipelinedRegionStrategyNG(executionGraph); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java index d597f23978c09..f2e1e4a589282 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java @@ -38,9 +38,6 @@ public class FailoverStrategyLoader { /** Config name for the {@link RestartIndividualStrategy}. */ public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual"; - /** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. 
*/ - public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region"; - // ------------------------------------------------------------------------ /** @@ -62,9 +59,6 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config case FULL_RESTART_STRATEGY_NAME: return new RestartAllStrategy.Factory(); - case PIPELINED_REGION_RESTART_STRATEGY_NAME: - return new AdaptedRestartPipelinedRegionStrategyNG.Factory(); - case INDIVIDUAL_RESTART_STRATEGY_NAME: return new RestartIndividualStrategy.Factory(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java index aea1d98b15a90..ddedf707b05c8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java @@ -25,7 +25,6 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -57,8 +56,7 @@ public void setUp() { /** * Tests that {@link CheckpointCoordinator#abortPendingCheckpoints(CheckpointException)} - * called by {@link AdaptedRestartPipelinedRegionStrategyNG} could handle - * the {@code currentPeriodicTrigger} null situation well. + * called on job failover handles the case where {@code currentPeriodicTrigger} is null.
*/ @Test public void testAbortPendingCheckpointsWithTriggerValidation() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java deleted file mode 100644 index a57918efed9e9..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; -import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; -import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; -import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; -import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.util.TestLogger; - -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CountDownLatch; - -import static org.apache.flink.util.Preconditions.checkState; -import static 
org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG}. - */ -public class AdaptedRestartPipelinedRegionStrategyNGAbortPendingCheckpointsTest extends TestLogger { - - private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; - - private ComponentMainThreadExecutor componentMainThreadExecutor; - - private SimpleAckingTaskManagerGateway taskManagerGateway; - - @Before - public void setUp() { - manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); - componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); - taskManagerGateway = new SimpleAckingTaskManagerGateway(); - } - - @Test - public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { - final JobGraph jobGraph = createStreamingJobGraph(); - final CountDownLatch checkpointTriggeredLatch = new CountDownLatch(1); - taskManagerGateway.setCheckpointConsumer( - (executionAttemptID, jobId, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> { - checkpointTriggeredLatch.countDown(); - }); - final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); - - final Iterator vertexIterator = executionGraph.getAllExecutionVertices().iterator(); - final ExecutionVertex firstExecutionVertex = vertexIterator.next(); - - setTasksRunning(executionGraph, firstExecutionVertex, vertexIterator.next()); - - final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); - checkState(checkpointCoordinator != null); - - checkpointCoordinator.triggerCheckpoint(false); - checkpointTriggeredLatch.await(); - assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); - long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next(); - - AcknowledgeCheckpoint 
acknowledgeCheckpoint = new AcknowledgeCheckpoint( - jobGraph.getJobID(), - firstExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), - checkpointId); - - // let the first vertex acknowledge the checkpoint, and fail it afterwards - // the failover strategy should then cancel all pending checkpoints on restart - checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, "Unknown location"); - assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); - - failVertex(firstExecutionVertex); - assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); - manualMainThreadExecutor.triggerScheduledTasks(); - - assertNoPendingCheckpoints(checkpointCoordinator); - } - - private void setTasksRunning(final ExecutionGraph executionGraph, final ExecutionVertex... executionVertices) { - for (ExecutionVertex executionVertex : executionVertices) { - executionGraph.updateState( - new TaskExecutionState(executionGraph.getJobID(), - executionVertex.getCurrentExecutionAttempt().getAttemptId(), - ExecutionState.RUNNING)); - } - } - - private void failVertex(final ExecutionVertex onlyExecutionVertex) { - onlyExecutionVertex.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); - manualMainThreadExecutor.triggerAll(); - } - - private static JobGraph createStreamingJobGraph() { - final JobVertex v1 = new JobVertex("vertex1"); - final JobVertex v2 = new JobVertex("vertex2"); - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - - final JobGraph jobGraph = new JobGraph(v1, v2); - jobGraph.setScheduleMode(ScheduleMode.EAGER); - - return jobGraph; - } - - private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { - final ExecutionGraph executionGraph = TestingExecutionGraphBuilder - .newBuilder() - .setJobGraph(jobGraph) - .setRestartStrategy(new FixedDelayRestartStrategy(10, 0)) - .setFailoverStrategyFactory(AdaptedRestartPipelinedRegionStrategyNG::new) - 
.setSlotProvider(new SimpleSlotProvider(2, taskManagerGateway)) - .build(); - - enableCheckpointing(executionGraph); - executionGraph.start(componentMainThreadExecutor); - executionGraph.scheduleForExecution(); - manualMainThreadExecutor.triggerAll(); - return executionGraph; - } - - private static void enableCheckpointing(final ExecutionGraph executionGraph) { - final List jobVertices = new ArrayList<>(executionGraph.getAllVertices().values()); - final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration( - Long.MAX_VALUE, - Long.MAX_VALUE, - 0, - 1, - CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION, - true, - false, - false, - 0); - - executionGraph.enableCheckpointing( - checkpointCoordinatorConfiguration, - jobVertices, - jobVertices, - jobVertices, - Collections.emptyList(), - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1), - new MemoryStateBackend(), - new CheckpointStatsTracker( - 0, - jobVertices, - checkpointCoordinatorConfiguration, - new UnregisteredMetricsGroup())); - } - - private static void assertNoPendingCheckpoints(final CheckpointCoordinator checkpointCoordinator) { - assertThat(checkpointCoordinator.getPendingCheckpoints().entrySet(), is(empty())); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java deleted file mode 100644 index ace5333e0bc82..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AdaptedRestartPipelinedRegionStrategyNGFailoverTest.TestAdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import 
org.apache.flink.runtime.shuffle.NettyShuffleMaster; -import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.util.TestLogger; - -import org.junit.Before; -import org.junit.Test; - -import java.util.Iterator; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -import static org.junit.Assert.assertEquals; - -/** - * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling when concurrent failovers happen. - * There can be local+local and local+global concurrent failovers. - */ -public class AdaptedRestartPipelinedRegionStrategyNGConcurrentFailoverTest extends TestLogger { - - private static final JobID TEST_JOB_ID = new JobID(); - - private static final int DEFAULT_PARALLELISM = 2; - - private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; - - private ComponentMainThreadExecutor componentMainThreadExecutor; - - private TestRestartStrategy manuallyTriggeredRestartStrategy; - - @Before - public void setUp() { - manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); - componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); - manuallyTriggeredRestartStrategy = TestRestartStrategy.manuallyTriggered(); - } - - /** - * Tests that 2 concurrent region failovers can lead to a properly vertex state. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *       (blocking)
-	 * 
- */ - @Test - public void testConcurrentRegionFailovers() throws Exception { - - // the logic in this test is as follows: - // - start a job - // - cause {ev11} failure and delay the local recovery action via the manual executor - // - cause {ev12} failure and delay the local recovery action via the manual executor - // - resume local recovery actions - // - validate that each task is restarted only once - - final ExecutionGraph eg = createExecutionGraph(); - - final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = - (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); - failoverStrategy.setBlockerFuture(new CompletableFuture<>()); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - // start job scheduling - eg.scheduleForExecution(); - manualMainThreadExecutor.triggerAll(); - - // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22} - ev11.getCurrentExecutionAttempt().fail(new Exception("task failure 1")); - manualMainThreadExecutor.triggerAll(); - assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); - assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); - - // fail ev12 to trigger region failover of {ev12}, {ev21}, {ev22} - ev12.getCurrentExecutionAttempt().fail(new Exception("task failure 2")); - manualMainThreadExecutor.triggerAll(); - assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); - assertEquals(ExecutionState.FAILED, ev12.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); - - // complete region failover 
blocker to trigger region failover recovery - failoverStrategy.getBlockerFuture().complete(null); - manualMainThreadExecutor.triggerAll(); - manuallyTriggeredRestartStrategy.triggerAll(); - manualMainThreadExecutor.triggerAll(); - - // verify that all tasks are recovered and no task is restarted more than once - assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); - assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); - assertEquals(ExecutionState.CREATED, ev21.getExecutionState()); - assertEquals(ExecutionState.CREATED, ev22.getExecutionState()); - assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); - } - - /** - * Tests that a global failover will take precedence over local failovers. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *       (blocking)
-	 * 
- */ - @Test - public void testRegionFailoverInterruptedByGlobalFailover() throws Exception { - - // the logic in this test is as follows: - // - start a job - // - cause a task failure and delay the local recovery action via the manual executor - // - cause a global failure - // - resume in local recovery action - // - validate that the local recovery does not restart tasks - - final ExecutionGraph eg = createExecutionGraph(); - - final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = - (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); - failoverStrategy.setBlockerFuture(new CompletableFuture<>()); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - // start job scheduling - eg.scheduleForExecution(); - manualMainThreadExecutor.triggerAll(); - - // fail ev11 to trigger region failover of {ev11}, {ev21}, {ev22} - ev11.getCurrentExecutionAttempt().fail(new Exception("task failure")); - manualMainThreadExecutor.triggerAll(); - assertEquals(JobStatus.RUNNING, eg.getState()); - assertEquals(ExecutionState.FAILED, ev11.getExecutionState()); - assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev21.getExecutionState()); - assertEquals(ExecutionState.CANCELED, ev22.getExecutionState()); - - // trigger global failover cancelling and immediately recovery - eg.failGlobal(new Exception("Test global failure")); - ev12.getCurrentExecutionAttempt().completeCancelling(); - manuallyTriggeredRestartStrategy.triggerNextAction(); - manualMainThreadExecutor.triggerAll(); - - // verify the job state and vertex attempt number - assertEquals(2, eg.getGlobalModVersion()); - assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); - 
assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); - - // complete region failover blocker to trigger region failover - failoverStrategy.getBlockerFuture().complete(null); - manualMainThreadExecutor.triggerAll(); - - // verify that no task is restarted by region failover - assertEquals(ExecutionState.DEPLOYING, ev11.getExecutionState()); - assertEquals(ExecutionState.DEPLOYING, ev12.getExecutionState()); - assertEquals(ExecutionState.CREATED, ev21.getExecutionState()); - assertEquals(ExecutionState.CREATED, ev22.getExecutionState()); - assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev12.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); - } - - @Test - public void testSkipFailoverIfExecutionStateIsNotRunning() throws Exception { - final ExecutionGraph executionGraph = createExecutionGraph(); - - final Iterator vertexIterator = executionGraph.getAllExecutionVertices().iterator(); - final ExecutionVertex firstVertex = vertexIterator.next(); - - executionGraph.cancel(); - - final FailoverStrategy failoverStrategy = executionGraph.getFailoverStrategy(); - failoverStrategy.onTaskFailure(firstVertex.getCurrentExecutionAttempt(), new Exception("Test Exception")); - manualMainThreadExecutor.triggerAll(); - - assertEquals(ExecutionState.CANCELED, firstVertex.getExecutionState()); - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - /** - * Creating a sample ExecutionGraph for testing with topology as below. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *       (blocking)
-	 * 
- * 4 regions. Each consists of one individual execution vertex. - */ - private ExecutionGraph createExecutionGraph() throws Exception { - - final JobVertex v1 = new JobVertex("vertex1"); - v1.setInvokableClass(NoOpInvokable.class); - v1.setParallelism(DEFAULT_PARALLELISM); - - final JobVertex v2 = new JobVertex("vertex2"); - v2.setInvokableClass(NoOpInvokable.class); - v2.setParallelism(DEFAULT_PARALLELISM); - - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - - final JobGraph jg = new JobGraph(TEST_JOB_ID, "testjob", v1, v2); - - final SimpleSlotProvider slotProvider = new SimpleSlotProvider(DEFAULT_PARALLELISM); - - final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl( - jg.getJobID(), - NettyShuffleMaster.INSTANCE, - ignored -> Optional.empty()); - - final ExecutionGraph graph = TestingExecutionGraphBuilder - .newBuilder() - .setJobGraph(jg) - .setRestartStrategy(manuallyTriggeredRestartStrategy) - .setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new) - .setSlotProvider(slotProvider) - .setPartitionTracker(partitionTracker) - .build(); - - graph.start(componentMainThreadExecutor); - - return graph; - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java deleted file mode 100644 index 87ca389fde59e..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/AdaptedRestartPipelinedRegionStrategyNGFailoverTest.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.JobStatus; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.failover.AdaptedRestartPipelinedRegionStrategyNG; -import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy; -import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.executiongraph.restart.RestartCallback; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTrackerImpl; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException; -import 
org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; -import org.apache.flink.util.TestLogger; - -import org.junit.Before; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertThat; - -/** - * Tests for {@link AdaptedRestartPipelinedRegionStrategyNG} failover handling. - */ -public class AdaptedRestartPipelinedRegionStrategyNGFailoverTest extends TestLogger { - - private static final JobID TEST_JOB_ID = new JobID(); - - private ComponentMainThreadExecutor componentMainThreadExecutor; - - private ManuallyTriggeredScheduledExecutor manualMainThreadExecutor; - - @Before - public void setUp() { - manualMainThreadExecutor = new ManuallyTriggeredScheduledExecutor(); - componentMainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter(manualMainThreadExecutor, Thread.currentThread()); - } - - /** - * Tests for region failover for job in EAGER mode. - * This applies to streaming job, with no BLOCKING edge. - *
-	 *     (v11) ---> (v21)
-	 *
-	 *     (v12) ---> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *       (pipelined)
-	 * 
- */ - @Test - public void testRegionFailoverInEagerMode() throws Exception { - // create a streaming job graph with EAGER schedule mode - final JobGraph jobGraph = createStreamingJobGraph(); - final ExecutionGraph eg = createExecutionGraph(jobGraph); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - // trigger task failure of ev11 - // vertices { ev11, ev21 } should be affected - ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); - manualMainThreadExecutor.triggerAll(); - manualMainThreadExecutor.triggerScheduledTasks(); - - // verify vertex states and complete cancellation - assertVertexInState(ExecutionState.FAILED, ev11); - assertVertexInState(ExecutionState.DEPLOYING, ev12); - assertVertexInState(ExecutionState.CANCELING, ev21); - assertVertexInState(ExecutionState.DEPLOYING, ev22); - ev21.getCurrentExecutionAttempt().completeCancelling(); - manualMainThreadExecutor.triggerAll(); - manualMainThreadExecutor.triggerScheduledTasks(); - - // verify vertex states - // in eager mode, all affected vertices should be scheduled in failover - assertVertexInState(ExecutionState.DEPLOYING, ev11); - assertVertexInState(ExecutionState.DEPLOYING, ev12); - assertVertexInState(ExecutionState.DEPLOYING, ev21); - assertVertexInState(ExecutionState.DEPLOYING, ev22); - - // verify attempt number - assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(0, ev22.getCurrentExecutionAttempt().getAttemptNumber()); - } - - /** - * Tests for scenario where a task fails for its own error, in which case the - * region containing the failed task and its 
consumer regions should be restarted. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *        (blocking)
-	 * 
- */ - @Test - public void testRegionFailoverForRegionInternalErrorsInLazyMode() throws Exception { - final JobGraph jobGraph = createBatchJobGraph(); - final ExecutionGraph eg = createExecutionGraph(jobGraph); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - // trigger task failure of ev11 - // regions {ev11}, {ev21}, {ev22} should be affected - ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); - manualMainThreadExecutor.triggerAll(); - manualMainThreadExecutor.triggerScheduledTasks(); - - // verify vertex states - // only vertices with consumable inputs can be scheduled - assertVertexInState(ExecutionState.DEPLOYING, ev11); - assertVertexInState(ExecutionState.DEPLOYING, ev12); - assertVertexInState(ExecutionState.CREATED, ev21); - assertVertexInState(ExecutionState.CREATED, ev22); - - // verify attempt number - assertEquals(1, ev11.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(0, ev12.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev21.getCurrentExecutionAttempt().getAttemptNumber()); - assertEquals(1, ev22.getCurrentExecutionAttempt().getAttemptNumber()); - } - - /** - * Tests that the failure is properly propagated to underlying strategy - * to calculate tasks to restart. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *        (blocking)
-	 * 
- */ - @Test - public void testFailurePropagationToUnderlyingStrategy() throws Exception { - final JobGraph jobGraph = createBatchJobGraph(); - final ExecutionGraph eg = createExecutionGraph(jobGraph); - - final TestAdaptedRestartPipelinedRegionStrategyNG failoverStrategy = - (TestAdaptedRestartPipelinedRegionStrategyNG) eg.getFailoverStrategy(); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - // finish upstream regions to trigger scheduling of downstream regions - ev11.getCurrentExecutionAttempt().markFinished(); - ev12.getCurrentExecutionAttempt().markFinished(); - - // trigger task failure of ev21 on consuming data from ev11 - Exception taskFailureCause = new PartitionConnectionException( - new ResultPartitionID( - ev11.getProducedPartitions().keySet().iterator().next(), - ev11.getCurrentExecutionAttempt().getAttemptId()), - new Exception("Test failure")); - ev21.getCurrentExecutionAttempt().fail(taskFailureCause); - manualMainThreadExecutor.triggerAll(); - - assertThat(failoverStrategy.getLastTasksToCancel(), - containsInAnyOrder(ev11.getID(), ev21.getID(), ev22.getID())); - } - - /** - * Tests that when a task fail, and restart strategy doesn't support restarting, the job will go to failed. 
- */ - @Test - public void testNoRestart() throws Exception { - final JobGraph jobGraph = createBatchJobGraph(); - final NoRestartStrategy restartStrategy = new NoRestartStrategy(); - final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); - - final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); - - ev.fail(new Exception("Test Exception")); - - for (ExecutionVertex evs : eg.getAllExecutionVertices()) { - evs.getCurrentExecutionAttempt().completeCancelling(); - } - - manualMainThreadExecutor.triggerAll(); - - assertEquals(JobStatus.FAILED, eg.getState()); - } - - /** - * Tests that the execution of the restart logic of the failover strategy is dependent on the restart strategy - * calling {@link RestartCallback#triggerFullRecovery()}. - */ - @Test - public void testFailoverExecutionDependentOnRestartStrategyRecoveryTrigger() throws Exception { - final JobGraph jobGraph = createBatchJobGraph(); - final TestRestartStrategy restartStrategy = new TestRestartStrategy(); - - final ExecutionGraph eg = createExecutionGraph(jobGraph, restartStrategy); - - final ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next(); - - ev.fail(new Exception("Test Exception")); - - manualMainThreadExecutor.triggerAll(); - - // the entire failover-procedure is being halted by the restart strategy not doing anything - // the only thing the failover strategy should do is cancel tasks that require it - - // sanity check to ensure we actually called into the restart strategy - assertEquals(restartStrategy.getNumberOfQueuedActions(), 1); - // 3 out of 4 tasks will be canceled, and removed from the set of registered executions - assertEquals(eg.getRegisteredExecutions().size(), 1); - // no job state change should occur; in case of a failover we never switch to RESTARTING/CANCELED - // the important thing is that we don't switch to failed which would imply that we started a global failover - assertEquals(JobStatus.RUNNING, eg.getState()); - } - - 
@Test - public void testFailGlobalIfErrorOnRestartTasks() throws Exception { - final JobGraph jobGraph = createStreamingJobGraph(); - final ExecutionGraph eg = createExecutionGraph(jobGraph, new FailingRestartStrategy(1)); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - final ExecutionVertex ev12 = vertexIterator.next(); - final ExecutionVertex ev21 = vertexIterator.next(); - final ExecutionVertex ev22 = vertexIterator.next(); - - final long globalModVersionBeforeFailure = eg.getGlobalModVersion(); - - ev11.fail(new Exception("Test Exception")); - completeCancelling(ev11, ev12, ev21, ev22); - - manualMainThreadExecutor.triggerAll(); - manualMainThreadExecutor.triggerScheduledTasks(); - - final long globalModVersionAfterFailure = eg.getGlobalModVersion(); - - assertNotEquals(globalModVersionBeforeFailure, globalModVersionAfterFailure); - } - - @Test - public void testCountingRestarts() throws Exception { - final JobGraph jobGraph = createStreamingJobGraph(); - final ExecutionGraph eg = createExecutionGraph(jobGraph); - - final Iterator vertexIterator = eg.getAllExecutionVertices().iterator(); - final ExecutionVertex ev11 = vertexIterator.next(); - - // trigger task failure for fine grained recovery - ev11.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); - assertEquals(1, eg.getNumberOfRestarts()); - - // trigger global failover - eg.failGlobal(new Exception("Force failover global")); - assertEquals(2, eg.getNumberOfRestarts()); - } - - // ------------------------------- Test Utils ----------------------------------------- - - /** - * Creating job graph as below (execution view). - * It's a representative of streaming job. - *
-	 *     (v11) -+-> (v21)
-	 *
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *       (pipelined)
-	 * 
- * 2 regions. Each has 2 pipelined connected vertices. - */ - private JobGraph createStreamingJobGraph() { - final JobVertex v1 = new JobVertex("vertex1"); - final JobVertex v2 = new JobVertex("vertex2"); - - v1.setParallelism(2); - v2.setParallelism(2); - - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - - v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED); - - final JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "Testjob", v1, v2); - jobGraph.setScheduleMode(ScheduleMode.EAGER); - - return jobGraph; - } - - /** - * Creating job graph as below (execution view). - * It's a representative of batch job. - *
-	 *     (v11) -+-> (v21)
-	 *            x
-	 *     (v12) -+-> (v22)
-	 *
-	 *            ^
-	 *            |
-	 *        (blocking)
-	 * 
- * 4 regions. Each consists of one individual vertex. - */ - private JobGraph createBatchJobGraph() { - final JobVertex v1 = new JobVertex("vertex1"); - final JobVertex v2 = new JobVertex("vertex2"); - - v1.setParallelism(2); - v2.setParallelism(2); - - v1.setInvokableClass(AbstractInvokable.class); - v2.setInvokableClass(AbstractInvokable.class); - - v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - - final JobGraph jobGraph = new JobGraph(v1, v2); - jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES); - - return jobGraph; - } - - private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { - return createExecutionGraph(jobGraph, new FixedDelayRestartStrategy(10, 0)); - } - - private ExecutionGraph createExecutionGraph( - final JobGraph jobGraph, - final RestartStrategy restartStrategy) throws Exception { - - final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl( - jobGraph.getJobID(), - NettyShuffleMaster.INSTANCE, - ignored -> Optional.empty()); - - final ExecutionGraph eg = TestingExecutionGraphBuilder - .newBuilder() - .setJobGraph(jobGraph) - .setRestartStrategy(restartStrategy) - .setFailoverStrategyFactory(TestAdaptedRestartPipelinedRegionStrategyNG::new) - .setPartitionTracker(partitionTracker) - .build(); - - eg.start(componentMainThreadExecutor); - eg.scheduleForExecution(); - manualMainThreadExecutor.triggerAll(); - - return eg; - } - - private static void assertVertexInState(final ExecutionState state, final ExecutionVertex vertex) { - assertEquals(state, vertex.getExecutionState()); - } - - private static void completeCancelling(ExecutionVertex... 
executionVertices) { - for (final ExecutionVertex executionVertex : executionVertices) { - executionVertex.getCurrentExecutionAttempt().completeCancelling(); - } - } - - /** - * Test implementation of the {@link AdaptedRestartPipelinedRegionStrategyNG} that makes it possible - * to control when the failover action is performed via {@link CompletableFuture}. - * It also exposes some internal state of {@link AdaptedRestartPipelinedRegionStrategyNG}. - */ - static class TestAdaptedRestartPipelinedRegionStrategyNG extends AdaptedRestartPipelinedRegionStrategyNG { - - private CompletableFuture blockerFuture; - - private Set lastTasksToRestart; - - TestAdaptedRestartPipelinedRegionStrategyNG(ExecutionGraph executionGraph) { - super(executionGraph); - this.blockerFuture = CompletableFuture.completedFuture(null); - } - - void setBlockerFuture(CompletableFuture blockerFuture) { - this.blockerFuture = blockerFuture; - } - - @Override - protected void restartTasks(final Set verticesToRestart) { - this.lastTasksToRestart = verticesToRestart; - super.restartTasks(verticesToRestart); - } - - @Override - protected CompletableFuture cancelTasks(final Set vertices) { - final List> terminationAndBlocker = Arrays.asList( - super.cancelTasks(vertices), - blockerFuture); - return FutureUtils.waitForAll(terminationAndBlocker); - } - - CompletableFuture getBlockerFuture() { - return blockerFuture; - } - - Set getLastTasksToCancel() { - return lastTasksToRestart; - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index a44ed31b64069..59532606d1100 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -61,7 +61,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; @@ -1060,7 +1060,7 @@ public void testResourceManagerConnectionAfterRegainingLeadership() throws Excep public void testRequestNextInputSplitWithLocalFailover() throws Exception { configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, - FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME); + FailoverStrategyFactoryLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME); final Function>, Collection> expectFailedExecutionInputSplits = inputSplitsPerTask -> inputSplitsPerTask.get(0); diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java index bf12e111823b1..6bcc60acaf159 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/BatchFineGrainedRecoveryITCase.java @@ -91,7 +91,7 @@ import java.util.stream.IntStream; import java.util.stream.LongStream; -import static org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME; +import static org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategyFactoryLoader.PIPELINED_REGION_RESTART_STRATEGY_NAME; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat;