Skip to content

Commit

Permalink
[FLINK-9665][metrics] Unregister individual metrics in PrometheusRepo…
Browse files Browse the repository at this point in the history
…rter

This closes apache#6239.
  • Loading branch information
Jelmer Kuperus authored and zentol committed Jul 3, 2018
1 parent a49587a commit 75d12f9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,37 @@ private static void addMetric(Metric metric, List<String> dimensionValues, Colle
}
}

private static void removeMetric(Metric metric, List<String> dimensionValues, Collector collector) {
if (metric instanceof Gauge) {
((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
} else if (metric instanceof Counter) {
((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
} else if (metric instanceof Meter) {
((io.prometheus.client.Gauge) collector).remove(toArray(dimensionValues));
} else if (metric instanceof Histogram) {
((HistogramSummaryProxy) collector).remove(dimensionValues);
} else {
LOG.warn("Cannot remove unknown metric type: {}. This indicates that the metric type is not supported by this reporter.",
metric.getClass().getName());
}
}

@Override
public void notifyOfRemovedMetric(final Metric metric, final String metricName, final MetricGroup group) {

List<String> dimensionValues = new LinkedList<>();
for (final Map.Entry<String, String> dimension : group.getAllVariables().entrySet()) {
dimensionValues.add(CHARACTER_FILTER.filterCharacters(dimension.getValue()));
}

final String scopedMetricName = getScopedName(metricName, group);
synchronized (this) {
final AbstractMap.SimpleImmutableEntry<Collector, Integer> collectorWithCount = collectorsWithCountByMetricName.get(scopedMetricName);
final Integer count = collectorWithCount.getValue();
final Collector collector = collectorWithCount.getKey();

removeMetric(metric, dimensionValues, collector);

if (count == 1) {
try {
CollectorRegistry.defaultRegistry.unregister(collector);
Expand Down Expand Up @@ -295,6 +319,10 @@ void addChild(final Histogram histogram, final List<String> labelValues) {
histogramsByLabelValues.put(labelValues, histogram);
}

void remove(final List<String> labelValues) {
histogramsByLabelValues.remove(labelValues);
}

private void addSamples(final List<String> labelValues, final Histogram histogram, final List<MetricFamilySamples.Sample> samples) {
samples.add(new MetricFamilySamples.Sample(metricName + "_count",
labelNamesWithQuantile.subList(0, labelNamesWithQuantile.size() - 1), labelValues, histogram.getCount()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.metrics.prometheus;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;

Expand All @@ -51,6 +53,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

/**
Expand Down Expand Up @@ -159,6 +162,27 @@ public void histogramIsReportedAsPrometheusSummary() throws UnirestException {
}
}

@Test
public void metricIsRemovedWhenCollectorIsNotUnregisteredYet() throws UnirestException {
TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER);

String metricName = "metric";

Counter metric1 = new SimpleCounter();
FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup1 = new FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_1"));
reporter.notifyOfAddedMetric(metric1, metricName, metricGroup1);

Counter metric2 = new SimpleCounter();
FrontMetricGroup<TaskManagerJobMetricGroup> metricGroup2 = new FrontMetricGroup<>(0, new TaskManagerJobMetricGroup(registry, tmMetricGroup, JobID.generate(), "job_2"));
reporter.notifyOfAddedMetric(metric2, metricName, metricGroup2);

reporter.notifyOfRemovedMetric(metric1, metricName, metricGroup1);

String response = pollMetrics(reporter.getPort()).getBody();

assertThat(response, not(containsString("job_1")));
}

@Test
public void invalidCharactersAreReplacedWithUnderscore() {
assertThat(PrometheusReporter.replaceInvalidChars(""), equalTo(""));
Expand Down

0 comments on commit 75d12f9

Please sign in to comment.