Skip to content

Commit

Permalink
[FLINK-28134][runtime] Introduce SpeculativeExecutionJobVertex and Sp…
Browse files Browse the repository at this point in the history
…eculativeExecutionVertex

This closes apache#20082.
  • Loading branch information
zhuzhurk committed Jul 1, 2022
1 parent 335c404 commit b96d476
Show file tree
Hide file tree
Showing 14 changed files with 811 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.executiongraph;

/**
* {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link
* SpeculativeExecutionVertex}.
*/
public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex {

    private static final long serialVersionUID = 1L;

    /**
     * Builds the archived form of the given speculative vertex.
     *
     * <p>The archived current attempt is the vertex's current execution attempt; every other
     * execution returned by {@code getCurrentExecutions()} is appended to the archived execution
     * history.
     *
     * @param vertex the speculative execution vertex to archive
     */
    public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) {
        super(
                vertex.getParallelSubtaskIndex(),
                vertex.getTaskNameWithSubtaskIndex(),
                vertex.getCurrentExecutionAttempt().archive(),
                getCopyOfExecutionHistory(vertex));
    }

    /**
     * Copies the vertex's execution history and appends the archived form of each current
     * execution whose attempt number differs from that of the current execution attempt.
     *
     * @param vertex the vertex whose history and current executions are read
     * @return the extended copy of the execution history
     */
    private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) {
        final ExecutionHistory history = ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex);

        // The admitted current execution is archived separately as the "current attempt",
        // so it must not show up in the history as well.
        final int admittedAttemptNumber = vertex.getCurrentExecutionAttempt().getAttemptNumber();

        for (Execution candidate : vertex.getCurrentExecutions()) {
            if (candidate.getAttemptNumber() == admittedAttemptNumber) {
                continue;
            }
            history.add(candidate.archive());
        }

        return history;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG

private final boolean isDynamic;

private final ExecutionJobVertex.Factory executionJobVertexFactory;

// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
Expand All @@ -307,7 +309,8 @@ public DefaultExecutionGraph(
long initializationTimestamp,
VertexAttemptNumberStore initialAttemptCounts,
VertexParallelismStore vertexParallelismStore,
boolean isDynamic)
boolean isDynamic,
ExecutionJobVertex.Factory executionJobVertexFactory)
throws IOException {

this.executionGraphId = new ExecutionGraphID();
Expand Down Expand Up @@ -375,6 +378,8 @@ public DefaultExecutionGraph(

this.isDynamic = isDynamic;

this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);

LOG.info(
"Created execution graph {} for job {}.",
executionGraphId,
Expand Down Expand Up @@ -840,7 +845,9 @@ private void attachJobVertices(List<JobVertex> topologicallySorted) throws JobEx
parallelismStore.getParallelismInfo(jobVertex.getID());

// create the execution job vertex and attach it to the graph
ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, parallelismInfo);
ExecutionJobVertex ejv =
executionJobVertexFactory.createExecutionJobVertex(
this, jobVertex, parallelismInfo);

ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public static DefaultExecutionGraph buildGraph(
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
Supplier<CheckpointStatsTracker> checkpointStatsTrackerFactory,
boolean isDynamicGraph)
boolean isDynamicGraph,
ExecutionJobVertex.Factory executionJobVertexFactory)
throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");
Expand Down Expand Up @@ -136,7 +137,8 @@ public static DefaultExecutionGraph buildGraph(
initializationTimestamp,
vertexAttemptNumberStore,
vertexParallelismStore,
isDynamicGraph);
isDynamicGraph,
executionJobVertexFactory);
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ protected void initialize(
// create all task vertices
for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
ExecutionVertex vertex =
new ExecutionVertex(
createExecutionVertex(
this,
i,
producedDataSets,
Expand Down Expand Up @@ -260,6 +260,24 @@ protected void initialize(
}
}

/**
 * Creates the {@link ExecutionVertex} for a single subtask by forwarding all arguments,
 * unchanged and in order, to the {@link ExecutionVertex} constructor.
 *
 * <p>The method is {@code protected} and non-final, so a subclass can override it to supply a
 * different vertex implementation.
 *
 * @param jobVertex the job vertex the new vertex belongs to
 * @param subTaskIndex index of the subtask
 * @param producedDataSets intermediate results passed on to the vertex
 * @param timeout timeout passed on to the vertex
 * @param createTimestamp creation timestamp passed on to the vertex
 * @param executionHistorySizeLimit size limit of the vertex's execution history
 * @param initialAttemptCount initial attempt count passed on to the vertex
 * @return the newly created vertex
 */
protected ExecutionVertex createExecutionVertex(
        ExecutionJobVertex jobVertex,
        int subTaskIndex,
        IntermediateResult[] producedDataSets,
        Time timeout,
        long createTimestamp,
        int executionHistorySizeLimit,
        int initialAttemptCount) {
    final ExecutionVertex subtaskVertex =
            new ExecutionVertex(
                    jobVertex,
                    subTaskIndex,
                    producedDataSets,
                    timeout,
                    createTimestamp,
                    executionHistorySizeLimit,
                    initialAttemptCount);
    return subtaskVertex;
}

/**
* Reports whether this job vertex has been initialized, which is determined solely by {@code
* taskVertices} being non-null.
*
* @return {@code true} if {@code taskVertices} is set, {@code false} otherwise
*/
public boolean isInitialized() {
return taskVertices != null;
}
Expand Down Expand Up @@ -596,4 +614,15 @@ public static ExecutionState getAggregateJobVertexState(
return ExecutionState.CREATED;
}
}

/** Factory to create {@link ExecutionJobVertex}. */
public static class Factory {
    /**
     * Creates a plain {@link ExecutionJobVertex} from the given arguments.
     *
     * <p>The class and this method are non-final, so a subclass can override the method to
     * return a different job vertex implementation.
     *
     * @param graph accessor of the execution graph the job vertex is created for
     * @param jobVertex the job vertex to wrap
     * @param parallelismInfo parallelism information for the job vertex
     * @return the newly created execution job vertex
     * @throws JobException propagated from the {@link ExecutionJobVertex} constructor
     */
    ExecutionJobVertex createExecutionJobVertex(
            InternalExecutionGraphAccessor graph,
            JobVertex jobVertex,
            VertexParallelismInformation parallelismInfo)
            throws JobException {
        final ExecutionJobVertex created =
                new ExecutionJobVertex(graph, jobVertex, parallelismInfo);
        return created;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
import static org.apache.flink.util.Preconditions.checkState;

/**
* The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several
Expand All @@ -61,25 +62,27 @@ public class ExecutionVertex

// --------------------------------------------------------------------------------------------

private final ExecutionJobVertex jobVertex;
final ExecutionJobVertex jobVertex;

private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;

private final int subTaskIndex;

private final ExecutionVertexID executionVertexId;

private final ExecutionHistory executionHistory;
final ExecutionHistory executionHistory;

private final Time timeout;

/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
private final String taskNameWithSubtask;

/** The current or latest execution attempt of this vertex's task. */
private Execution currentExecution; // this field must never be null
Execution currentExecution; // this field must never be null

private final ArrayList<InputSplit> inputSplits;
final ArrayList<InputSplit> inputSplits;

private int nextAttemptNumber;

/** This field holds the allocation id of the last successful assignment. */
@Nullable private TaskManagerLocation lastAssignedLocation;
Expand Down Expand Up @@ -134,24 +137,33 @@ public ExecutionVertex(

this.executionHistory = new ExecutionHistory(executionHistorySizeLimit);

this.currentExecution =
new Execution(
getExecutionGraphAccessor().getFutureExecutor(),
this,
initialAttemptCount,
createTimestamp,
timeout);

getExecutionGraphAccessor().registerExecution(currentExecution);
this.nextAttemptNumber = initialAttemptCount;

this.timeout = timeout;
this.inputSplits = new ArrayList<>();

this.currentExecution = createNewExecution(createTimestamp);

getExecutionGraphAccessor().registerExecution(currentExecution);
}

// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------

/**
* Creates a new {@link Execution} for this vertex, using the current value of {@code
* nextAttemptNumber} as its attempt number and then incrementing that counter.
*
* <p>This method only constructs the execution; it neither assigns it to {@code
* currentExecution} nor registers it with the execution graph.
*
* @param timestamp the timestamp handed to the new execution
* @return the newly created execution
*/
Execution createNewExecution(final long timestamp) {
return new Execution(
getExecutionGraphAccessor().getFutureExecutor(),
this,
// post-increment: this execution gets the current number, the next one gets +1
nextAttemptNumber++,
timestamp,
timeout);
}

/**
* Returns the execution that acts as the partition producer of this vertex, which in this
* class is always {@code currentExecution}.
*
* @return the current execution
*/
public Execution getPartitionProducer() {
return currentExecution;
}

/**
* Returns the job ID as reported by the {@link ExecutionJobVertex} this vertex belongs to.
*
* @return the job ID obtained from {@code jobVertex}
*/
public JobID getJobId() {
return this.jobVertex.getJobId();
}
Expand Down Expand Up @@ -248,30 +260,30 @@ public Collection<Execution> getCurrentExecutions() {

@Override
public ExecutionState getExecutionState() {
return currentExecution.getState();
return getCurrentExecutionAttempt().getState();
}

@Override
public long getStateTimestamp(ExecutionState state) {
return currentExecution.getStateTimestamp(state);
return getCurrentExecutionAttempt().getStateTimestamp(state);
}

@Override
public Optional<ErrorInfo> getFailureInfo() {
return currentExecution.getFailureInfo();
return getCurrentExecutionAttempt().getFailureInfo();
}

public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
return currentExecution.getTaskManagerLocationFuture();
return getCurrentExecutionAttempt().getTaskManagerLocationFuture();
}

public LogicalSlot getCurrentAssignedResource() {
return currentExecution.getAssignedResource();
return getCurrentExecutionAttempt().getAssignedResource();
}

@Override
public TaskManagerLocation getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
return getCurrentExecutionAttempt().getAssignedResourceLocation();
}

@Override
Expand Down Expand Up @@ -341,57 +353,56 @@ public void resetForNewExecution() {
}

private void resetForNewExecutionInternal(final long timestamp) {
final Execution oldExecution = currentExecution;
final ExecutionState oldState = oldExecution.getState();

if (oldState.isTerminal()) {
if (oldState == FINISHED) {
// pipelined partitions are released in Execution#cancel(), covering both job
// failures and vertex resets
// do not release pipelined partitions here to save RPC calls
oldExecution.handlePartitionCleanup(false, true);
getExecutionGraphAccessor()
.getPartitionGroupReleaseStrategy()
.vertexUnfinished(executionVertexId);
}
final boolean isFinished = (getExecutionState() == FINISHED);

executionHistory.add(oldExecution.archive());
resetExecutionsInternal();

final Execution newExecution =
new Execution(
getExecutionGraphAccessor().getFutureExecutor(),
this,
oldExecution.getAttemptNumber() + 1,
timestamp,
timeout);
InputSplitAssigner assigner = jobVertex.getSplitAssigner();
if (assigner != null) {
assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
inputSplits.clear();
}

currentExecution = newExecution;
// if the execution was 'FINISHED' before, tell the ExecutionGraph that
// we take one step back on the road to reaching global FINISHED
if (isFinished) {
getJobVertex().executionVertexUnFinished();
}

synchronized (inputSplits) {
InputSplitAssigner assigner = jobVertex.getSplitAssigner();
if (assigner != null) {
assigner.returnInputSplit(inputSplits, getParallelSubtaskIndex());
inputSplits.clear();
}
}
// reset the intermediate results
for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
resultPartition.resetForNewExecution();
}

// register this execution at the execution graph, to receive call backs
getExecutionGraphAccessor().registerExecution(newExecution);
final Execution newExecution = createNewExecution(timestamp);
currentExecution = newExecution;

// if the execution was 'FINISHED' before, tell the ExecutionGraph that
// we take one step back on the road to reaching global FINISHED
if (oldState == FINISHED) {
getJobVertex().executionVertexUnFinished();
}
// register this execution to the execution graph, to receive call backs
getExecutionGraphAccessor().registerExecution(newExecution);
}

// reset the intermediate results
for (IntermediateResultPartition resultPartition : resultPartitions.values()) {
resultPartition.resetForNewExecution();
}
} else {
throw new IllegalStateException(
"Cannot reset a vertex that is in non-terminal state " + oldState);
/**
* Resets the executions tracked by this vertex. In this class that is only {@code
* currentExecution}, which is handed to {@link #resetExecution(Execution)}.
*/
void resetExecutionsInternal() {
resetExecution(currentExecution);
}

/**
* Resets a single execution of this vertex and archives it into the execution history.
*
* <p>The execution must be in a terminal state; otherwise the {@code checkState} precondition
* fails. If the execution was {@code FINISHED}, its partitions are cleaned up via {@code
* handlePartitionCleanup(false, true)} and the partition group release strategy is told that
* this vertex is unfinished again. In all cases the archived form of the execution is then
* added to {@code executionHistory}.
*
* @param execution the execution to reset; must be in a terminal state
*/
void resetExecution(final Execution execution) {
final ExecutionState oldState = execution.getState();

// resetting is only allowed for executions that have already terminated
checkState(
oldState.isTerminal(),
"Cannot reset an execution that is in non-terminal state " + oldState);

if (oldState == FINISHED) {
// pipelined partitions are released in Execution#cancel(), covering both job
// failures and vertex resets
// do not release pipelined partitions here to save RPC calls
execution.handlePartitionCleanup(false, true);
getExecutionGraphAccessor()
.getPartitionGroupReleaseStrategy()
.vertexUnfinished(executionVertexId);
}

// keep an archived copy of the execution in this vertex's history
executionHistory.add(execution.archive());
}

public void tryAssignResource(LogicalSlot slot) {
Expand Down
Loading

0 comments on commit b96d476

Please sign in to comment.