Skip to content

Commit

Permalink
[FLINK-21451][coordination] Remove JobID from TaskExecutionState
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Feb 24, 2021
1 parent 41ad173 commit 7f1853d
Show file tree
Hide file tree
Showing 25 changed files with 63 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.executiongraph;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
Expand Down Expand Up @@ -64,10 +63,6 @@ public ExecutionState getExecutionState() {
return taskExecutionState.getExecutionState();
}

public JobID getJobID() {
return taskExecutionState.getJobID();
}

public AccumulatorSnapshot getAccumulators() {
return taskExecutionState.getAccumulators();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public void onMissingDeploymentsOf(
for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
schedulerNG.updateTaskExecutionState(
new TaskExecutionState(
jobGraph.getJobID(),
executionAttemptId,
ExecutionState.FAILED,
new FlinkException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ private ExecutionGraph createAndRestoreExecutionGraph(
}

newExecutionGraph.setInternalTaskFailuresListener(
new UpdateSchedulerNgOnInternalFailuresListener(this, jobGraph.getJobID()));
new UpdateSchedulerNgOnInternalFailuresListener(this));
newExecutionGraph.registerJobStatusListener(jobStatusListener);
newExecutionGraph.start(mainThreadExecutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.flink.runtime.scheduler;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
Expand All @@ -35,13 +34,9 @@ public class UpdateSchedulerNgOnInternalFailuresListener implements InternalFail

private final SchedulerNG schedulerNg;

private final JobID jobId;

public UpdateSchedulerNgOnInternalFailuresListener(
final SchedulerNG schedulerNg, final JobID jobId) {
public UpdateSchedulerNgOnInternalFailuresListener(final SchedulerNG schedulerNg) {

this.schedulerNg = checkNotNull(schedulerNg);
this.jobId = checkNotNull(jobId);
}

@Override
Expand All @@ -52,7 +47,7 @@ public void notifyTaskFailure(
final boolean releasePartitions) {

final TaskExecutionState state =
new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t);
new TaskExecutionState(attemptId, ExecutionState.FAILED, t);
schedulerNg.updateTaskExecutionState(
new TaskExecutionStateTransition(state, cancelTask, releasePartitions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ public ExecutionGraph createExecutionGraphWithAvailableResources() throws Except
executionGraph.transitionToRunning();

executionGraph.setInternalTaskFailuresListener(
new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID()));
new UpdateSchedulerNgOnInternalFailuresListener(this));

for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
final LogicalSlot assignedSlot =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1749,7 +1749,6 @@ private void unregisterTaskAndNotifyFinalState(
updateTaskExecutionState(
jobMasterGateway,
new TaskExecutionState(
task.getJobID(),
task.getExecutionId(),
task.getExecutionState(),
task.getFailureCause(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ private void doRun() {

// notify everyone that we switched to running
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
new TaskExecutionState(executionId, ExecutionState.RUNNING));

// make sure the user code classloader is accessible thread-locally
executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader());
Expand Down Expand Up @@ -1001,7 +1001,7 @@ private UserCodeClassLoader createUserCodeClassloader() throws Exception {
private void notifyFinalState() {
checkState(executionState.isTerminal());
taskManagerActions.updateTaskExecutionState(
new TaskExecutionState(jobId, executionId, executionState, failureCause));
new TaskExecutionState(executionId, executionState, failureCause));
}

private void notifyFatalError(String message, Throwable cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.runtime.taskmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -39,8 +38,6 @@ public class TaskExecutionState implements Serializable {

private static final long serialVersionUID = 1L;

private final JobID jobID;

private final ExecutionAttemptID executionId;

private final ExecutionState executionState;
Expand All @@ -55,53 +52,44 @@ public class TaskExecutionState implements Serializable {
/**
* Creates a new task execution state update, with no attached exception and no accumulators.
*
* @param jobID the ID of the job the task belongs to
* @param executionId the ID of the task execution whose state is to be reported
* @param executionState the execution state to be reported
*/
public TaskExecutionState(
JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) {
this(jobID, executionId, executionState, null, null, null);
public TaskExecutionState(ExecutionAttemptID executionId, ExecutionState executionState) {
this(executionId, executionState, null, null, null);
}

/**
* Creates a new task execution state update, with an attached exception but no accumulators.
*
* @param jobID the ID of the job the task belongs to
* @param executionId the ID of the task execution whose state is to be reported
* @param executionState the execution state to be reported
*/
public TaskExecutionState(
JobID jobID,
ExecutionAttemptID executionId,
ExecutionState executionState,
Throwable error) {
this(jobID, executionId, executionState, error, null, null);
ExecutionAttemptID executionId, ExecutionState executionState, Throwable error) {
this(executionId, executionState, error, null, null);
}

/**
* Creates a new task execution state update, with an attached exception. This constructor may
* never throw an exception.
*
* @param jobID the ID of the job the task belongs to
* @param executionId the ID of the task execution whose state is to be reported
* @param executionState the execution state to be reported
* @param error an optional error
* @param accumulators The flink and user-defined accumulators which may be null.
*/
public TaskExecutionState(
JobID jobID,
ExecutionAttemptID executionId,
ExecutionState executionState,
Throwable error,
AccumulatorSnapshot accumulators,
IOMetrics ioMetrics) {

if (jobID == null || executionId == null || executionState == null) {
if (executionId == null || executionState == null) {
throw new NullPointerException();
}

this.jobID = jobID;
this.executionId = executionId;
this.executionState = executionState;
if (error != null) {
Expand Down Expand Up @@ -148,15 +136,6 @@ public ExecutionState getExecutionState() {
return this.executionState;
}

/**
* The ID of the job the task belongs to
*
* @return the ID of the job the task belongs to
*/
public JobID getJobID() {
return this.jobID;
}

/** Gets flink and user-defined accumulators in serialized form. */
public AccumulatorSnapshot getAccumulators() {
return accumulators;
Expand All @@ -172,8 +151,7 @@ public IOMetrics getIOMetrics() {
public boolean equals(Object obj) {
if (obj instanceof TaskExecutionState) {
TaskExecutionState other = (TaskExecutionState) obj;
return other.jobID.equals(this.jobID)
&& other.executionId.equals(this.executionId)
return other.executionId.equals(this.executionId)
&& other.executionState == this.executionState
&& (other.throwable == null) == (this.throwable == null);
} else {
Expand All @@ -183,16 +161,13 @@ public boolean equals(Object obj) {

@Override
public int hashCode() {
return jobID.hashCode() + executionId.hashCode() + executionState.ordinal();
return executionId.hashCode() + executionState.ordinal();
}

@Override
public String toString() {
return String.format(
"TaskExecutionState jobId=%s, executionId=%s, state=%s, error=%s",
jobID,
executionId,
executionState,
throwable == null ? "(null)" : throwable.toString());
"TaskExecutionState executionId=%s, state=%s, error=%s",
executionId, executionState, throwable == null ? "(null)" : throwable.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ public void testShutdownCheckpointCoordinatorOnFinished() throws Exception {
final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
scheduler.updateTaskExecutionState(
new TaskExecutionState(
graph.getJobID(),
currentExecutionAttempt.getAttemptId(),
ExecutionState.FINISHED));
currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED));
}

assertThat(graph.getTerminationFuture().get(), is(JobStatus.FINISHED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public static void setupExecutionGraph() throws Exception {
scheduler.startScheduling();
scheduler.updateTaskExecutionState(
new TaskExecutionState(
jobGraph.getJobID(),
runtimeGraph
.getAllExecutionVertices()
.iterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception {

TaskExecutionState state =
new TaskExecutionState(
graph.getJobID(),
execution1.getAttemptId(),
ExecutionState.CANCELED,
null,
Expand All @@ -364,7 +363,6 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception {

TaskExecutionState state2 =
new TaskExecutionState(
graph.getJobID(),
execution2.getAttemptId(),
ExecutionState.FAILED,
null,
Expand Down Expand Up @@ -491,9 +489,9 @@ public void testNoResourceAvailableFailure() throws Exception {
.getCurrentExecutionAttempt()
.getAttemptId();
scheduler.updateTaskExecutionState(
new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING));
new TaskExecutionState(attemptID, ExecutionState.RUNNING));
scheduler.updateTaskExecutionState(
new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null));
new TaskExecutionState(attemptID, ExecutionState.FINISHED, null));

assertEquals(JobStatus.FAILED, eg.getState());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E
getCurrentExecution(sourceVertex, executionGraph);
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
sourceExecution.getAttemptId(),
ExecutionState.FINISHED));
sourceExecution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
});

Expand All @@ -103,9 +101,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E
getCurrentExecution(operatorVertex, executionGraph);
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
operatorExecution.getAttemptId(),
ExecutionState.FINISHED));
operatorExecution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, hasSize(1));
assertThat(
releasedPartitions.remove(),
Expand All @@ -127,9 +123,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E
final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph);
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
sinkExecution.getAttemptId(),
ExecutionState.FINISHED));
sinkExecution.getAttemptId(), ExecutionState.FINISHED));

assertThat(releasedPartitions, hasSize(1));
assertThat(
Expand Down Expand Up @@ -187,9 +181,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
// consumer o1 was not finished
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
sourceExecution.getAttemptId(),
ExecutionState.FINISHED));
sourceExecution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
});

Expand All @@ -207,9 +199,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
}
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
operator1Execution.getAttemptId(),
ExecutionState.FINISHED));
operator1Execution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
});

Expand All @@ -221,9 +211,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
// finished
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
operator2Execution.getAttemptId(),
ExecutionState.FINISHED));
operator2Execution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
});

Expand All @@ -243,9 +231,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception {
// finish o3; this should not result in any release calls since o2 was reset
scheduler.updateTaskExecutionState(
new TaskExecutionState(
executionGraph.getJobID(),
operator3Execution.getAttemptId(),
ExecutionState.FINISHED));
operator3Execution.getAttemptId(), ExecutionState.FINISHED));
assertThat(releasedPartitions, empty());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ public void testUpdatePartitionConsumers() throws Exception {
private void updateState(
SchedulerBase scheduler, ExecutionVertex vertex, ExecutionState state) {
scheduler.updateTaskExecutionState(
new TaskExecutionState(
jobGraph.getJobID(),
vertex.getCurrentExecutionAttempt().getAttemptId(),
state));
new TaskExecutionState(vertex.getCurrentExecutionAttempt().getAttemptId(), state));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ private void testPartitionReleaseOrPromotionOnJobTermination(
taskDeploymentDescriptorFuture.get();
jobMasterGateway.updateTaskExecutionState(
new TaskExecutionState(
taskDeploymentDescriptor.getJobId(),
taskDeploymentDescriptor.getExecutionAttemptId(),
finalExecutionState));
taskDeploymentDescriptor.getExecutionAttemptId(), finalExecutionState));

assertThat(
taskExecutorCallSelector.apply(testSetup).get(),
Expand Down
Loading

0 comments on commit 7f1853d

Please sign in to comment.