Skip to content

Commit

Permalink
[FLINK-23625][metrics] Migrate TaskManagerJobMG instantiations to fac…
Browse files Browse the repository at this point in the history
…tory method
  • Loading branch information
shenzhu authored and zentol committed Aug 16, 2021
1 parent cb273e5 commit 778fc42
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;

Expand Down Expand Up @@ -121,14 +120,24 @@ public void setupReporter() {
TaskManagerMetricGroup tmMetricGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, TASK_MANAGER_HOST, new ResourceID(TASK_MANAGER_ID));
TaskManagerJobMetricGroup tmJobMetricGroup =
new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
taskMetricGroup1 =
tmJobMetricGroup.addTask(
taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
tmMetricGroup.addTaskForJob(
jobId,
JOB_NAME,
taskId1,
taskAttemptId1,
TASK_NAME,
SUBTASK_INDEX_1,
ATTEMPT_NUMBER);
taskMetricGroup2 =
tmJobMetricGroup.addTask(
taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
tmMetricGroup.addTaskForJob(
jobId,
JOB_NAME,
taskId2,
taskAttemptId2,
TASK_NAME,
SUBTASK_INDEX_2,
ATTEMPT_NUMBER);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.ReporterScopedSettings;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;

Expand Down Expand Up @@ -215,26 +216,23 @@ public void histogramIsReportedAsPrometheusSummary() throws UnirestException {

@Test
public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestException {
TaskManagerMetricGroup tmMetricGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, HOST_NAME, new ResourceID(TASK_MANAGER));
JobManagerMetricGroup jmMetricGroup =
JobManagerMetricGroup.createJobManagerMetricGroup(registry, HOST_NAME);

String metricName = "metric";

Counter metric1 = new SimpleCounter();
FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup1 =
FrontMetricGroup<JobManagerJobMetricGroup> metricGroup1 =
new FrontMetricGroup<>(
createReporterScopedSettings(),
new TaskManagerJobMetricGroup(
registry, tmMetricGroup, JobID.generate(), "job_1"));
jmMetricGroup.addJob(JobID.generate(), "job_1"));
reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);

Counter metric2 = new SimpleCounter();
FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup2 =
FrontMetricGroup<JobManagerJobMetricGroup> metricGroup2 =
new FrontMetricGroup<>(
createReporterScopedSettings(),
new TaskManagerJobMetricGroup(
registry, tmMetricGroup, JobID.generate(), "job_2"));
jmMetricGroup.addJob(JobID.generate(), "job_2"));
reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);

reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class TaskManagerJobMetricGroup extends JobMetricGroup<TaskManagerMetricG

// ------------------------------------------------------------------------

