Skip to content

Commit

Permalink
[FLINK-22215][runtime] RECOVERING state is renamed to INITIALIZING
Browse files Browse the repository at this point in the history
  • Loading branch information
akalash authored and dawidwys committed Apr 13, 2021
1 parent f829418 commit b387928
Show file tree
Hide file tree
Showing 23 changed files with 72 additions and 72 deletions.
10 changes: 5 additions & 5 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1558,7 +1558,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"tasks" : {
"type" : "object",
Expand Down Expand Up @@ -3560,7 +3560,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4145,7 +4145,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4273,7 +4273,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"subtask" : {
"type" : "integer"
Expand Down Expand Up @@ -4682,7 +4682,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"status-counts" : {
"type" : "object",
Expand Down
10 changes: 5 additions & 5 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"start-time" : {
"type" : "integer"
Expand Down Expand Up @@ -1967,7 +1967,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2279,7 +2279,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2365,7 +2365,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"attempt" : {
"type" : "integer"
Expand Down Expand Up @@ -2600,7 +2600,7 @@
},
"status" : {
"type" : "string",
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "RECOVERING" ]
"enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ]
},
"start-time" : {
"type" : "integer"
Expand Down
3 changes: 1 addition & 2 deletions flink-runtime-web/web-dashboard/src/app/app.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ export const COLOR_MAP = {
SCHEDULED: '#722ed1',
COMPLETED: '#1890ff',
RESTARTING: '#13c2c2',
INITIALIZING: '#738df8',
RECOVERING: '#18c98b'
INITIALIZING: '#738df8'
};
export const LONG_MIN_VALUE = -9223372036854776000;
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ interface TasksStatus {
FAILED: number;
RECONCILING: number;
CANCELING: number;
RECOVERING: number;
INITIALIZING: number;
}

interface MetricsStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ export interface TaskStatusInterface {
RUNNING: number;
SCHEDULED: number;
TOTAL: number;
RECOVERING: number;
INITIALIZING: number;
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface VertexTaskManagerDetailInterface {
RECONCILING: number;
RUNNING: number;
SCHEDULED: number;
RECOVERING: number;
INITIALIZING: number;
};
'taskmanager-id': string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private static RuntimeException handleConsumedPartitionShuffleDescriptorErrors(

private static boolean isProducerAvailable(ExecutionState producerState) {
return producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.RECOVERING
|| producerState == ExecutionState.INITIALIZING
|| producerState == ExecutionState.FINISHED
|| producerState == ExecutionState.SCHEDULED
|| producerState == ExecutionState.DEPLOYING;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* the state {@code CREATED} and switch states according to this diagram:
*
* <pre>{@code
* CREATED -> SCHEDULED -> DEPLOYING -> RECOVERING -> RUNNING -> FINISHED
* CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
* | | | | |
* | | | +-----+--------------+
* | | V V
Expand All @@ -33,7 +33,7 @@
* |
* | ... -> FAILED
* V
* RECONCILING -> RECOVERING | RUNNING | FINISHED | CANCELED | FAILED
* RECONCILING -> INITIALIZING | RUNNING | FINISHED | CANCELED | FAILED
*
* }</pre>
*
Expand Down Expand Up @@ -71,7 +71,7 @@ public enum ExecutionState {
RECONCILING,

/** Restoring last possible valid state of the task if it has it. */
RECOVERING;
INITIALIZING;

public boolean isTerminal() {
return this == FINISHED || this == CANCELED || this == FAILED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ void failGlobalIfExecutionIsStillRunning(Throwable cause, ExecutionAttemptID fai
final Execution failedExecution = currentExecutions.get(failingAttempt);
if (failedExecution != null
&& (failedExecution.getState() == ExecutionState.RUNNING
|| failedExecution.getState() == ExecutionState.RECOVERING)) {
|| failedExecution.getState() == ExecutionState.INITIALIZING)) {
failGlobal(cause);
} else {
LOG.debug(
Expand Down Expand Up @@ -1226,7 +1226,7 @@ private boolean updateStateInternal(
Map<String, Accumulator<?, ?>> accumulators;

switch (state.getExecutionState()) {
case RECOVERING:
case INITIALIZING:
return attempt.switchToRecovering();

case RUNNING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
import static org.apache.flink.runtime.execution.ExecutionState.DEPLOYING;
import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.runtime.execution.ExecutionState.RECOVERING;
import static org.apache.flink.runtime.execution.ExecutionState.INITIALIZING;
import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -623,7 +623,7 @@ public void cancel() {
}

// these two are the common cases where we need to send a cancel call
else if (current == RECOVERING || current == RUNNING || current == DEPLOYING) {
else if (current == INITIALIZING || current == RUNNING || current == DEPLOYING) {
// try to transition to canceling, if successful, send the cancel call
if (startCancelling(NUM_CANCEL_CALL_TRIES)) {
return;
Expand Down Expand Up @@ -663,7 +663,7 @@ else if (current == RECOVERING || current == RUNNING || current == DEPLOYING) {
public CompletableFuture<?> suspend() {
switch (state) {
case RUNNING:
case RECOVERING:
case INITIALIZING:
case DEPLOYING:
case CREATED:
case SCHEDULED:
Expand Down Expand Up @@ -723,7 +723,7 @@ private void updatePartitionConsumers(final IntermediateResultPartition partitio
// ----------------------------------------------------------------
if (consumerState == DEPLOYING
|| consumerState == RUNNING
|| consumerState == RECOVERING) {
|| consumerState == INITIALIZING) {
final PartitionInfo partitionInfo = createPartitionInfo(partition);

if (consumerState == DEPLOYING) {
Expand Down Expand Up @@ -856,7 +856,7 @@ public CompletableFuture<Acknowledge> sendOperatorEvent(
OperatorID operatorId, SerializedValue<OperatorEvent> event) {
final LogicalSlot slot = assignedResource;

if (slot != null && (getState() == RUNNING || getState() == RECOVERING)) {
if (slot != null && (getState() == RUNNING || getState() == INITIALIZING)) {
final TaskExecutorOperatorEventGateway eventGateway = slot.getTaskManagerGateway();
return eventGateway.sendOperatorEventToTask(getAttemptId(), operatorId, event);
} else {
Expand Down Expand Up @@ -907,7 +907,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met
while (true) {
ExecutionState current = this.state;

if (current == RECOVERING || current == RUNNING || current == DEPLOYING) {
if (current == INITIALIZING || current == RUNNING || current == DEPLOYING) {

if (transitionState(current, FINISHED)) {
try {
Expand Down Expand Up @@ -999,7 +999,7 @@ void completeCancelling(
return;
} else if (current == CANCELING
|| current == RUNNING
|| current == RECOVERING
|| current == INITIALIZING
|| current == DEPLOYING) {

updateAccumulatorsAndMetrics(userAccumulators, metrics);
Expand Down Expand Up @@ -1138,7 +1138,7 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(

if (cancelTask
&& (stateBeforeFailed == RUNNING
|| stateBeforeFailed == RECOVERING
|| stateBeforeFailed == INITIALIZING
|| stateBeforeFailed == DEPLOYING)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
Expand All @@ -1159,7 +1159,7 @@ private void maybeReleasePartitionsAndSendCancelRpcCall(
}

boolean switchToRecovering() {
if (switchTo(DEPLOYING, RECOVERING)) {
if (switchTo(DEPLOYING, INITIALIZING)) {
sendPartitionInfos();
return true;
}
Expand All @@ -1168,7 +1168,7 @@ boolean switchToRecovering() {
}

boolean switchToRunning() {
return switchTo(RECOVERING, RUNNING);
return switchTo(INITIALIZING, RUNNING);
}

private boolean switchTo(ExecutionState from, ExecutionState to) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,8 @@ public static ExecutionState getAggregateJobVertexState(
return ExecutionState.CANCELING;
} else if (verticesPerState[ExecutionState.CANCELED.ordinal()] > 0) {
return ExecutionState.CANCELED;
} else if (verticesPerState[ExecutionState.RECOVERING.ordinal()] > 0) {
return ExecutionState.RECOVERING;
} else if (verticesPerState[ExecutionState.INITIALIZING.ordinal()] > 0) {
return ExecutionState.INITIALIZING;
} else if (verticesPerState[ExecutionState.RUNNING.ordinal()] > 0) {
return ExecutionState.RUNNING;
} else if (verticesPerState[ExecutionState.FINISHED.ordinal()] > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public boolean isProducerReadyOrAbortConsumption(ResponseHandle responseHandle)
private static boolean isConsumerStateValidForConsumption(
ExecutionState consumerExecutionState) {
return consumerExecutionState == ExecutionState.RUNNING
|| consumerExecutionState == ExecutionState.RECOVERING
|| consumerExecutionState == ExecutionState.INITIALIZING
|| consumerExecutionState == ExecutionState.DEPLOYING;
}

private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
ExecutionState producerState = getProducerState(responseHandle);
return producerState == ExecutionState.SCHEDULED
|| producerState == ExecutionState.DEPLOYING
|| producerState == ExecutionState.RECOVERING
|| producerState == ExecutionState.INITIALIZING
|| producerState == ExecutionState.RUNNING
|| producerState == ExecutionState.FINISHED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void deliverOperatorEventToCoordinator(
final Execution exec = executionGraph.getRegisteredExecutions().get(taskExecutionId);
if (exec == null
|| exec.getState() != ExecutionState.RUNNING
&& exec.getState() != ExecutionState.RECOVERING) {
&& exec.getState() != ExecutionState.INITIALIZING) {
// This situation is common when cancellation happens, or when the task failed while the
// event was just being dispatched asynchronously on the TM side.
// It should be fine in those expected situations to just ignore this event, but, to be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ AbstractInvokable getInvokable() {
public boolean isBackPressured() {
if (invokable == null
|| consumableNotifyingPartitionWriters.length == 0
|| (executionState != ExecutionState.RECOVERING
|| (executionState != ExecutionState.INITIALIZING
&& executionState != ExecutionState.RUNNING)) {
return false;
}
Expand Down Expand Up @@ -738,14 +738,14 @@ private void doRun() {
// by the time we switched to running.
this.invokable = invokable;

// switch to the RECOVERING state, if that fails, we have been canceled/failed in the
// switch to the INITIALIZING state, if that fails, we have been canceled/failed in the
// meantime
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RECOVERING)) {
if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
throw new CancelTaskException();
}

taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(executionId, ExecutionState.RECOVERING));
new TaskExecutionState(executionId, ExecutionState.INITIALIZING));

// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
Expand All @@ -758,7 +758,7 @@ private void doRun() {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}

if (!transitionState(ExecutionState.RECOVERING, ExecutionState.RUNNING)) {
if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
throw new CancelTaskException();
}

Expand Down Expand Up @@ -833,15 +833,15 @@ private void doRun() {
}
}

// transition into our final state. we should be either in DEPLOYING, RECOVERING,
// transition into our final state. we should be either in DEPLOYING, INITIALIZING,
// RUNNING, CANCELING, or FAILED
// loop for multiple retries during concurrent state changes via calls to cancel()
// or to failExternally()
while (true) {
ExecutionState current = this.executionState;

if (current == ExecutionState.RUNNING
|| current == ExecutionState.RECOVERING
|| current == ExecutionState.INITIALIZING
|| current == ExecutionState.DEPLOYING) {
if (t instanceof CancelTaskException) {
if (transitionState(current, ExecutionState.CANCELED)) {
Expand Down Expand Up @@ -1145,7 +1145,8 @@ void cancelOrFailAndCancelInvokableInternal(ExecutionState targetState, Throwabl
this.failureCause = cause;
return;
}
} else if (current == ExecutionState.RECOVERING || current == ExecutionState.RUNNING) {
} else if (current == ExecutionState.INITIALIZING
|| current == ExecutionState.RUNNING) {
if (transitionState(current, targetState, cause)) {
// we are canceling / failing out of the running state
// we need to cancel the invokable
Expand Down Expand Up @@ -1405,7 +1406,7 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv

if (invokable == null
|| (executionState != ExecutionState.RUNNING
&& executionState != ExecutionState.RECOVERING)) {
&& executionState != ExecutionState.INITIALIZING)) {
throw new TaskNotRunningException("Task is not yet running.");
}

Expand All @@ -1415,7 +1416,7 @@ public void deliverOperatorEvent(OperatorID operator, SerializedValue<OperatorEv
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

if (getExecutionState() == ExecutionState.RUNNING
|| getExecutionState() == ExecutionState.RECOVERING) {
|| getExecutionState() == ExecutionState.INITIALIZING) {
FlinkException e = new FlinkException("Error while handling operator event", t);
failExternally(e);
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testMixedLocalRemoteUnknownDeployment() throws Exception {

// These states are allowed
if (state == ExecutionState.RUNNING
|| state == ExecutionState.RECOVERING
|| state == ExecutionState.INITIALIZING
|| state == ExecutionState.FINISHED
|| state == ExecutionState.SCHEDULED
|| state == ExecutionState.DEPLOYING) {
Expand Down
Loading

0 comments on commit b387928

Please sign in to comment.