From 7d375cca5e3f95402c00b9ab95f6b76421b5e683 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Wed, 27 Nov 2019 17:58:19 +0100 Subject: [PATCH] [FLINK-14993][metrics] Store delimiter in reporter settings --- .../flink/metrics/jmx/JMXReporterTest.java | 12 ++++++++---- .../prometheus/PrometheusReporterTest.java | 10 +++++++--- .../flink/runtime/metrics/MetricRegistry.java | 8 -------- .../runtime/metrics/MetricRegistryImpl.java | 16 +++++++++------- .../runtime/metrics/NoOpMetricRegistry.java | 5 ----- .../metrics/groups/AbstractMetricGroup.java | 7 +++---- .../runtime/metrics/groups/FrontMetricGroup.java | 6 +++--- .../metrics/groups/ReporterScopedSettings.java | 9 ++++++++- .../metrics/groups/AbstractMetricGroupTest.java | 9 ++------- .../state/RocksDBNativeMetricMonitorTest.java | 5 ----- .../flink/streaming/util/LatencyStatsTest.java | 5 ----- 11 files changed, 40 insertions(+), 52 deletions(-) diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 163debacc92ca..821d86679cfac 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -129,8 +129,8 @@ public Integer getValue() { } }; - rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(new ReporterScopedSettings(0), mg)); - rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(new ReporterScopedSettings(0), mg)); + rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(createReporterScopedSettings(0), mg)); + rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(createReporterScopedSettings(0), mg)); MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); @@ -183,9 +183,9 @@ public Integer getValue() { } }; - rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(new ReporterScopedSettings(0), mg)); + rep1.notifyOfAddedMetric(g1, "rep1", new FrontMetricGroup<>(createReporterScopedSettings(0), mg)); - rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(new ReporterScopedSettings(1), mg)); + rep2.notifyOfAddedMetric(g2, "rep2", new FrontMetricGroup<>(createReporterScopedSettings(1), mg)); ObjectName objectName1 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep1", JMXReporter.generateJmxTable(mg.getAllVariables())); ObjectName objectName2 = new ObjectName(JMX_DOMAIN_PREFIX + "taskmanager.rep2", JMXReporter.generateJmxTable(mg.getAllVariables())); @@ -304,4 +304,8 @@ public void testMeterReporting() throws Exception { } } } + + private static ReporterScopedSettings createReporterScopedSettings(int reporterIndex) { + return new ReporterScopedSettings(reporterIndex, ','); + } } diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 7e2d297ccafc0..f271ae81b228a 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -87,7 +87,7 @@ public void setupReporter() { MetricRegistryConfiguration.defaultMetricRegistryConfiguration(), Collections.singletonList(createReporterSetup("test1", portRangeProvider.next()))); metricGroup = new FrontMetricGroup<>( - new ReporterScopedSettings(0), + createReporterScopedSettings(), new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); reporter = (PrometheusReporter) registry.getReporters().get(0); } @@ -176,13 +176,13 @@ public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestExc Counter metric1 = new SimpleCounter(); FrontMetricGroup metricGroup1 = new FrontMetricGroup<>( - new ReporterScopedSettings(0), + createReporterScopedSettings(), new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_1")); reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1); Counter metric2 = new SimpleCounter(); FrontMetricGroup metricGroup2 = new FrontMetricGroup<>( - new ReporterScopedSettings(0), + createReporterScopedSettings(), new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_2")); reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2); @@ -345,4 +345,8 @@ public String next() { return String.valueOf(lowEnd) + "-" + String.valueOf(highEnd); } } + + private static ReporterScopedSettings createReporterScopedSettings() { + return new ReporterScopedSettings(0, ','); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index c5e78a4679c56..b57d17a89ae6c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -37,14 +37,6 @@ public interface MetricRegistry { */ char getDelimiter(); - /** - * Returns the configured delimiter for the reporter with the given index. - * - * @param index index of the reporter whose delimiter should be used - * @return configured reporter delimiter, or global delimiter if index is invalid - */ - char getDelimiter(int index); - /** * Returns the number of registered reporters. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index cec8ee3d59cde..0c9ba04d01478 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -71,7 +71,6 @@ public class MetricRegistryImpl implements MetricRegistry { private final ScopeFormats scopeFormats; private final char globalDelimiter; - private final List delimiters; private final CompletableFuture terminationFuture; @@ -98,7 +97,6 @@ public MetricRegistryImpl(MetricRegistryConfiguration config, Collection(10); this.terminationFuture = new CompletableFuture<>(); this.isShutdown = false; @@ -147,14 +145,18 @@ public MetricRegistryImpl(MetricRegistryConfiguration config, Collection= scopeStrings.length)) { - char delimiter = registry.getDelimiter(); String newScopeString; if (filter != null) { newScopeString = ScopeFormat.concat(filter, delimiter, scopeComponents); @@ -266,7 +266,6 @@ public String getMetricIdentifier(String metricName, CharacterFilter filter, int } return newScopeString + delimiter + metricName; } else { - char delimiter = registry.getDelimiter(reporterIndex); if (scopeStrings[reporterIndex] == null) { if (filter != null) { scopeStrings[reporterIndex] = ScopeFormat.concat(filter, delimiter, scopeComponents); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java index f1c6a937586d4..0643826dfe571 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java @@ -39,16 +39,16 @@ public FrontMetricGroup(ReporterScopedSettings settings, P reference) { @Override public String getMetricIdentifier(String metricName) { - return parentMetricGroup.getMetricIdentifier(metricName, null, this.settings.getReporterIndex()); + return parentMetricGroup.getMetricIdentifier(metricName, null, this.settings.getReporterIndex(), this.settings.getDelimiter()); } @Override public String getMetricIdentifier(String metricName, CharacterFilter filter) { - return parentMetricGroup.getMetricIdentifier(metricName, filter, this.settings.getReporterIndex()); + return parentMetricGroup.getMetricIdentifier(metricName, filter, this.settings.getReporterIndex(), this.settings.getDelimiter()); } public String getLogicalScope(CharacterFilter filter) { - return parentMetricGroup.getLogicalScope(filter); + return parentMetricGroup.getLogicalScope(filter, this.settings.getDelimiter()); } public String getLogicalScope(CharacterFilter filter, char delimiter) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java index c0c8969b6c45b..0e064e47c85ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ReporterScopedSettings.java @@ -26,12 +26,19 @@ public class ReporterScopedSettings { private final int reporterIndex; - public ReporterScopedSettings(int reporterIndex) { + private final char delimiter; + + public ReporterScopedSettings(int reporterIndex, char delimiter) { Preconditions.checkArgument(reporterIndex >= 0); this.reporterIndex = reporterIndex; + this.delimiter = delimiter; } public int getReporterIndex() { return reporterIndex; } + + public char getDelimiter() { + return delimiter; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 790e85f6e2693..83ae45dddcbca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -266,8 +266,8 @@ public void testScopeGenerationWithoutReporters() throws Exception { // no caching should occur assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B)); // invalid reporter indices do not throw errors - assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1)); - assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2)); + assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1, '.')); + assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2, '.')); } finally { testRegistry.shutdown().get(); } @@ -326,11 +326,6 @@ public char getDelimiter() { return 0; } - @Override - public char getDelimiter(int index) { - return 0; - } - @Override public int getNumberReporters() { return 0; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java index f58eae4e8d208..eb299fc4b3238 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java @@ -168,11 +168,6 @@ public char getDelimiter() { return 0; } - @Override - public char getDelimiter(int index) { - return 0; - } - @Override public int getNumberReporters() { return 0; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java index 81cb4a70a7a38..1b19f24f76ea0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java @@ -186,11 +186,6 @@ public char getDelimiter() { return '.'; } - @Override - public char getDelimiter(int index) { - return 0; - } - @Override public int getNumberReporters() { return 0;