Skip to content

Commit

Permalink
[FLINK-11923][metrics] Move reporter setup out of MetricRegistry
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 17, 2019
1 parent 9438147 commit 6f78a94
Show file tree
Hide file tree
Showing 19 changed files with 607 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.dropwizard;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
Expand All @@ -36,6 +35,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
Expand All @@ -45,6 +45,7 @@
import org.junit.Test;

import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -82,16 +83,14 @@ public void testAddingMetrics() throws Exception {
String taskManagerId = "tas:kMana::ger";
String counterName = "testCounter";

configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
"org.apache.flink.dropwizard.ScheduledDropwizardReporterTest$TestingScheduledDropwizardReporter");

configuration.setString(MetricOptions.SCOPE_NAMING_TASK, "<host>.<tm_id>.<job_name>");
configuration.setString(MetricOptions.SCOPE_DELIMITER, "_");

MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
metricRegistryConfiguration,
Collections.singletonList(ReporterSetup.forReporter("test", new TestingScheduledDropwizardReporter())));

char delimiter = metricRegistry.getDelimiter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.flink.dropwizard.metrics;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;

Expand All @@ -39,6 +39,7 @@
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -99,16 +100,16 @@ public void testDropwizardHistogramWrapperReporting() throws Exception {
long timeout = 30000;
int size = 10;
String histogramMetricName = "histogram";
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");

MetricRegistryImpl registry = null;
MetricConfig config = new MetricConfig();
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");

MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);
MetricRegistryImpl registry = null;

try {
registry = new MetricRegistryImpl(metricRegistryConfiguration);
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(ReporterSetup.forReporter("test", config, new TestingReporter())));
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package org.apache.flink.metrics.influxdb;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;

Expand All @@ -33,6 +33,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.Collections;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.containing;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
Expand Down Expand Up @@ -117,15 +119,14 @@ public void testMetricReporting() throws Exception {
}

private MetricRegistryImpl createMetricRegistry() {
String configPrefix = ConfigConstants.METRICS_REPORTER_PREFIX + "test.";
Configuration configuration = new Configuration();
configuration.setString(
configPrefix + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
InfluxdbReporter.class.getTypeName());
configuration.setString(configPrefix + "host", "localhost");
configuration.setString(configPrefix + "port", String.valueOf(wireMockRule.port()));
configuration.setString(configPrefix + "db", TEST_INFLUXDB_DB);
return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty("host", "localhost");
metricConfig.setProperty("port", String.valueOf(wireMockRule.port()));
metricConfig.setProperty("db", TEST_INFLUXDB_DB);

return new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(ReporterSetup.forReporter("test", metricConfig, new InfluxdbReporter())));
}

private static Counter registerTestMetric(String metricName, MetricRegistry metricRegistry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@

package org.apache.flink.metrics.jmx;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestReporter;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand All @@ -43,6 +42,8 @@
import javax.management.remote.JMXServiceURL;

import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
Expand Down Expand Up @@ -98,15 +99,15 @@ public void testGenerateTable() {
*/
@Test
public void testPortConflictHandling() throws Exception {
Configuration cfg = new Configuration();
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty("port", "9020-9035");

cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9020-9035");
ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());

cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");

MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
MetricRegistryImpl reg = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Arrays.asList(reporterSetup1, reporterSetup2));

TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

Expand Down Expand Up @@ -155,16 +156,15 @@ public Integer getValue() {
*/
@Test
public void testJMXAvailability() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());

cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1.port", "9040-9055");
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty("port", "9040-9055");

cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");
ReporterSetup reporterSetup1 = ReporterSetup.forReporter("test1", metricConfig, new JMXReporter());
ReporterSetup reporterSetup2 = ReporterSetup.forReporter("test2", metricConfig, new JMXReporter());

MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
MetricRegistryImpl reg = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Arrays.asList(reporterSetup1, reporterSetup2));

TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

Expand Down Expand Up @@ -231,10 +231,9 @@ public void testHistogramReporting() throws Exception {
String histogramName = "histogram";

try {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));

TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");

Expand Down Expand Up @@ -280,10 +279,9 @@ public void testMeterReporting() throws Exception {
String meterName = "meter";

try {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(ReporterSetup.forReporter("test", new JMXReporter())));

TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;

import static org.apache.flink.metrics.prometheus.PrometheusReporterTest.createConfigWithOneReporter;
import static org.apache.flink.metrics.prometheus.PrometheusReporterTest.createReporterSetup;
import static org.apache.flink.metrics.prometheus.PrometheusReporterTest.pollMetrics;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -80,7 +81,9 @@ public class PrometheusReporterTaskScopeTest {

@Before
public void setupReporter() {
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(createReporterSetup("test1", "9400-9500")));
reporter = (PrometheusReporter) registry.getReporters().get(0);

TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@
package org.apache.flink.metrics.prometheus;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestHistogram;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
Expand All @@ -40,19 +40,20 @@
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import static org.apache.flink.metrics.prometheus.PrometheusReporter.ARG_PORT;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

Expand Down Expand Up @@ -81,7 +82,9 @@ public class PrometheusReporterTest extends TestLogger {

@Before
public void setupReporter() {
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next())));
registry = new MetricRegistryImpl(
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
Collections.singletonList(createReporterSetup("test1", portRangeProvider.next())));
metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
reporter = (PrometheusReporter) registry.getReporters().get(0);
}
Expand Down Expand Up @@ -262,29 +265,29 @@ class SomeMetricType implements Metric{}

@Test
public void cannotStartTwoReportersOnSamePort() throws Exception {
final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next())));
assertThat(fixedPort1.getReporters(), hasSize(1));
ReporterSetup setup1 = createReporterSetup("test1", portRangeProvider.next());

PrometheusReporter firstReporter = (PrometheusReporter) fixedPort1.getReporters().get(0);
int usedPort = ((PrometheusReporter) setup1.getReporter()).getPort();

final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
assertThat(fixedPort2.getReporters(), hasSize(0));

fixedPort1.shutdown().get();
fixedPort2.shutdown().get();
try {
createReporterSetup("test2", String.valueOf(usedPort));
Assert.fail("Should've failed since port is unavailable.");
} catch (Exception e) {
// expected
} finally {
setup1.getReporter().close();
}
}

@Test
public void canStartTwoReportersWhenUsingPortRange() throws Exception {
String portRange = portRangeProvider.next();
final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRange)));
final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", portRange)));

assertThat(portRange1.getReporters(), hasSize(1));
assertThat(portRange2.getReporters(), hasSize(1));
ReporterSetup setup1 = createReporterSetup("test1", portRange);
ReporterSetup setup2 = createReporterSetup("test2", portRange);

portRange1.shutdown().get();
portRange2.shutdown().get();
setup1.getReporter().close();
setup2.getReporter().close();
}

private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
Expand All @@ -296,11 +299,11 @@ static HttpResponse<String> pollMetrics(int port) throws UnirestException {
return Unirest.get("http:https://localhost:" + port + "/metrics").asString();
}

static Configuration createConfigWithOneReporter(String reporterName, String portString) {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, PrometheusReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + "." + ARG_PORT, portString);
return cfg;
static ReporterSetup createReporterSetup(String reporterName, String portString) {
MetricConfig metricConfig = new MetricConfig();
metricConfig.setProperty(ARG_PORT, portString);

return ReporterSetup.forReporter(reporterName, metricConfig, new PrometheusReporter());
}

@After
Expand Down
Loading

0 comments on commit 6f78a94

Please sign in to comment.