From 7e48549b822bc54f8dc0a5e1f9cbb5f3156fda06 Mon Sep 17 00:00:00 2001 From: Yangze Guo Date: Wed, 22 Apr 2020 14:14:29 +0800 Subject: [PATCH] [FLINK-17300] Log the lineage information between ExecutionAttemptID and AllocationID This closes #11852. --- .../org/apache/flink/runtime/executiongraph/Execution.java | 6 ++---- .../org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 3 ++- .../flink/yarn/YARNSessionCapacitySchedulerITCase.java | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index f6702f8359dc0..56415e08e76d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -728,10 +728,8 @@ public void deploy() throws JobException { return; } - if (LOG.isInfoEnabled()) { - LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(), - attemptNumber, getAssignedResourceLocation())); - } + LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(), + attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId()); final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory .fromExecutionVertex(vertex, attemptNumber) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f8438f5910888..bd2834a50c49b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -643,7 +643,8 @@ public CompletableFuture submitTask( taskMetricGroup.gauge(MetricNames.IS_BACKPRESSURED, task::isBackPressured); - log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); + log.info("Received task {} ({}), deploy into slot with allocation id {}.", + task.getTaskInfo().getTaskNameWithSubtasks(), tdd.getExecutionAttemptId(), tdd.getAllocationId()); boolean taskAdded; diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java index af0f5327586da..aca45b1dac223 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java @@ -573,7 +573,7 @@ public boolean accept(File dir, String name) { String expected = "Starting TaskManagers"; Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log: '" + jobmanagerLog + "'", content.contains(expected)); - expected = " (2/2) (attempt #0) to "; + expected = " (2/2) (attempt #0) with attempt id "; Assert.assertTrue("Expected string '" + expected + "' not found in JobManager log." + "This string checks that the job has been started with a parallelism of 2. Log contents: '" + jobmanagerLog + "'", content.contains(expected));