public TaskManagerJobMetricGroup(
TaskManagerJobMetricGroup(
MetricRegistry registry,
TaskManagerMetricGroup parent,
JobID jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,15 @@ public void testGenerateScopeDefault() throws Exception {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup =
jmGroup.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
tmGroup.addTaskForJob(
new JobID(),
"myJobName",
new JobVertexID(),
new ExecutionAttemptID(),
"aTaskName",
11,
0);
InternalOperatorMetricGroup opGroup =
taskGroup.getOrAddOperator(new OperatorID(), "myOpName");

Expand Down Expand Up @@ -136,10 +141,15 @@ public void testIOMetricGroupInstantiation() throws Exception {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup =
jmGroup.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 11, 0);
tmGroup.addTaskForJob(
new JobID(),
"myJobName",
new JobVertexID(),
new ExecutionAttemptID(),
"aTaskName",
11,
0);
InternalOperatorMetricGroup opGroup =
taskGroup.getOrAddOperator(new OperatorID(), "myOpName");

Expand All @@ -158,9 +168,8 @@ public void testVariables() {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskMetricGroup taskGroup = jmGroup.addTask(tid, eid, "aTaskName", 11, 0);
TaskMetricGroup taskGroup =
tmGroup.addTaskForJob(jid, "myJobName", tid, eid, "aTaskName", 11, 0);
InternalOperatorMetricGroup opGroup = taskGroup.getOrAddOperator(oid, "myOpName");

Map<String, String> variables = opGroup.getAllVariables();
Expand Down Expand Up @@ -194,8 +203,7 @@ public void testCreateQueryServiceMetricInfo() {
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5);
TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);
InternalOperatorMetricGroup operator = task.getOrAddOperator(oid, "operator");

QueryScopeInfo.OperatorQueryScopeInfo info =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,7 @@ public void testCreateQueryServiceMetricInfo() {
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5);
TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);
GenericMetricGroup userGroup1 = new GenericMetricGroup(registry, task, "hello");
GenericMetricGroup userGroup2 = new GenericMetricGroup(registry, userGroup1, "world");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,15 @@ public void testGenerateScopeDefault() {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup =
jmGroup.addTask(new JobVertexID(), new ExecutionAttemptID(), "aTaskName", 13, 2);
tmGroup.addTaskForJob(
new JobID(),
"myJobName",
new JobVertexID(),
new ExecutionAttemptID(),
"aTaskName",
13,
2);

assertArrayEquals(
new String[] {
Expand Down Expand Up @@ -102,9 +107,8 @@ public void testGenerateScopeCustom() throws Exception {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskMetricGroup taskGroup = jmGroup.addTask(vertexId, executionId, "aTaskName", 13, 2);
TaskMetricGroup taskGroup =
tmGroup.addTaskForJob(jid, "myJobName", vertexId, executionId, "aTaskName", 13, 2);

assertArrayEquals(
new String[] {
Expand All @@ -130,11 +134,16 @@ public void testGenerateScopeWilcard() throws Exception {
TaskManagerMetricGroup tmGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "theHostName", new ResourceID("test-tm-id"));
TaskManagerJobMetricGroup jmGroup =
new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");

TaskMetricGroup taskGroup =
jmGroup.addTask(new JobVertexID(), executionId, "aTaskName", 13, 1);
tmGroup.addTaskForJob(
new JobID(),
"myJobName",
new JobVertexID(),
executionId,
"aTaskName",
13,
1);

assertArrayEquals(
new String[] {
Expand All @@ -161,8 +170,7 @@ public void testCreateQueryServiceMetricInfo() {
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = job.addTask(vid, eid, "taskName", 4, 5);
TaskMetricGroup task = tm.addTaskForJob(jid, "jobname", vid, eid, "taskName", 4, 5);

QueryScopeInfo.TaskQueryScopeInfo info =
task.createQueryServiceMetricInfo(new DummyCharacterFilter());
Expand All @@ -178,11 +186,15 @@ public void testTaskMetricGroupCleanup() throws Exception {
TaskManagerMetricGroup taskManagerMetricGroup =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "localhost", new ResourceID("0"));
TaskManagerJobMetricGroup taskManagerJobMetricGroup =
new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
TaskMetricGroup taskMetricGroup =
taskManagerJobMetricGroup.addTask(
new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
taskManagerMetricGroup.addTaskForJob(
new JobID(),
"job",
new JobVertexID(),
new ExecutionAttemptID(),
"task",
0,
0);

// the io metric should have registered predefined metrics
assertTrue(registry.getNumberRegisteredMetrics() > 0);
Expand All @@ -204,10 +216,15 @@ public void testOperatorNameTruncation() throws Exception {
TaskManagerMetricGroup tm =
TaskManagerMetricGroup.createTaskManagerMetricGroup(
registry, "host", new ResourceID("id"));
TaskManagerJobMetricGroup job =
new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
TaskMetricGroup taskMetricGroup =
job.addTask(new JobVertexID(), new ExecutionAttemptID(), "task", 0, 0);
tm.addTaskForJob(
new JobID(),
"jobname",
new JobVertexID(),
new ExecutionAttemptID(),
"task",
0,
0);

String originalName = new String(new char[100]).replace("\0", "-");
InternalOperatorMetricGroup operatorMetricGroup =
Expand Down

0 comments on commit 778fc42

Please sign in to comment.