Skip to content

Commit

Permalink
[FLINK-9788] Fix ExecutionGraph inconsistency for global failures whe…
Browse files Browse the repository at this point in the history
…n restarting

The problem was that a concurrent global failure could start a concurrent
restart operation without terminating the previous operation. Terminating
the previous restart operation means to cancel all current Executions and
wait for cancellation completion. Due to the missing wait, it could happen
that previously reset Executions are being tried to reset again. This violates
a sanity check and would lead to a restart loop.

The problem is fixed by not distinguishing between a fail which happens in
state JobStatus.RESTARTING and in any other state. Due to this, we will always
cancel all existing Executions and only trigger the restart after all Executions
have reached a terminal state.
  • Loading branch information
tillrohrmann committed Oct 11, 2018
1 parent 0b0c261 commit 5ae68f8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1151,18 +1151,7 @@ public void failGlobal(Throwable t) {
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
}
else if (current == JobStatus.RESTARTING) {
// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
initFailureCause(t);

final long globalVersionForRestart = incrementGlobalModVersion();
if (tryRestartOrFail(globalVersionForRestart)) {
return;
}
}
else if (transitionState(current, JobStatus.FAILING, t)) {
} else if (transitionState(current, JobStatus.FAILING, t)) {
initFailureCause(t);

// make sure no concurrent local or global actions interfere with the failover
Expand Down Expand Up @@ -1240,7 +1229,7 @@ public void restart(long expectedGlobalVersion) {
colGroups.add(cgroup);
}

jv.resetForNewExecution(resetTimestamp, globalModVersion);
jv.resetForNewExecution(resetTimestamp, expectedGlobalVersion);
}

for (int i = 0; i < stateTimestamps.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
Expand Down Expand Up @@ -57,12 +58,15 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;

import org.junit.After;
import org.junit.Test;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Iterator;
Expand All @@ -86,8 +90,11 @@
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -217,23 +224,21 @@ public void testFailWhileRestarting() throws Exception {
// Kill the instance and wait for the job to restart
instance.markDead();

Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();

while (deadline.hasTimeLeft() &&
executionGraph.getState() != JobStatus.RESTARTING) {

Thread.sleep(100);
}
waitUntilJobStatus(executionGraph, JobStatus.RESTARTING, TestingUtils.TESTING_DURATION().toMillis());

assertEquals(JobStatus.RESTARTING, executionGraph.getState());

// The restarting should not fail with an ordinary exception
executionGraph.failGlobal(new Exception("Test exception"));
// If we fail when being in RESTARTING, then we should try to restart again
final long globalModVersion = executionGraph.getGlobalModVersion();
final Exception testException = new Exception("Test exception");
executionGraph.failGlobal(testException);

assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
assertEquals(JobStatus.RESTARTING, executionGraph.getState());
assertEquals(testException, executionGraph.getFailureCause()); // we should have updated the failure cause

// but it should fail when sending a SuppressRestartsException
executionGraph.failGlobal(new SuppressRestartsException(new Exception("Test exception")));
executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart exception")));

assertEquals(JobStatus.FAILED, executionGraph.getState());

Expand Down Expand Up @@ -770,6 +775,68 @@ public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception {
}
}

/**
* Tests that the {@link ExecutionGraph} can handle concurrent failures while
* being in the RESTARTING state.
*/
@Test
public void testConcurrentFailureWhileRestarting() throws Exception {
final long timeout = 5000L;

final CountDownLatch countDownLatch = new CountDownLatch(2);
final CountDownLatchRestartStrategy restartStrategy = new CountDownLatchRestartStrategy(countDownLatch);
final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored -> new CompletableFuture<>()));

executionGraph.setQueuedSchedulingAllowed(true);
executionGraph.scheduleForExecution();

assertThat(executionGraph.getState(), is(JobStatus.RUNNING));

executionGraph.failGlobal(new FlinkException("Test exception"));

executor.execute(() -> {
countDownLatch.countDown();
try {
countDownLatch.await();
} catch (InterruptedException e) {
ExceptionUtils.rethrow(e);
}

executionGraph.failGlobal(new FlinkException("Concurrent exception"));
});

waitUntilJobStatus(executionGraph, JobStatus.RUNNING, timeout);
}

private static final class CountDownLatchRestartStrategy implements RestartStrategy {

private final CountDownLatch countDownLatch;

private CountDownLatchRestartStrategy(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public boolean canRestart() {
return true;
}

@Override
public void restart(RestartCallback restarter, ScheduledExecutor executor) {
executor.execute(() -> {
countDownLatch.countDown();

try {
countDownLatch.await();
} catch (InterruptedException e) {
ExceptionUtils.rethrow(e);
}

restarter.triggerFullRecovery();
});
}
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -858,12 +925,7 @@ private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStra
Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
scheduler.newInstanceAvailable(instance);

JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);

JobGraph jobGraph = new JobGraph("Pointwise job", sender);

ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler);

assertEquals(JobStatus.CREATED, eg.getState());

Expand All @@ -872,7 +934,23 @@ private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStra
return new Tuple2<>(eg, instance);
}

private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler scheduler) throws IOException {
private static ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException, JobException {
JobGraph jobGraph = createJobGraph(NUM_TASKS);

ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider);
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());

return eg;
}

@Nonnull
private static JobGraph createJobGraph(int parallelism) {
JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class);

return new JobGraph("Pointwise job", sender);
}

private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider slotProvider) throws IOException {
return new ExecutionGraph(
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
Expand All @@ -882,7 +960,7 @@ private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy,
new SerializedValue<>(new ExecutionConfig()),
AkkaUtils.getDefaultTimeout(),
restartStrategy,
scheduler);
slotProvider);
}

private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException, TimeoutException {
Expand Down

0 comments on commit 5ae68f8

Please sign in to comment.