diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java index b453e4cc0c538..a0440b537f3d3 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -31,7 +31,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistryImpl; import org.apache.flink.runtime.metrics.MetricRegistryTestUtils; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; @@ -121,14 +120,24 @@ public void setupReporter() { TaskManagerMetricGroup tmMetricGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, TASK_MANAGER_HOST, new ResourceID(TASK_MANAGER_ID)); - TaskManagerJobMetricGroup tmJobMetricGroup = - new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); taskMetricGroup1 = - tmJobMetricGroup.addTask( - taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + tmMetricGroup.addTaskForJob( + jobId, + JOB_NAME, + taskId1, + taskAttemptId1, + TASK_NAME, + SUBTASK_INDEX_1, + ATTEMPT_NUMBER); taskMetricGroup2 = - tmJobMetricGroup.addTask( - taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + tmMetricGroup.addTaskForJob( + jobId, + JOB_NAME, + taskId2, + taskAttemptId2, + TASK_NAME, + SUBTASK_INDEX_2, + ATTEMPT_NUMBER); } @After diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 4041aff5ee278..6f043ae1cb681 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -33,8 +33,9 @@ import org.apache.flink.runtime.metrics.MetricRegistryTestUtils; import org.apache.flink.runtime.metrics.ReporterSetup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.ReporterScopedSettings; -import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.util.TestLogger; @@ -215,26 +216,23 @@ public void histogramIsReportedAsPrometheusSummary() throws UnirestException { @Test public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestException { - TaskManagerMetricGroup tmMetricGroup = - TaskManagerMetricGroup.createTaskManagerMetricGroup( - registry, HOST_NAME, new ResourceID(TASK_MANAGER)); + JobManagerMetricGroup jmMetricGroup = + JobManagerMetricGroup.createJobManagerMetricGroup(registry, HOST_NAME); String metricName = "metric"; Counter metric1 = new SimpleCounter(); - FrontMetricGroup metricGroup1 = + FrontMetricGroup metricGroup1 = new FrontMetricGroup<>( createReporterScopedSettings(), - new TaskManagerJobMetricGroup( - registry, tmMetricGroup, JobID.generate(), "job_1")); + jmMetricGroup.addJob(JobID.generate(), "job_1")); reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1); Counter metric2 = new SimpleCounter(); - FrontMetricGroup metricGroup2 = + FrontMetricGroup metricGroup2 = new FrontMetricGroup<>( createReporterScopedSettings(), - new TaskManagerJobMetricGroup( - registry, tmMetricGroup, JobID.generate(), "job_2")); + jmMetricGroup.addJob(JobID.generate(), "job_2")); reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2); reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java index e9f1e2f23a241..2535acc7fa33e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobMetricGroup.java @@ -45,7 +45,7 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup variables = opGroup.getAllVariables(); @@ -194,8 +203,7 @@ public void testCreateQueryServiceMetricInfo() { TaskManagerMetricGroup tm = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "host", new ResourceID("id")); - TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); - TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5); + TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5); InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, "operator"); QueryScopeInfo.OperatorQueryScopeInfo info = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index dce4f5c3f90d6..0da3f45790711 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -341,8 +341,7 @@ public void testCreateQueryServiceMetricInfo() { TaskManagerMetricGroup tm = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "host", new ResourceID("id")); - TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); - TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5); + TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5); GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello"); GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index b35ff9cd1cdb2..b0da84c27f01b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -69,10 +69,15 @@ public void testGenerateScopeDefault() { TaskManagerMetricGroup tmGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "theHostName", new ResourceID("test-tm-id")); - TaskManagerJobMetricGroup jmGroup = - new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = - jmGroup.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 13, 2); + tmGroup.addTaskForJob( + new JobID(), + "myJobName", + new JobVertexID(), + new ExecutionAttemptID(), + "aTaskName", + 13, + 2); assertArrayEquals( new String[] { @@ -102,9 +107,8 @@ public void testGenerateScopeCustom() throws Exception { TaskManagerMetricGroup tmGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "theHostName", new ResourceID("test-tm-id")); - TaskManagerJobMetricGroup jmGroup = - new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName"); - TaskMetricGroup taskGroup = jmGroup.addTask(vertexId, executionId, "aTaskName", 13, 2); + TaskMetricGroup taskGroup = + tmGroup.addTaskForJob(jid, "myJobName", vertexId, executionId, "aTaskName", 13, 2); assertArrayEquals( new String[] { @@ -130,11 +134,16 @@ public void testGenerateScopeWilcard() throws Exception { TaskManagerMetricGroup tmGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "theHostName", new ResourceID("test-tm-id")); - TaskManagerJobMetricGroup jmGroup = - new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = - jmGroup.addTask(new JobVertexID(), executionId, "aTaskName", 13, 1); + tmGroup.addTaskForJob( + new JobID(), + "myJobName", + new JobVertexID(), + executionId, + "aTaskName", + 13, + 1); assertArrayEquals( new String[] { @@ -161,8 +170,7 @@ public void testCreateQueryServiceMetricInfo() { TaskManagerMetricGroup tm = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "host", new ResourceID("id")); - TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); - TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5); + TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5); QueryScopeInfo.TaskQueryScopeInfo info = task.createQueryServiceMetricInfo(new DummyCharacterFilter()); @@ -178,11 +186,15 @@ public void testTaskMetricGroupCleanup() throws Exception { TaskManagerMetricGroup taskManagerMetricGroup = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "localhost", new ResourceID("0")); - TaskManagerJobMetricGroup taskManagerJobMetricGroup = - new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); TaskMetricGroup taskMetricGroup = - taskManagerJobMetricGroup.addTask( - new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0); + taskManagerMetricGroup.addTaskForJob( + new JobID(), + "job", + new JobVertexID(), + new ExecutionAttemptID(), + "task", + 0, + 0); // the io metric should have registered predefined metrics assertTrue(registry.getNumberRegisteredMetrics() > 0); @@ -204,10 +216,15 @@ public void testOperatorNameTruncation() throws Exception { TaskManagerMetricGroup tm = TaskManagerMetricGroup.createTaskManagerMetricGroup( registry, "host", new ResourceID("id")); - TaskManagerJobMetricGroup job = - new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname"); TaskMetricGroup taskMetricGroup = - job.addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0); + tm.addTaskForJob( + new JobID(), + "jobname", + new JobVertexID(), + new ExecutionAttemptID(), + "task", + 0, + 0); String originalName = new String(new char[100]).replace("\0", "-"); InternalOperatorMetricGroup operatorMetricGroup =