[FLINK-28586][runtime] Refactor some OperatorCoordinator methods to be aware of execution attempts
zhuzhurk committed Jul 22, 2022
1 parent 9af271f commit bedcc3f
Showing 21 changed files with 117 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -54,21 +54,24 @@
* In particular, the following methods are guaranteed to be called strictly in order:
*
* <ol>
* <li>{@link #subtaskReady(int, SubtaskGateway)}: Called once you can send events to the subtask.
* The provided gateway is bound to that specific task. This is the start of interaction with
* the operator subtasks.
* <li>{@link #subtaskFailed(int, Throwable)}: Called for each subtask as soon as the subtask
* execution failed or was cancelled. At this point, interaction with the subtask should stop.
* <li>{@link #executionAttemptReady(int, int, SubtaskGateway)}: Called once you can send events
* to the subtask execution attempt. The provided gateway is bound to that specific execution
* attempt. This is the start of interaction with the operator subtask attempt.
* <li>{@link #executionAttemptFailed(int, int, Throwable)}: Called for each subtask execution
* attempt as soon as the attempt failed or was cancelled. At this point, interaction with the
* subtask attempt should stop.
* <li>{@link #subtaskReset(int, long)} or {@link #resetToCheckpoint(long, byte[])}: Once the
* scheduler determined which checkpoint to restore, these methods notify the coordinator of
* that. The former method is called in case of a regional failure/recovery (affecting
* possibly a subset of subtasks), the latter method in case of a global failure/recovery. This
* method should be used to determine which actions to recover, because it tells you which
* checkpoint to fall back to. The coordinator implementation needs to recover the
* interactions with the relevant tasks since the checkpoint that is restored.
* <li>{@link #subtaskReady(int, SubtaskGateway)}: Called again, once the recovered tasks are
* ready to go. This is later than {@link #subtaskReset(int, long)}, because between those
* methods, the task are scheduled and deployed.
* interactions with the relevant tasks since the checkpoint that is restored. It will be
* called only after {@link #executionAttemptFailed(int, int, Throwable)} has been called on
* all the attempts of the subtask.
* <li>{@link #executionAttemptReady(int, int, SubtaskGateway)}: Called again, once the recovered
* tasks (new attempts) are ready to go. This is later than {@link #subtaskReset(int, long)},
* because between those methods, the new attempts are scheduled and deployed.
* </ol>
*/
@Internal
@@ -101,13 +104,14 @@ public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
// ------------------------------------------------------------------------

/**
* Hands an OperatorEvent coming from a parallel Operator instances (one of the parallel
* Hands an OperatorEvent coming from a parallel Operator instance (one attempt of the parallel
* subtasks).
*
* @throws Exception Any exception thrown by this method results in a full job failure and
* recovery.
*/
void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception;
void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception;

// ------------------------------------------------------------------------

@@ -195,35 +199,38 @@ default void notifyCheckpointAborted(long checkpointId) {}
// ------------------------------------------------------------------------

/**
* Called when one of the subtasks of the task running the coordinated operator goes through a
* failover (failure / recovery cycle).
*
* <p>This method is called every time there is a failover of a subtasks, regardless of whether
* there it is a partial failover or a global failover.
*/
void subtaskFailed(int subtask, @Nullable Throwable reason);

/**
* Called if a task is recovered as part of a <i>partial failover</i>, meaning a failover
* Called if a subtask is recovered as part of a <i>partial failover</i>, meaning a failover
* handled by the scheduler's failover strategy (by default recovering a pipelined region). The
* method is invoked for each subtask involved in that partial failover.
*
* <p>In contrast to this method, the {@link #resetToCheckpoint(long, byte[])} method is called
* in the case of a global failover, which is the case when the coordinator (JobManager) is
* recovered.
*
* <p>Note that this method will not be called if an execution attempt of a subtask failed, if
* the subtask is not entirely failed, i.e. if the subtask has other execution attempts that are
* not failed/canceled.
*/
void subtaskReset(int subtask, long checkpointId);

/**
* This is called when a subtask of the Operator becomes ready to receive events, both after
* initial startup and after task failover. The given {@code SubtaskGateway} can be used to send
* events to the executed subtask.
* Called when any subtask execution attempt of the task running the coordinated operator is
* failed/canceled.
*
* <p>This method is called every time an execution attempt is failed/canceled, regardless of
* whether it is caused by a partial failover or a global failover.
*/
void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason);

/**
* This is called when a subtask execution attempt of the Operator becomes ready to receive
* events. The given {@code SubtaskGateway} can be used to send events to the execution attempt.
*
* <p>The given {@code SubtaskGateway} is bound to that specific execution attempt that became
* ready. All events sent through the gateway target that execution attempt; if the attempt is
* no longer running by the time the event is sent, then the events are failed.
*/
void subtaskReady(int subtask, SubtaskGateway gateway);
void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway);

// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
Original file line number Diff line number Diff line change
@@ -200,14 +200,15 @@ public void close() throws Exception {
context.unInitialize();
}

public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
mainThreadExecutor.assertRunningInMainThread();
coordinator.handleEventFromOperator(subtask, event);
coordinator.handleEventFromOperator(subtask, attemptNumber, event);
}

public void subtaskFailed(int subtask, @Nullable Throwable reason) {
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
mainThreadExecutor.assertRunningInMainThread();
coordinator.subtaskFailed(subtask, reason);
coordinator.executionAttemptFailed(subtask, attemptNumber, reason);
}

@Override
@@ -409,7 +410,8 @@ private void setupSubtaskGateway(int subtask) {

private void notifySubtaskReady(int subtask, OperatorCoordinator.SubtaskGateway gateway) {
try {
coordinator.subtaskReady(subtask, gateway);
coordinator.executionAttemptReady(
subtask, gateway.getExecution().getAttemptNumber(), gateway);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
globalFailureHandler.handleGlobalFailure(
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ public interface OperatorEventGateway {

/**
* Sends the given event to the coordinator, where it will be handled by the {@link
* OperatorCoordinator#handleEventFromOperator(int, OperatorEvent)} method.
* OperatorCoordinator#handleEventFromOperator(int, int, OperatorEvent)} method.
*/
void sendEventToCoordinator(OperatorEvent event);
}
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@
* an {@link OperatorCoordinator} and a runtime operator (which registers this handler).
*
* <p>The counterpart to this handler is the {@link OperatorCoordinator#handleEventFromOperator(int,
* OperatorEvent)} method.
* int, OperatorEvent)} method.
*/
public interface OperatorEventHandler {

Original file line number Diff line number Diff line change
@@ -77,14 +77,18 @@ public void close() throws Exception {
}

@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
coordinator.applyCall(
"handleEventFromOperator", c -> c.handleEventFromOperator(subtask, event));
"handleEventFromOperator",
c -> c.handleEventFromOperator(subtask, attemptNumber, event));
}

@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
coordinator.applyCall("subtaskFailed", c -> c.subtaskFailed(subtask, reason));
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
coordinator.applyCall(
"executionAttemptFailed",
c -> c.executionAttemptFailed(subtask, attemptNumber, reason));
}

@Override
@@ -93,8 +97,10 @@ public void subtaskReset(int subtask, long checkpointId) {
}

@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
coordinator.applyCall("subtaskReady", c -> c.subtaskReady(subtask, gateway));
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
coordinator.applyCall(
"executionAttemptReady",
c -> c.executionAttemptReady(subtask, attemptNumber, gateway));
}

@Override
Original file line number Diff line number Diff line change
@@ -118,7 +118,8 @@ public void deliverOperatorEventToCoordinator(
}

try {
coordinator.handleEventFromOperator(exec.getParallelSubtaskIndex(), evt);
coordinator.handleEventFromOperator(
exec.getParallelSubtaskIndex(), exec.getAttemptNumber(), evt);
} catch (Throwable t) {
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
globalFailureHandler.handleGlobalFailure(t);
Original file line number Diff line number Diff line change
@@ -283,8 +283,11 @@ private void notifyCoordinatorsAboutTaskFailure(
final Execution execution, @Nullable final Throwable error) {
final ExecutionJobVertex jobVertex = execution.getVertex().getJobVertex();
final int subtaskIndex = execution.getParallelSubtaskIndex();
final int attemptNumber = execution.getAttemptNumber();

jobVertex.getOperatorCoordinators().forEach(c -> c.subtaskFailed(subtaskIndex, error));
jobVertex
.getOperatorCoordinators()
.forEach(c -> c.executionAttemptFailed(subtaskIndex, attemptNumber, error));
}

@Override
Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ public void close() throws Exception {
}

@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) {
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
runInEventLoop(
() -> {
if (event instanceof RequestSplitEvent) {
@@ -261,7 +261,8 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) {
}

@Override
public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
public void executionAttemptFailed(
int subtaskId, int attemptNumber, @Nullable Throwable reason) {
runInEventLoop(
() -> {
LOG.info(
@@ -299,7 +300,7 @@ public void subtaskReset(int subtaskId, long checkpointId) {
}

@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
assert subtask == gateway.getSubtask();

runInEventLoop(
Original file line number Diff line number Diff line change
@@ -287,9 +287,9 @@ private IntegerResponse(int value) {
* The coordinator that sends events and completes checkpoints.
*
* <p>All consistency guaranteed for the coordinator apply to order or method invocations (like
* {@link #subtaskFailed(int, Throwable)}, {@link #subtaskReset(int, long)} or {@link
* #checkpointCoordinator(long, CompletableFuture)}) and the order in which actions are done
* (sending events and completing checkpoints). Tho consistently evaluate this, but with
* {@link #executionAttemptFailed(int, int, Throwable)}, {@link #subtaskReset(int, long)} or
* {@link #checkpointCoordinator(long, CompletableFuture)}) and the order in which actions are
* done (sending events and completing checkpoints). Tho consistently evaluate this, but with
* concurrency against the scheduler thread that calls this coordinator implements a simple
* mailbox that moves the method handling into a separate thread, but keeps the order.
*/
@@ -357,7 +357,8 @@ public void close() throws Exception {
}

@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception {
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
if (subtask != 0 || !(event instanceof StartEvent)) {
throw new Exception(
String.format("Don't recognize event '%s' from task %d.", event, subtask));
@@ -383,7 +384,8 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
}

@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
public void executionAttemptFailed(
int subtask, int attemptNumber, @Nullable Throwable reason) {
// we need to create and register this outside the mailbox so that the
// registration is not affected by the artificial stall on the mailbox, but happens
// strictly before the tasks are restored and the operator events are received (to
@@ -420,7 +422,7 @@ public void subtaskFailed(int subtask, @Nullable Throwable reason) {
public void subtaskReset(int subtask, long checkpointId) {}

@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
runInMailbox(
() -> {
checkState(!workLoopRunning);
Original file line number Diff line number Diff line change
@@ -39,12 +39,12 @@ public void close() {
}

@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) {
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
throw new UnsupportedOperationException();
}

@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
throw new UnsupportedOperationException();
}

@@ -54,7 +54,7 @@ public void subtaskReset(int subtask, long checkpointId) {
}

@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
throw new UnsupportedOperationException();
}

Original file line number Diff line number Diff line change
@@ -264,27 +264,28 @@ public void testFailingJobMultipleTimesNotCauseCascadingJobFailure() throws Exce
context ->
new TestingOperatorCoordinator(context) {
@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) {
public void handleEventFromOperator(
int subtask, int attemptNumber, OperatorEvent event) {
context.failJob(new RuntimeException("Artificial Exception"));
}
};
final EventReceivingTasks tasks = EventReceivingTasks.createForRunningTasks();
final OperatorCoordinatorHolder holder =
createCoordinatorHolder(tasks, coordinatorProvider);

holder.handleEventFromOperator(0, new TestOperatorEvent());
holder.handleEventFromOperator(0, 0, new TestOperatorEvent());
assertNotNull(globalFailure);
final Throwable firstGlobalFailure = globalFailure;

holder.handleEventFromOperator(1, new TestOperatorEvent());
holder.handleEventFromOperator(1, 0, new TestOperatorEvent());
assertEquals(
"The global failure should be the same instance because the context"
+ "should only take the first request from the coordinator to fail the job.",
firstGlobalFailure,
globalFailure);

holder.resetToCheckpoint(0L, new byte[0]);
holder.handleEventFromOperator(1, new TestOperatorEvent());
holder.handleEventFromOperator(1, 1, new TestOperatorEvent());
assertNotEquals(
"The new failures should be propagated after the coordinator " + "is reset.",
firstGlobalFailure,
@@ -676,16 +677,17 @@ public void close() throws Exception {
}

@Override
public void handleEventFromOperator(int subtask, OperatorEvent event) {}
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {}

@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {}
public void executionAttemptFailed(
int subtask, int attemptNumber, @Nullable Throwable reason) {}

@Override
public void subtaskReset(int subtask, long checkpointId) {}

@Override
public void subtaskReady(int subtask, SubtaskGateway gateway) {
public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
subtaskGateways[subtask] = gateway;

for (SubtaskGateway subtaskGateway : subtaskGateways) {
Original file line number Diff line number Diff line change
@@ -149,8 +149,8 @@ public void testMethodCallsOnLongResetToCheckpoint() throws Exception {
// The following method calls should be applied to the new internal
// coordinator asynchronously because the current coordinator has not
// been successfully closed yet.
coordinator.handleEventFromOperator(1, testingEvent);
coordinator.subtaskFailed(1, new Exception("Subtask Failure Exception."));
coordinator.handleEventFromOperator(1, 0, testingEvent);
coordinator.executionAttemptFailed(1, 0, new Exception("Subtask Failure Exception."));
coordinator.notifyCheckpointComplete(completedCheckpointId);

// The new coordinator should not have been created because the resetToCheckpoint()
@@ -198,8 +198,8 @@ public void testConsecutiveResetToCheckpoint() throws Exception {
// Loop to get some interleaved method invocations on multiple instances
// of active coordinators.
for (int i = 0; i < numResets; i++) {
coordinator.handleEventFromOperator(1, new TestingEvent(i));
coordinator.subtaskFailed(i, new Exception());
coordinator.handleEventFromOperator(1, 0, new TestingEvent(i));
coordinator.executionAttemptFailed(i, 0, new Exception());
CompletableFuture<byte[]> future = CompletableFuture.completedFuture(new byte[i]);
coordinator.checkpointCoordinator(i, future);
final int loop = i;
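
Note on the new callback contract: the updated Javadoc orders the calls as executionAttemptReady, then executionAttemptFailed, then subtaskReset (only once every attempt of the subtask has failed), then executionAttemptReady again for the new attempts. The following is a minimal sketch of how an implementation might keep per-attempt state under that contract. It is not code from this commit and does not depend on Flink: AttemptGateway, AttemptTracker, isKnownAttempt and runningAttempts are hypothetical names used only for illustration, and AttemptGateway is a stand-in for SubtaskGateway. Treating a reset with live attempts as an error is an assumption drawn from the Javadoc wording, not something the commit enforces.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for OperatorCoordinator.SubtaskGateway; not the Flink interface. */
interface AttemptGateway {
    void send(String event);
}

/** Hypothetical helper that keeps one gateway per (subtask, attemptNumber). */
class AttemptTracker {
    // subtask index -> (attempt number -> gateway bound to that attempt)
    private final Map<Integer, Map<Integer, AttemptGateway>> gateways = new HashMap<>();

    /** Mirrors executionAttemptReady: start of interaction with one attempt. */
    void executionAttemptReady(int subtask, int attemptNumber, AttemptGateway gateway) {
        gateways.computeIfAbsent(subtask, k -> new HashMap<>()).put(attemptNumber, gateway);
    }

    /** Mirrors executionAttemptFailed: stop interacting with that attempt only. */
    void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        if (attempts == null) {
            return;
        }
        attempts.remove(attemptNumber);
        if (attempts.isEmpty()) {
            gateways.remove(subtask);
        }
    }

    /** Mirrors subtaskReset: assumed to arrive only after all attempts have failed. */
    void subtaskReset(int subtask, long checkpointId) {
        if (gateways.containsKey(subtask)) {
            throw new IllegalStateException(
                    "subtask " + subtask + " still has running attempts");
        }
        // A real coordinator would restore its per-subtask state for checkpointId here.
    }

    /** True if the attempt was reported ready and has not been reported failed. */
    boolean isKnownAttempt(int subtask, int attemptNumber) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        return attempts != null && attempts.containsKey(attemptNumber);
    }

    /** Number of attempts of the subtask that are currently tracked. */
    int runningAttempts(int subtask) {
        Map<Integer, AttemptGateway> attempts = gateways.get(subtask);
        return attempts == null ? 0 : attempts.size();
    }
}

class AttemptTrackerDemo {
    public static void main(String[] args) {
        AttemptTracker tracker = new AttemptTracker();
        tracker.executionAttemptReady(0, 0, evt -> {});
        tracker.executionAttemptReady(0, 1, evt -> {});

        // Only attempt 0 fails; attempt 1 keeps running, so no reset is expected yet.
        tracker.executionAttemptFailed(0, 0, new RuntimeException("artificial"));
        System.out.println(tracker.runningAttempts(0)); // prints 1

        // Once the last attempt has failed, the reset is accepted.
        tracker.executionAttemptFailed(0, 1, null);
        tracker.subtaskReset(0, 5L);
        System.out.println(tracker.runningAttempts(0)); // prints 0
    }
}
```

Keying the state by attempt number rather than by subtask alone is what lets a handler such as handleEventFromOperator(subtask, attemptNumber, event) tell an event from a still-running attempt apart from one sent by an attempt that has already failed. Whether such late events should be dropped or rejected is left to the implementation.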