Skip to content

Commit

Permalink
[FLINK-17760][runtime, tests] Add test to verify that vertices are pr…
Browse files Browse the repository at this point in the history
…operly restarted on slot allocation failures
  • Loading branch information
zhuzhurk committed Dec 17, 2020
1 parent bc80c44 commit e6f888e
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
if (!released) {
released = true;

tryAssignPayload(TERMINATED_PAYLOAD);
payloadReference.get().fail(cause);

slotOwner.returnLogicalSlot(this);

if (automaticallyCompleteReleaseFuture) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
Expand Down Expand Up @@ -77,6 +78,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
Expand Down Expand Up @@ -311,6 +313,66 @@ public void failJobIfNotEnoughResources() throws Exception {
assertThat(jobStatus, is(equalTo(JobStatus.FAILED)));
}

@Test
public void restartVerticesOnSlotAllocationTimeout() throws Exception {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();
testRestartVerticesOnFailuresInScheduling(vid -> testExecutionSlotAllocator.timeoutPendingRequest(vid));
}

@Test
public void restartVerticesOnAssignedSlotReleased() throws Exception {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();
testRestartVerticesOnFailuresInScheduling(vid -> {
final LogicalSlot slot = testExecutionSlotAllocator.completePendingRequest(vid);
slot.releaseSlot(new Exception("Release slot for test"));
});
}

private void testRestartVerticesOnFailuresInScheduling(Consumer<ExecutionVertexID> actionsToTriggerTaskFailure) throws Exception {
final int parallelism = 2;
final JobVertex v1 = createVertex("vertex1", parallelism);
final JobVertex v2 = createVertex("vertex2", parallelism);
v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final JobGraph jobGraph = new JobGraph(v1, v2);

testExecutionSlotAllocator.disableAutoCompletePendingRequests();
final TestSchedulingStrategy.Factory schedulingStrategyFactory = new TestSchedulingStrategy.Factory();
final DefaultScheduler scheduler = createScheduler(
jobGraph,
schedulingStrategyFactory,
new RestartPipelinedRegionFailoverStrategy.Factory());
final TestSchedulingStrategy schedulingStrategy = schedulingStrategyFactory.getLastCreatedSchedulingStrategy();
startScheduling(scheduler);

final ExecutionVertexID vid11 = new ExecutionVertexID(v1.getID(), 0);
final ExecutionVertexID vid12 = new ExecutionVertexID(v1.getID(), 1);
final ExecutionVertexID vid21 = new ExecutionVertexID(v2.getID(), 0);
final ExecutionVertexID vid22 = new ExecutionVertexID(v2.getID(), 1);
schedulingStrategy.schedule(Arrays.asList(vid11, vid12, vid21, vid22));

assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(4));

actionsToTriggerTaskFailure.accept(vid11);

final Iterator<ArchivedExecutionVertex> vertexIterator = scheduler.requestJob().getAllExecutionVertices().iterator();
final ArchivedExecutionVertex ev11 = vertexIterator.next();
final ArchivedExecutionVertex ev12 = vertexIterator.next();
final ArchivedExecutionVertex ev21 = vertexIterator.next();
final ArchivedExecutionVertex ev22 = vertexIterator.next();

// ev11 and ev21 needs to be restarted because it is pipelined region failover and
// they are in the same region. ev12 and ev22 will not be affected
assertThat(testExecutionSlotAllocator.getPendingRequests().keySet(), hasSize(2));
assertThat(ev11.getExecutionState(), is(ExecutionState.FAILED));
assertThat(ev21.getExecutionState(), is(ExecutionState.CANCELED));
assertThat(ev12.getExecutionState(), is(ExecutionState.SCHEDULED));
assertThat(ev22.getExecutionState(), is(ExecutionState.SCHEDULED));

taskRestartExecutor.triggerScheduledTasks();
assertThat(schedulingStrategy.getReceivedVerticesToRestart(), containsInAnyOrder(vid11, vid21));
}

@Test
public void skipDeploymentIfVertexVersionOutdated() {
testExecutionSlotAllocator.disableAutoCompletePendingRequests();
Expand Down Expand Up @@ -746,10 +808,15 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception
}

private static JobVertex createVertexWithAllInputConstraints(String name, int parallelism) {
final JobVertex v = createVertex(name, parallelism);
v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
return v;
}

private static JobVertex createVertex(String name, int parallelism) {
final JobVertex v = new JobVertex(name);
v.setParallelism(parallelism);
v.setInvokableClass(AbstractInvokable.class);
v.setInputDependencyConstraint(InputDependencyConstraint.ALL);
return v;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ public void completePendingRequests() {
vertexIds.forEach(this::completePendingRequest);
}

public void completePendingRequest(final ExecutionVertexID executionVertexId) {
public LogicalSlot completePendingRequest(final ExecutionVertexID executionVertexId) {
final LogicalSlot slot = logicalSlotBuilder.setSlotOwner(this).createTestingLogicalSlot();
final SlotExecutionVertexAssignment slotVertexAssignment = removePendingRequest(executionVertexId);
checkState(slotVertexAssignment != null);
slotVertexAssignment
.getLogicalSlotFuture()
.complete(logicalSlotBuilder
.setSlotOwner(this)
.createTestingLogicalSlot());
.complete(slot);
return slot;
}

private SlotExecutionVertexAssignment removePendingRequest(final ExecutionVertexID executionVertexId) {
Expand Down

0 comments on commit e6f888e

Please sign in to comment.