From 7f1853d3258c38b24e4be36adcfe08a59f211f39 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Wed, 24 Feb 2021 12:05:49 +0100 Subject: [PATCH] [FLINK-21451][coordination] Remove JobID from TaskExecutionState --- .../TaskExecutionStateTransition.java | 5 -- .../flink/runtime/jobmaster/JobMaster.java | 1 - .../runtime/scheduler/SchedulerBase.java | 2 +- ...SchedulerNgOnInternalFailuresListener.java | 9 +--- .../scheduler/adaptive/AdaptiveScheduler.java | 2 +- .../runtime/taskexecutor/TaskExecutor.java | 1 - .../flink/runtime/taskmanager/Task.java | 4 +- .../taskmanager/TaskExecutionState.java | 43 ++++------------- ...ecutionGraphCheckpointCoordinatorTest.java | 4 +- .../ArchivedExecutionGraphTest.java | 1 - .../ExecutionGraphDeploymentTest.java | 6 +-- .../ExecutionGraphPartitionReleaseTest.java | 28 +++-------- .../UpdatePartitionConsumersTest.java | 5 +- .../JobMasterPartitionReleaseTest.java | 4 +- .../runtime/jobmaster/JobMasterTest.java | 15 ++---- .../DefaultSchedulerBatchSchedulingTest.java | 4 +- .../scheduler/DefaultSchedulerTest.java | 48 +++++-------------- .../scheduler/SchedulerTestingUtils.java | 19 ++------ .../adaptive/AdaptiveSchedulerTest.java | 4 +- .../scheduler/adaptive/CancelingTest.java | 1 - .../scheduler/adaptive/ExecutingTest.java | 16 ++----- .../scheduler/adaptive/FailingTest.java | 1 - .../taskmanager/TaskExecutionStateTest.java | 17 +++---- .../flink/runtime/taskmanager/TaskTest.java | 1 - .../runtime/tasks/StreamTaskTest.java | 3 +- 25 files changed, 63 insertions(+), 181 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateTransition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateTransition.java index dc59d1011c6c4..2faca71ed0ad3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateTransition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskExecutionStateTransition.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.executiongraph; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -64,10 +63,6 @@ public ExecutionState getExecutionState() { return taskExecutionState.getExecutionState(); } - public JobID getJobID() { - return taskExecutionState.getJobID(); - } - public AccumulatorSnapshot getAccumulators() { return taskExecutionState.getAccumulators(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 483b06ab207ba..694907556ee78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -236,7 +236,6 @@ public void onMissingDeploymentsOf( for (ExecutionAttemptID executionAttemptId : executionAttemptIds) { schedulerNG.updateTaskExecutionState( new TaskExecutionState( - jobGraph.getJobID(), executionAttemptId, ExecutionState.FAILED, new FlinkException( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 9b1e83de7aeb5..e86c444643d18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -305,7 +305,7 @@ private ExecutionGraph createAndRestoreExecutionGraph( } newExecutionGraph.setInternalTaskFailuresListener( - new UpdateSchedulerNgOnInternalFailuresListener(this, jobGraph.getJobID())); + new UpdateSchedulerNgOnInternalFailuresListener(this)); newExecutionGraph.registerJobStatusListener(jobStatusListener); newExecutionGraph.start(mainThreadExecutor); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java index b97620e32d437..a2db3a7ac7f8d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/UpdateSchedulerNgOnInternalFailuresListener.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.scheduler; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; @@ -35,13 +34,9 @@ public class UpdateSchedulerNgOnInternalFailuresListener implements InternalFail private final SchedulerNG schedulerNg; - private final JobID jobId; - - public UpdateSchedulerNgOnInternalFailuresListener( - final SchedulerNG schedulerNg, final JobID jobId) { + public UpdateSchedulerNgOnInternalFailuresListener(final SchedulerNG schedulerNg) { this.schedulerNg = checkNotNull(schedulerNg); - this.jobId = checkNotNull(jobId); } @Override @@ -52,7 +47,7 @@ public void notifyTaskFailure( final boolean releasePartitions) { final TaskExecutionState state = - new TaskExecutionState(jobId, attemptId, ExecutionState.FAILED, t); + new TaskExecutionState(attemptId, ExecutionState.FAILED, t); schedulerNg.updateTaskExecutionState( new TaskExecutionStateTransition(state, cancelTask, releasePartitions)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 7a332cfbe0482..6ad4316fc6d5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -609,7 +609,7 @@ public ExecutionGraph createExecutionGraphWithAvailableResources() throws Except executionGraph.transitionToRunning(); executionGraph.setInternalTaskFailuresListener( - new UpdateSchedulerNgOnInternalFailuresListener(this, jobInformation.getJobID())); + new UpdateSchedulerNgOnInternalFailuresListener(this)); for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { final LogicalSlot assignedSlot = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 0d9f80925b1f8..60ce79609b3bf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -1749,7 +1749,6 @@ private void unregisterTaskAndNotifyFinalState( updateTaskExecutionState( jobMasterGateway, new TaskExecutionState( - task.getJobID(), task.getExecutionId(), task.getExecutionState(), task.getFailureCause(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index a315683cb3b95..b455e473522a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -745,7 +745,7 @@ private void doRun() { // notify everyone that we switched to running taskManagerActions.updateTaskExecutionState( - new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); + new TaskExecutionState(executionId, ExecutionState.RUNNING)); // make sure the user code classloader is accessible thread-locally executingThread.setContextClassLoader(userCodeClassLoader.asClassLoader()); @@ -1001,7 +1001,7 @@ private UserCodeClassLoader createUserCodeClassloader() throws Exception { private void notifyFinalState() { checkState(executionState.isTerminal()); taskManagerActions.updateTaskExecutionState( - new TaskExecutionState(jobId, executionId, executionState, failureCause)); + new TaskExecutionState(executionId, executionState, failureCause)); } private void notifyFatalError(String message, Throwable cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 6c9fa39ecf13d..a5d71ac80407f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -39,8 +38,6 @@ public class TaskExecutionState implements Serializable { private static final long serialVersionUID = 1L; - private final JobID jobID; - private final ExecutionAttemptID executionId; private final ExecutionState executionState; @@ -55,53 +52,44 @@ public class TaskExecutionState implements Serializable { /** * Creates a new task execution state update, with no attached exception and no accumulators. * - * @param jobID the ID of the job the task belongs to * @param executionId the ID of the task execution whose state is to be reported * @param executionState the execution state to be reported */ - public TaskExecutionState( - JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState) { - this(jobID, executionId, executionState, null, null, null); + public TaskExecutionState(ExecutionAttemptID executionId, ExecutionState executionState) { + this(executionId, executionState, null, null, null); } /** * Creates a new task execution state update, with an attached exception but no accumulators. * - * @param jobID the ID of the job the task belongs to * @param executionId the ID of the task execution whose state is to be reported * @param executionState the execution state to be reported */ public TaskExecutionState( - JobID jobID, - ExecutionAttemptID executionId, - ExecutionState executionState, - Throwable error) { - this(jobID, executionId, executionState, error, null, null); + ExecutionAttemptID executionId, ExecutionState executionState, Throwable error) { + this(executionId, executionState, error, null, null); } /** * Creates a new task execution state update, with an attached exception. This constructor may * never throw an exception. * - * @param jobID the ID of the job the task belongs to * @param executionId the ID of the task execution whose state is to be reported * @param executionState the execution state to be reported * @param error an optional error * @param accumulators The flink and user-defined accumulators which may be null. */ public TaskExecutionState( - JobID jobID, ExecutionAttemptID executionId, ExecutionState executionState, Throwable error, AccumulatorSnapshot accumulators, IOMetrics ioMetrics) { - if (jobID == null || executionId == null || executionState == null) { + if (executionId == null || executionState == null) { throw new NullPointerException(); } - this.jobID = jobID; this.executionId = executionId; this.executionState = executionState; if (error != null) { @@ -148,15 +136,6 @@ public ExecutionState getExecutionState() { return this.executionState; } - /** - * The ID of the job the task belongs to - * - * @return the ID of the job the task belongs to - */ - public JobID getJobID() { - return this.jobID; - } - /** Gets flink and user-defined accumulators in serialized form. */ public AccumulatorSnapshot getAccumulators() { return accumulators; @@ -172,8 +151,7 @@ public IOMetrics getIOMetrics() { public boolean equals(Object obj) { if (obj instanceof TaskExecutionState) { TaskExecutionState other = (TaskExecutionState) obj; - return other.jobID.equals(this.jobID) - && other.executionId.equals(this.executionId) + return other.executionId.equals(this.executionId) && other.executionState == this.executionState && (other.throwable == null) == (this.throwable == null); } else { @@ -183,16 +161,13 @@ public boolean equals(Object obj) { @Override public int hashCode() { - return jobID.hashCode() + executionId.hashCode() + executionState.ordinal(); + return executionId.hashCode() + executionState.ordinal(); } @Override public String toString() { return String.format( - "TaskExecutionState jobId=%s, executionId=%s, state=%s, error=%s", - jobID, - executionId, - executionState, - throwable == null ? "(null)" : throwable.toString()); + "TaskExecutionState executionId=%s, state=%s, error=%s", + executionId, executionState, throwable == null ? "(null)" : throwable.toString()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index e10c9e63683ef..180d593ed7d5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -117,9 +117,7 @@ public void testShutdownCheckpointCoordinatorOnFinished() throws Exception { final Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt(); scheduler.updateTaskExecutionState( new TaskExecutionState( - graph.getJobID(), - currentExecutionAttempt.getAttemptId(), - ExecutionState.FINISHED)); + currentExecutionAttempt.getAttemptId(), ExecutionState.FINISHED)); } assertThat(graph.getTerminationFuture().get(), is(JobStatus.FINISHED)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 52ab3e01b20ee..cbba06640762c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -118,7 +118,6 @@ public static void setupExecutionGraph() throws Exception { scheduler.startScheduling(); scheduler.updateTaskExecutionState( new TaskExecutionState( - jobGraph.getJobID(), runtimeGraph .getAllExecutionVertices() .iterator() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 702b489f5d81e..49f5144dec944 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -340,7 +340,6 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception { TaskExecutionState state = new TaskExecutionState( - graph.getJobID(), execution1.getAttemptId(), ExecutionState.CANCELED, null, @@ -364,7 +363,6 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception { TaskExecutionState state2 = new TaskExecutionState( - graph.getJobID(), execution2.getAttemptId(), ExecutionState.FAILED, null, @@ -491,9 +489,9 @@ public void testNoResourceAvailableFailure() throws Exception { .getCurrentExecutionAttempt() .getAttemptId(); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobId, attemptID, ExecutionState.RUNNING)); + new TaskExecutionState(attemptID, ExecutionState.RUNNING)); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobId, attemptID, ExecutionState.FINISHED, null)); + new TaskExecutionState(attemptID, ExecutionState.FINISHED, null)); assertEquals(JobStatus.FAILED, eg.getState()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index ac4b64cab0eb1..0eb830c19feeb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -89,9 +89,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E getCurrentExecution(sourceVertex, executionGraph); scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - sourceExecution.getAttemptId(), - ExecutionState.FINISHED)); + sourceExecution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); }); @@ -103,9 +101,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E getCurrentExecution(operatorVertex, executionGraph); scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - operatorExecution.getAttemptId(), - ExecutionState.FINISHED)); + operatorExecution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, hasSize(1)); assertThat( releasedPartitions.remove(), @@ -127,9 +123,7 @@ public void testStrategyNotifiedOfFinishedVerticesAndResultsRespected() throws E final Execution sinkExecution = getCurrentExecution(sinkVertex, executionGraph); scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - sinkExecution.getAttemptId(), - ExecutionState.FINISHED)); + sinkExecution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, hasSize(1)); assertThat( @@ -187,9 +181,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { // consumer o1 was not finished scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - sourceExecution.getAttemptId(), - ExecutionState.FINISHED)); + sourceExecution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); }); @@ -207,9 +199,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { } scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - operator1Execution.getAttemptId(), - ExecutionState.FINISHED)); + operator1Execution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); }); @@ -221,9 +211,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { // finished scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - operator2Execution.getAttemptId(), - ExecutionState.FINISHED)); + operator2Execution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); }); @@ -243,9 +231,7 @@ public void testStrategyNotifiedOfUnFinishedVertices() throws Exception { // finish o3; this should not result in any release calls since o2 was reset scheduler.updateTaskExecutionState( new TaskExecutionState( - executionGraph.getJobID(), - operator3Execution.getAttemptId(), - ExecutionState.FINISHED)); + operator3Execution.getAttemptId(), ExecutionState.FINISHED)); assertThat(releasedPartitions, empty()); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java index c96091fa355c7..0d955df17c8fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java @@ -189,9 +189,6 @@ public void testUpdatePartitionConsumers() throws Exception { private void updateState( SchedulerBase scheduler, ExecutionVertex vertex, ExecutionState state) { scheduler.updateTaskExecutionState( - new TaskExecutionState( - jobGraph.getJobID(), - vertex.getCurrentExecutionAttempt().getAttemptId(), - state)); + new TaskExecutionState(vertex.getCurrentExecutionAttempt().getAttemptId(), state)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java index 7f7c598dc5c9c..d97446c8d319f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java @@ -164,9 +164,7 @@ private void testPartitionReleaseOrPromotionOnJobTermination( taskDeploymentDescriptorFuture.get(); jobMasterGateway.updateTaskExecutionState( new TaskExecutionState( - taskDeploymentDescriptor.getJobId(), - taskDeploymentDescriptor.getExecutionAttemptId(), - finalExecutionState)); + taskDeploymentDescriptor.getExecutionAttemptId(), finalExecutionState)); assertThat( taskExecutorCallSelector.apply(testSetup).get(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index f0d2637f48473..a1e6255a01bc7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1144,10 +1144,7 @@ private void runRequestNextInputSplitTest( // fail the first execution to trigger a failover jobMasterGateway .updateTaskExecutionState( - new TaskExecutionState( - inputSplitJobGraph.getJobID(), - initialAttemptId, - ExecutionState.FAILED)) + new TaskExecutionState(initialAttemptId, ExecutionState.FAILED)) .get(); // wait until the job has been recovered @@ -1404,10 +1401,7 @@ public void testRequestPartitionState() throws Exception { // finish the producer task jobMasterGateway .updateTaskExecutionState( - new TaskExecutionState( - producerConsumerJobGraph.getJobID(), - executionAttemptId, - ExecutionState.FINISHED)) + new TaskExecutionState(executionAttemptId, ExecutionState.FINISHED)) .get(); // request the state of the result partition of the producer @@ -1822,10 +1816,7 @@ private void runJobFailureWhenTaskExecutorTerminatesTest( jobMasterGateway .updateTaskExecutionState( - new TaskExecutionState( - jobGraph.getJobID(), - executionAttemptId, - ExecutionState.RUNNING)) + new TaskExecutionState(executionAttemptId, ExecutionState.RUNNING)) .get(); jobReachedRunningState.accept(taskManagerUnresolvedLocation, jobMasterGateway); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java index 4069cef988259..e57be4390bdc1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java @@ -168,10 +168,10 @@ private void finishExecution( () -> { scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, executionAttemptId, ExecutionState.RUNNING)); + executionAttemptId, ExecutionState.RUNNING)); scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, executionAttemptId, ExecutionState.FINISHED)); + executionAttemptId, ExecutionState.FINISHED)); }, mainThreadExecutor) .join(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 1e33ae09243f0..64c96d69158a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -564,7 +564,7 @@ public void handleGlobalFailure() { final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.CANCELED)); + new TaskExecutionState(attemptId, ExecutionState.CANCELED)); taskRestartExecutor.triggerScheduledTasks(); @@ -628,7 +628,7 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception { final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING)); + new TaskExecutionState(attemptId, ExecutionState.RUNNING)); final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); @@ -660,7 +660,7 @@ public void restoreStateWhenRestartingTasks() throws Exception { final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING)); + new TaskExecutionState(attemptId, ExecutionState.RUNNING)); final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); @@ -700,7 +700,7 @@ public void failGlobalWhenRestoringStateFails() throws Exception { final ExecutionAttemptID attemptId = onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId(); scheduler.updateTaskExecutionState( - new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING)); + new TaskExecutionState(attemptId, ExecutionState.RUNNING)); final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler); @@ -799,17 +799,11 @@ public void jobStatusIsRestartingIfOneVertexIsWaitingForRestart() { scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, - attemptId1, - ExecutionState.FAILED, - new RuntimeException("expected"))); + attemptId1, ExecutionState.FAILED, new RuntimeException("expected"))); final JobStatus jobStatusAfterFirstFailure = scheduler.requestJobStatus(); scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, - attemptId2, - ExecutionState.FAILED, - new RuntimeException("expected"))); + attemptId2, ExecutionState.FAILED, new RuntimeException("expected"))); taskRestartExecutor.triggerNonPeriodicScheduledTask(); final JobStatus jobStatusWithPendingRestarts = scheduler.requestJobStatus(); @@ -843,20 +837,14 @@ public void cancelWhileRestartingShouldWaitForRunningTasks() { scheduler.updateTaskExecutionState( new TaskExecutionState( - jobid, - attemptId1, - ExecutionState.FAILED, - new RuntimeException("expected"))); + attemptId1, ExecutionState.FAILED, new RuntimeException("expected"))); scheduler.cancel(); final ExecutionState vertex2StateAfterCancel = topology.getVertex(executionVertex2).getState(); final JobStatus statusAfterCancelWhileRestarting = scheduler.requestJobStatus(); scheduler.updateTaskExecutionState( new TaskExecutionState( - jobid, - attemptId2, - ExecutionState.CANCELED, - new RuntimeException("expected"))); + attemptId2, ExecutionState.CANCELED, new RuntimeException("expected"))); assertThat(vertex2StateAfterCancel, is(equalTo(ExecutionState.CANCELING))); assertThat(statusAfterCancelWhileRestarting, is(equalTo(JobStatus.CANCELLING))); @@ -881,10 +869,7 @@ public void failureInfoIsSetAfterTaskFailure() { final String exceptionMessage = "expected exception"; scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, - attemptId, - ExecutionState.FAILED, - new RuntimeException(exceptionMessage))); + attemptId, ExecutionState.FAILED, new RuntimeException(exceptionMessage))); final ErrorInfo failureInfo = scheduler.requestJob().getArchivedExecutionGraph().getFailureInfo(); @@ -919,7 +904,6 @@ public void allocationIsCanceledWhenVertexIsFailedOrCanceled() throws Exception final String exceptionMessage = "expected exception"; scheduler.updateTaskExecutionState( new TaskExecutionState( - jobId, v1.getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, new RuntimeException(exceptionMessage))); @@ -958,11 +942,7 @@ public void testExceptionHistoryWithGlobalFailOver() { // we have to cancel the task and trigger the restart to have the exception history // populated scheduler.updateTaskExecutionState( - new TaskExecutionState( - jobGraph.getJobID(), - attemptId, - ExecutionState.CANCELED, - expectedException)); + new TaskExecutionState(attemptId, ExecutionState.CANCELED, expectedException)); taskRestartExecutor.triggerScheduledTasks(); final long end = System.currentTimeMillis(); @@ -1049,10 +1029,7 @@ public void testExceptionHistoryWithRestartableFailure() { private static TaskExecutionState createFailedTaskExecutionState( JobID jobId, ExecutionAttemptID executionAttemptID) { return new TaskExecutionState( - jobId, - executionAttemptID, - ExecutionState.FAILED, - new Exception("Expected failure cause")); + executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause")); } private static Range initiateFailure( @@ -1062,8 +1039,7 @@ private static Range initiateFailure( Throwable exception) { long start = System.currentTimeMillis(); scheduler.updateTaskExecutionState( - new TaskExecutionState( - jobId, executionAttemptID, ExecutionState.FAILED, exception)); + new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, exception)); return Range.closed(start, System.currentTimeMillis()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index ad3e98fb9c5e9..1901953f73813 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -210,10 +210,7 @@ public static void failExecution(DefaultScheduler scheduler, JobVertexID jvid, i final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask); scheduler.updateTaskExecutionState( new TaskExecutionState( - scheduler.getJobId(), - attemptID, - ExecutionState.FAILED, - new Exception("test task failure"))); + attemptID, ExecutionState.FAILED, new Exception("test task failure"))); } public static void canceledExecution( @@ -221,35 +218,29 @@ public static void canceledExecution( final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask); scheduler.updateTaskExecutionState( new TaskExecutionState( - scheduler.getJobId(), - attemptID, - ExecutionState.CANCELED, - new Exception("test task failure"))); + attemptID, ExecutionState.CANCELED, new Exception("test task failure"))); } public static void setExecutionToRunning( DefaultScheduler scheduler, JobVertexID jvid, int subtask) { final ExecutionAttemptID attemptID = getAttemptId(scheduler, jvid, subtask); scheduler.updateTaskExecutionState( - new TaskExecutionState(scheduler.getJobId(), attemptID, ExecutionState.RUNNING)); + new TaskExecutionState(attemptID, ExecutionState.RUNNING)); } public static void setAllExecutionsToRunning(final DefaultScheduler scheduler) { - final JobID jid = scheduler.getJobId(); getAllCurrentExecutionAttempts(scheduler) .forEach( (attemptId) -> scheduler.updateTaskExecutionState( - new TaskExecutionState( - jid, attemptId, ExecutionState.RUNNING))); + new TaskExecutionState(attemptId, ExecutionState.RUNNING))); } public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) { - final JobID jid = scheduler.getJobId(); for (final ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) { final boolean setToRunning = scheduler.updateTaskExecutionState( - new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED)); + new TaskExecutionState(attemptId, ExecutionState.CANCELED)); assertTrue("could not switch task to RUNNING", setToRunning); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 14294d7490213..60ebb9040b0d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -549,9 +549,7 @@ public void testUpdateTaskExecutionStateReturnsFalseInIllegalState() throws Exce scheduler.updateTaskExecutionState( new TaskExecutionStateTransition( new TaskExecutionState( - jobGraph.getJobID(), - new ExecutionAttemptID(), - ExecutionState.FAILED))), + new ExecutionAttemptID(), ExecutionState.FAILED))), is(false)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java index cd210c9f9292c..a9248aa3d70fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CancelingTest.java @@ -101,7 +101,6 @@ public void testTaskFailuresAreIgnored() throws Exception { TaskExecutionStateTransition update = new TaskExecutionStateTransition( new TaskExecutionState( - canceling.getJob().getJobID(), ejv.getMockExecutionVertex() .getCurrentExecutionAttempt() .getAttemptId(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index c9e43a7634419..0e7269ae8f685 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -203,8 +203,7 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesFailingOnNoResta ctx.setHowToHandleFailure(Executing.FailureResult::canNotRestart); ctx.setExpectFailing(assertNonNull()); - exec.updateTaskExecutionState( - createFailingStateTransition(exec.getExecutionGraph().getJobID())); + exec.updateTaskExecutionState(createFailingStateTransition()); } } @@ -220,8 +219,7 @@ public void testFailureReportedViaUpdateTaskExecutionStateCausesRestart() throws ctx.setHowToHandleFailure((ign) -> Executing.FailureResult.canRestart(Duration.ZERO)); ctx.setExpectRestarting(assertNonNull()); - exec.updateTaskExecutionState( - createFailingStateTransition(exec.getExecutionGraph().getJobID())); + exec.updateTaskExecutionState(createFailingStateTransition()); } } @@ -235,20 +233,16 @@ public void testFalseReportsViaUpdateTaskExecutionStateAreIgnored() throws Excep .setExecutionGraph(returnsFailedStateExecutionGraph) .build(ctx); - exec.updateTaskExecutionState( - createFailingStateTransition(exec.getExecutionGraph().getJobID())); + exec.updateTaskExecutionState(createFailingStateTransition()); ctx.assertNoStateTransition(); } } - private static TaskExecutionStateTransition createFailingStateTransition(JobID jobId) { + private static TaskExecutionStateTransition createFailingStateTransition() { return new TaskExecutionStateTransition( new TaskExecutionState( - jobId, - new ExecutionAttemptID(), - ExecutionState.FAILED, - new RuntimeException())); + new ExecutionAttemptID(), ExecutionState.FAILED, new RuntimeException())); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java index c2c4c9656a1a2..ed09daba8980f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/FailingTest.java @@ -107,7 +107,6 @@ public void testTaskFailuresAreIgnored() throws Exception { TaskExecutionStateTransition update = new TaskExecutionStateTransition( new TaskExecutionState( - failing.getJob().getJobID(), ejv.getMockExecutionVertex() .getCurrentExecutionAttempt() .getAttemptId(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java index b15e2a0e0da39..b89e6e675117a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskmanager; -import org.apache.flink.api.common.JobID; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -29,7 +28,8 @@ import java.io.PrintStream; import java.io.PrintWriter; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; /** * Correctness tests for hash/equals and serialization for the {@link @@ -40,13 +40,12 @@ public class TaskExecutionStateTest { @Test public void testEqualsHashCode() { try { - final JobID jid = new JobID(); final ExecutionAttemptID executionId = new ExecutionAttemptID(); final ExecutionState state = ExecutionState.RUNNING; final Throwable error = new RuntimeException("some test error message"); - TaskExecutionState s1 = new TaskExecutionState(jid, executionId, state, error); - TaskExecutionState s2 = new TaskExecutionState(jid, executionId, state, error); + TaskExecutionState s1 = new TaskExecutionState(executionId, state, error); + TaskExecutionState s2 = new TaskExecutionState(executionId, state, error); assertEquals(s1.hashCode(), s2.hashCode()); assertEquals(s1, s2); @@ -59,13 +58,12 @@ public void testEqualsHashCode() { @Test public void testSerialization() { try { - final JobID jid = new JobID(); final ExecutionAttemptID executionId = new ExecutionAttemptID(); final ExecutionState state = ExecutionState.DEPLOYING; final Throwable error = new IOException("fubar"); - TaskExecutionState original1 = new TaskExecutionState(jid, executionId, state, error); - TaskExecutionState original2 = new TaskExecutionState(jid, executionId, state); + TaskExecutionState original1 = new TaskExecutionState(executionId, state, error); + TaskExecutionState original2 = new TaskExecutionState(executionId, state); TaskExecutionState javaSerCopy1 = CommonTestUtils.createCopySerializable(original1); TaskExecutionState javaSerCopy2 = CommonTestUtils.createCopySerializable(original2); @@ -110,8 +108,7 @@ public void printStackTrace(PrintWriter s) { } }; - new TaskExecutionState( - new JobID(), new ExecutionAttemptID(), ExecutionState.FAILED, hostile); + new TaskExecutionState(new ExecutionAttemptID(), ExecutionState.FAILED, hostile); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index fe472952e6b15..331e1066847f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -1009,7 +1009,6 @@ private void validateListenerMessage(ExecutionState state, Task task, Throwable final TaskExecutionState taskState = queue.take(); assertNotNull("There is no additional listener message", state); - assertEquals(task.getJobID(), taskState.getJobID()); assertEquals(task.getExecutionId(), taskState.getID()); assertEquals(state, taskState.getExecutionState()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 41f1585493fee..56eb6fea60252 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -382,8 +382,7 @@ public void testEarlyCanceling() throws Exception { .build(); final TaskExecutionState state = - new TaskExecutionState( - task.getJobID(), task.getExecutionId(), ExecutionState.RUNNING); + new TaskExecutionState(task.getExecutionId(), ExecutionState.RUNNING); task.startTaskThread();