Skip to content

Commit

Permalink
[FLINK-11630] Triggers the termination of all running Tasks when shut…
Browse files Browse the repository at this point in the history
…ting down TaskExecutor

This closes apache#9072.
This closes apache#7757.
  • Loading branch information
blueszheng authored and tillrohrmann committed Aug 18, 2019
1 parent 64938e5 commit cee8a38
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -182,6 +184,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
/** The kvState registration service in the task manager. */
private final KvStateService kvStateService;

private final TaskCompletionTracker taskCompletionTracker;

// --------- job manager connections -----------

private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
Expand Down Expand Up @@ -273,6 +277,7 @@ public TaskExecutor(
this.currentRegistrationTimeoutId = null;

this.stackTraceSampleService = new StackTraceSampleService(rpcService.getScheduledExecutor());
this.taskCompletionTracker = new TaskCompletionTracker();
}

@Override
Expand Down Expand Up @@ -333,31 +338,46 @@ private void handleStartTaskExecutorServicesException(Exception e) throws Except
public CompletableFuture<Void> onStop() {
log.info("Stopping TaskExecutor {}.", getAddress());

Throwable throwable = null;
Throwable jobManagerDisconnectThrowable = null;

if (resourceManagerConnection != null) {
resourceManagerConnection.close();
}

FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
try {
disassociateFromJobManager(jobManagerConnection, new FlinkException("The TaskExecutor is shutting down."));
disassociateFromJobManager(jobManagerConnection, cause);
} catch (Throwable t) {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
jobManagerDisconnectThrowable = ExceptionUtils.firstOrSuppressed(t, jobManagerDisconnectThrowable);
}
}

try {
stopTaskExecutorServices();
} catch (Exception e) {
throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
final Throwable throwableBeforeTasksCompletion = jobManagerDisconnectThrowable;

return FutureUtils
.runAfterwards(
taskCompletionTracker.failIncompleteTasksAndGetTerminationFuture(),
this::stopTaskExecutorServices)
.handle((ignored, throwable) -> {
handleOnStopException(throwableBeforeTasksCompletion, throwable);
return null;
});
}

private void handleOnStopException(Throwable throwableBeforeTasksCompletion, Throwable throwableAfterTasksCompletion) {
final Throwable throwable;

if (throwableBeforeTasksCompletion != null) {
throwable = ExceptionUtils.firstOrSuppressed(throwableBeforeTasksCompletion, throwableAfterTasksCompletion);
} else {
throwable = throwableAfterTasksCompletion;
}

if (throwable != null) {
return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
throw new CompletionException(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
} else {
log.info("Stopped TaskExecutor {}.", getAddress());
return CompletableFuture.completedFuture(null);
}
}

Expand Down Expand Up @@ -596,6 +616,7 @@ public CompletableFuture<Acknowledge> submitTask(

if (taskAdded) {
task.startTaskThread();
taskCompletionTracker.trackTaskCompletion(task);

setupResultPartitionBookkeeping(tdd, task.getTerminationFuture());
return CompletableFuture.completedFuture(Acknowledge.get());
Expand Down Expand Up @@ -1826,4 +1847,30 @@ public SlotReport retrievePayload(ResourceID resourceID) {
return taskSlotTable.createSlotReport(getResourceID());
}
}

private static class TaskCompletionTracker {
private final Map<ExecutionAttemptID, Task> incompleteTasks;

private TaskCompletionTracker() {
incompleteTasks = new ConcurrentHashMap<>(8);
}

void trackTaskCompletion(Task task) {
incompleteTasks.put(task.getExecutionId(), task);
task.getTerminationFuture().thenRun(() -> incompleteTasks.remove(task.getExecutionId()));
}

CompletableFuture<Void> failIncompleteTasksAndGetTerminationFuture() {
FlinkException cause = new FlinkException("The TaskExecutor is shutting down.");
return FutureUtils.waitForAll(
incompleteTasks
.values()
.stream()
.map(task -> {
task.failExternally(cause);
return task.getTerminationFuture();
})
.collect(Collectors.toList()));
}
}
}
Loading

0 comments on commit cee8a38

Please sign in to comment.