Skip to content

Commit

Permalink
[FLINK-17012][runtime] 'RUNNING' state split into 'RUNNING' and 'RECO…
Browse files Browse the repository at this point in the history
…VERING' in order to distinguish when the task is really running

This closes apache#15221
  • Loading branch information
akalash authored and dawidwys committed Mar 31, 2021
1 parent 5d70317 commit c0c156b
Show file tree
Hide file tree
Showing 25 changed files with 285 additions and 51 deletions.
26 changes: 21 additions & 5 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"tasks" : {
"type" : "object",
Expand Down Expand Up @@ -3536,7 +3536,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4121,7 +4121,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4249,7 +4249,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4658,7 +4658,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"status-counts" : {
"type" : "object",
Expand Down Expand Up @@ -5281,6 +5281,22 @@
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:TaskManagerDetailsInfo",
"properties" : {
"allocatedSlots" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:SlotInfo",
"properties" : {
"jobId" : {
"type" : "any"
},
"resource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
}
}
}
},
"dataPort" : {
"type" : "integer"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,10 +948,10 @@ protected CompletableFuture<MultipleJobsDetails> handleRequest(
throws RestHandlerException {
JobDetails running =
new JobDetails(
new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[9], 0);
new JobID(), "job1", 0, 0, 0, JobStatus.RUNNING, 0, new int[10], 0);
JobDetails finished =
new JobDetails(
new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[9], 0);
new JobID(), "job2", 0, 0, 0, JobStatus.FINISHED, 0, new int[10], 0);
return CompletableFuture.completedFuture(
new MultipleJobsDetails(Arrays.asList(running, finished)));
}
Expand Down
12 changes: 6 additions & 6 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"start-time" : {
"type" : "integer"
Expand Down Expand Up @@ -1967,7 +1967,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2279,7 +2279,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2365,7 +2365,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2600,7 +2600,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
},
"start-time" : {
"type" : "integer"
Expand Down Expand Up @@ -3274,4 +3274,4 @@
}
}
} ]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(

private static boolean isProducerAvailable(ExecutionState producerState) {
return producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.RECOVERING
|| producerState == ExecutionState.FINISHED
|| producerState == ExecutionState.SCHEDULED
|| producerState == ExecutionState.DEPLOYING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,17 @@
* the state {@code CREATED} and switch states according to this diagram:
*
* <pre>{@code
* CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
* | | | |
* | | | +------+
* | | V V
* CREATED -> SCHEDULED -> DEPLOYING -> RECOVERING -> RUNNING -> FINISHED
* | | | | |
* | | | +-----+--------------+
* | | V V
* | | CANCELING -----+----> CANCELED
* | | |
* | +-------------------------+
* |
* | ... -> FAILED
* V
* RECONCILING -> RUNNING | FINISHED | CANCELED | FAILED
* RECONCILING -> RECOVERING | RUNNING | FINISHED | CANCELED | FAILED
*
* }</pre>
*
Expand Down Expand Up @@ -68,7 +68,10 @@ public enum ExecutionState {

FAILED,

RECONCILING;
RECONCILING,

/** Restoring the last valid state of the task, if it has one. */
RECOVERING;

public boolean isTerminal() {
return this == FINISHED || this == CANCELED || this == FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,9 @@ private boolean updateStateInternal(
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RECOVERING:
return attempt.switchToRecovering();

case RUNNING:
return attempt.switchToRunning();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RECOVERING;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -622,7 +623,7 @@ public void cancel() {
}

// these two are the common cases where we need to send a cancel call
else if (current == RUNNING || current == DEPLOYING) {
else if (current == RECOVERING || current == RUNNING || current == DEPLOYING) {
// try to transition to canceling, if successful, send the cancel call
if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
return;
Expand Down Expand Up @@ -662,6 +663,7 @@ else if (current == RUNNING || current == DEPLOYING) {
public CompletableFuture<?> suspend() {
switch (state) {
case RUNNING:
case RECOVERING:
case DEPLOYING:
case CREATED:
case SCHEDULED:
Expand Down Expand Up @@ -715,11 +717,13 @@ private void updatePartitionConsumers(final IntermediateResultPartition partitio
final ExecutionState consumerState = consumer.getState();

// ----------------------------------------------------------------
// Consumer is running => send update message now
// Consumer is recovering or running => send update message now
// Consumer is deploying => cache the partition info which would be
// sent after switching to recovering
// ----------------------------------------------------------------
if (consumerState == DEPLOYING || consumerState == RUNNING) {
if (consumerState == DEPLOYING
|| consumerState == RUNNING
|| consumerState == RECOVERING) {
final PartitionInfo partitionInfo = createPartitionInfo(partition);

if (consumerState == DEPLOYING) {
Expand Down Expand Up @@ -903,7 +907,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met
while (true) {
ExecutionState current = this.state;

if (current == RUNNING || current == DEPLOYING) {
if (current == RECOVERING || current == RUNNING || current == DEPLOYING) {

if (transitionState(current, FINISHED)) {
try {
Expand Down Expand Up @@ -993,7 +997,10 @@ void completeCancelling(

if (current == CANCELED) {
return;
} else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
} else if (current == CANCELING
|| current == RUNNING
|| current == RECOVERING
|| current == DEPLOYING) {

updateAccumulatorsAndMetrics(userAccumulators, metrics);

Expand Down Expand Up @@ -1129,7 +1136,10 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(

handlePartitionCleanup(releasePartitions, releasePartitions);

if (cancelTask && (stateBeforeFailed == RUNNING || stateBeforeFailed == DEPLOYING)) {
if (cancelTask
&& (stateBeforeFailed == RUNNING
|| stateBeforeFailed == RECOVERING
|| stateBeforeFailed == DEPLOYING)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
}
Expand All @@ -1148,10 +1158,22 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(
}
}

/**
 * Attempts the {@code DEPLOYING -> RECOVERING} transition and, when it succeeds, triggers
 * {@code sendPartitionInfos()}.
 *
 * @return {@code true} if the transition was performed, {@code false} otherwise
 */
boolean switchToRecovering() {
    final boolean switched = switchTo(DEPLOYING, RECOVERING);
    if (switched) {
        // Partition infos are only sent once the state change actually happened.
        sendPartitionInfos();
    }
    return switched;
}

/**
 * Attempts to move this execution from {@code RECOVERING} to {@code RUNNING}.
 *
 * @return the result of {@code switchTo(RECOVERING, RUNNING)}
 */
boolean switchToRunning() {
return switchTo(RECOVERING, RUNNING);
}

if (transitionState(DEPLOYING, RUNNING)) {
sendPartitionInfos();
private boolean switchTo(ExecutionState from, ExecutionState to) {

if (transitionState(from, to)) {
return true;
} else {
// something happened while the call was in progress.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ public static ExecutionState getAggregateJobVertexState(
return ExecutionState.CANCELING;
} else if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
return ExecutionState.CANCELED;
} else if (verticesPerState[ExecutionState.RECOVERING.ordinal()] > 0) {
return ExecutionState.RECOVERING;
} else if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
return ExecutionState.RUNNING;
} else if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ public boolean isProducerReadyOrAbortConsumption(ResponseHandle responseHandle)
/**
 * Checks whether the given consumer state is one of the states accepted for consumption:
 * {@code RUNNING}, {@code RECOVERING} or {@code DEPLOYING}.
 *
 * @param consumerExecutionState execution state of the consumer
 * @return {@code true} for the three states above, {@code false} for anything else
 */
private static boolean isConsumerStateValidForConsumption(
        ExecutionState consumerExecutionState) {
    // A switch on null would throw; the reference comparisons it replaces yield false.
    if (consumerExecutionState == null) {
        return false;
    }
    switch (consumerExecutionState) {
        case RUNNING:
        case RECOVERING:
        case DEPLOYING:
            return true;
        default:
            return false;
    }
}

/**
 * Checks whether the producer state obtained from the response handle is one of
 * {@code SCHEDULED}, {@code DEPLOYING}, {@code RECOVERING}, {@code RUNNING} or
 * {@code FINISHED}.
 *
 * @param responseHandle handle from which the producer state is read
 * @return {@code true} if the producer is in one of the states listed above
 */
private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
    final ExecutionState state = getProducerState(responseHandle);
    // A switch on null would throw; the reference comparisons it replaces yield false.
    if (state == null) {
        return false;
    }
    switch (state) {
        case SCHEDULED:
        case DEPLOYING:
        case RECOVERING:
        case RUNNING:
        case FINISHED:
            return true;
        default:
            return false;
    }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,16 @@ public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorE
throw new UnsupportedOperationException(
"dispatchOperatorEvent not supported by " + getClass().getName());
}

/**
 * Restores this invokable to its last valid state, if it has one. This method can be called
 * before {@link #invoke()}.
 *
 * <p>Each implementation determines for itself what has to be restored. The default
 * implementation does nothing.
 *
 * @throws Exception Tasks may forward their exceptions for the TaskManager to handle through
 *     failure/recovery.
 */
public void restore() throws Exception {}
}
Loading

0 comments on commit c0c156b

Please sign in to comment.