Skip to content

Commit

Permalink
[FLINK-6861][metrics] Use OperatorID in metric system
Browse files Browse the repository at this point in the history
This closes apache#4849.
  • Loading branch information
zentol committed Oct 24, 2017
1 parent 8e3d86d commit a292b21
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 36 deletions.
4 changes: 3 additions & 1 deletion docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ or by assigning unique names to jobs and operators.
- TaskManager: <host>, <tm_id>
- Job: <job_id>, <job_name>
- Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>
- Operator: <operator_name>, <subtask_index>
- Operator: <operator_id>,<operator_name>, <subtask_index>

**Important:** For the Batch API, <operator_id> is always equal to <task_id>.

## Reporter

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
Expand Down Expand Up @@ -97,7 +98,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept

TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new AbstractID(), new AbstractID(), taskName, 0, 0);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

SimpleCounter myCounter = new SimpleCounter();
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept

TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(metricRegistry, hostname, taskManagerId);
TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(metricRegistry, tmMetricGroup, new JobID(), jobName);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new AbstractID(), new AbstractID(), taskName, 0, 0);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(metricRegistry, tmJobMetricGroup, new JobVertexID(), new AbstractID(), taskName, 0, 0);

SimpleCounter myCounter = new SimpleCounter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
Expand All @@ -35,11 +36,13 @@
@Internal
public class OperatorMetricGroup extends ComponentMetricGroup<TaskMetricGroup> {
private final String operatorName;
private final OperatorID operatorID;

private final OperatorIOMetricGroup ioMetrics;

public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, String operatorName) {
super(registry, registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent), operatorName), parent);
public OperatorMetricGroup(MetricRegistry registry, TaskMetricGroup parent, OperatorID operatorID, String operatorName) {
super(registry, registry.getScopeFormats().getOperatorFormat().formatScope(checkNotNull(parent), operatorID, operatorName), parent);
this.operatorID = operatorID;
this.operatorName = operatorName;

ioMetrics = new OperatorIOMetricGroup(this);
Expand Down Expand Up @@ -75,6 +78,7 @@ public OperatorIOMetricGroup getIOMetricGroup() {

@Override
protected void putVariables(Map<String, String> variables) {
variables.put(ScopeFormat.SCOPE_OPERATOR_ID, String.valueOf(operatorID));
variables.put(ScopeFormat.SCOPE_OPERATOR_NAME, operatorName);
// we don't enter the subtask_index as the task group does that already
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
Expand All @@ -40,7 +42,7 @@
@Internal
public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {

private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();

static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;

Expand All @@ -50,7 +52,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
private final AbstractID executionId;

@Nullable
protected final AbstractID vertexId;
protected final JobVertexID vertexId;

@Nullable
private final String taskName;
Expand All @@ -64,7 +66,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
public TaskMetricGroup(
MetricRegistry registry,
TaskManagerJobMetricGroup parent,
@Nullable AbstractID vertexId,
@Nullable JobVertexID vertexId,
AbstractID executionId,
@Nullable String taskName,
int subtaskIndex,
Expand Down Expand Up @@ -124,7 +126,7 @@ public TaskIOMetricGroup getIOMetricGroup() {
protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
return new QueryScopeInfo.TaskQueryScopeInfo(
this.parent.jobId.toString(),
this.vertexId.toString(),
String.valueOf(this.vertexId),
this.subtaskIndex);
}

Expand All @@ -133,20 +135,24 @@ protected QueryScopeInfo.TaskQueryScopeInfo createQueryServiceMetricInfo(Charact
// ------------------------------------------------------------------------

public OperatorMetricGroup addOperator(String name) {
return addOperator(OperatorID.fromJobVertexID(vertexId), name);
}

public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
}
OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, name);
OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);

synchronized (this) {
OperatorMetricGroup previous = operators.put(name, operator);
OperatorMetricGroup previous = operators.put(operatorID, operator);
if (previous == null) {
// no operator group so far
return operator;
} else {
// already had an operator group. restore that one.
operators.put(name, previous);
operators.put(operatorID, previous);
return previous;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.metrics.scope;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;

/**
Expand All @@ -36,11 +37,12 @@ public OperatorScopeFormat(String format, TaskScopeFormat parentFormat) {
SCOPE_TASK_NAME,
SCOPE_TASK_SUBTASK_INDEX,
SCOPE_TASK_ATTEMPT_NUM,
SCOPE_OPERATOR_ID,
SCOPE_OPERATOR_NAME
});
}

public String[] formatScope(TaskMetricGroup parent, String operatorName) {
public String[] formatScope(TaskMetricGroup parent, OperatorID operatorID, String operatorName) {

final String[] template = copyTemplate();
final String[] values = {
Expand All @@ -53,6 +55,7 @@ public String[] formatScope(TaskMetricGroup parent, String operatorName) {
valueOrNull(parent.taskName()),
String.valueOf(parent.subtaskIndex()),
String.valueOf(parent.attemptNumber()),
valueOrNull(operatorID),
valueOrNull(operatorName)
};
return bindVariables(template, values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public String filterCharacters(String input) {

// ----- Operator ----

public static final String SCOPE_OPERATOR_ID = asVariable("operator_id");
public static final String SCOPE_OPERATOR_NAME = asVariable("operator_name");


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void tolerateMetricAndGroupNameCollisions() {
@Test
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
AbstractID vid = new AbstractID();
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
MetricRegistry registry = new MetricRegistry(defaultMetricRegistryConfiguration);
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
Expand Down Expand Up @@ -47,8 +52,8 @@ public void testGenerateScopeDefault() {
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");
registry, jmGroup, new JobVertexID(), new AbstractID(), "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, new OperatorID(), "myOpName");

assertArrayEquals(
new String[] { "theHostName", "taskmanager", "test-tm-id", "myJobName", "myOpName", "11" },
Expand All @@ -61,15 +66,44 @@ public void testGenerateScopeDefault() {
registry.shutdown();
}

@Test
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>");
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
try {
String tmID = "test-tm-id";
JobID jid = new JobID();
JobVertexID vertexId = new JobVertexID();
OperatorID operatorID = new OperatorID();
String operatorName = "operatorName";

OperatorMetricGroup operatorGroup =
new TaskManagerMetricGroup(registry, "theHostName", tmID)
.addTaskForJob(jid, "myJobName", vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
.addOperator(operatorID, operatorName);

assertArrayEquals(
new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()},
operatorGroup.getScopeComponents());

assertEquals(
String.format("%s.%s.%s.%s.%s.name", tmID, jid, vertexId, operatorName, operatorID),
operatorGroup.getMetricIdentifier("name"));
} finally {
registry.shutdown();
}
}

@Test
public void testIOMetricGroupInstantiation() {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());

TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, new AbstractID(), new AbstractID(), "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");
registry, jmGroup, new JobVertexID(), new AbstractID(), "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, new OperatorID(), "myOpName");

assertNotNull(opGroup.getIOMetricGroup());
assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter());
Expand All @@ -83,14 +117,15 @@ public void testVariables() {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());

JobID jid = new JobID();
AbstractID tid = new AbstractID();
JobVertexID tid = new JobVertexID();
AbstractID eid = new AbstractID();
OperatorID oid = new OperatorID();

TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, jid, "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, tid, eid, "aTaskName", 11, 0);
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, "myOpName");
OperatorMetricGroup opGroup = new OperatorMetricGroup(registry, taskGroup, oid, "myOpName");

Map<String, String> variables = opGroup.getAllVariables();

Expand All @@ -103,6 +138,7 @@ public void testVariables() {
testVariable(variables, ScopeFormat.SCOPE_TASK_ATTEMPT_ID, eid.toString());
testVariable(variables, ScopeFormat.SCOPE_TASK_SUBTASK_INDEX, "11");
testVariable(variables, ScopeFormat.SCOPE_TASK_ATTEMPT_NUM, "0");
testVariable(variables, ScopeFormat.SCOPE_OPERATOR_ID, oid.toString());
testVariable(variables, ScopeFormat.SCOPE_OPERATOR_NAME, "myOpName");

registry.shutdown();
Expand All @@ -117,13 +153,14 @@ private static void testVariable(Map<String, String> variables, String key, Stri
@Test
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
AbstractID vid = new AbstractID();
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
OperatorID oid = new OperatorID();
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, "operator");
OperatorMetricGroup operator = new OperatorMetricGroup(registry, task, oid, "operator");

QueryScopeInfo.OperatorQueryScopeInfo info = operator.createQueryServiceMetricInfo(new DummyCharacterFilter());
assertEquals("", info.scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class TaskMetricGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
AbstractID vertexId = new AbstractID();
JobVertexID vertexId = new JobVertexID();
AbstractID executionId = new AbstractID();

TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
Expand All @@ -75,7 +76,7 @@ public void testGenerateScopeCustom() {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));

JobID jid = new JobID();
AbstractID vertexId = new AbstractID();
JobVertexID vertexId = new JobVertexID();
AbstractID executionId = new AbstractID();

TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
Expand Down Expand Up @@ -105,7 +106,7 @@ public void testGenerateScopeWilcard() {
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");

TaskMetricGroup taskGroup = new TaskMetricGroup(
registry, jmGroup, new AbstractID(), executionId, "aTaskName", 13, 1);
registry, jmGroup, new JobVertexID(), executionId, "aTaskName", 13, 1);

assertArrayEquals(
new String[]{"theHostName", "taskmanager", "test-tm-id", "myJobName", executionId.toString(), "13"},
Expand All @@ -120,7 +121,7 @@ public void testGenerateScopeWilcard() {
@Test
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
AbstractID vid = new AbstractID();
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
Expand All @@ -139,7 +140,7 @@ public void testTaskMetricGroupCleanup() {
CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration());
TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0");
TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new AbstractID(), new AbstractID(), "task", 0, 0);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, taskManagerJobMetricGroup, new JobVertexID(), new AbstractID(), "task", 0, 0);

// the io metric should have registered predefined metrics
assertTrue(registry.getNumberRegisteredMetrics() > 0);
Expand All @@ -159,7 +160,7 @@ public void testOperatorNameTruncation() {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, new JobID(), "jobname");
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new AbstractID(), new AbstractID(), "task", 0, 0);
TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0);

String originalName = new String(new char[100]).replace("\0", "-");
OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
Expand Down Expand Up @@ -76,7 +77,7 @@ public DummyTaskIOMetricGroup() {

public static class DummyOperatorMetricGroup extends OperatorMetricGroup {
public DummyOperatorMetricGroup() {
super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), "testoperator");
super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator");
}
}
}
Loading

0 comments on commit a292b21

Please sign in to comment.