Skip to content

Commit

Permalink
[FLINK-7933][metrics] Improve PrometheusReporter tests
Browse files Browse the repository at this point in the history
This closes apache#4908.
  • Loading branch information
zentol committed Oct 30, 2017
1 parent 6b8f7dc commit e22c777
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;

import io.prometheus.client.Collector;
import io.prometheus.client.CollectorRegistry;
Expand Down Expand Up @@ -73,8 +74,15 @@ public String filterCharacters(String input) {
private static final String SCOPE_PREFIX = "flink" + SCOPE_SEPARATOR;

private HTTPServer httpServer;
private int port;
private final Map<String, AbstractMap.SimpleImmutableEntry<Collector, Integer>> collectorsWithCountByMetricName = new HashMap<>();

@VisibleForTesting
int getPort() {
Preconditions.checkState(httpServer != null, "Server has not been initialized.");
return port;
}

@VisibleForTesting
static String replaceInvalidChars(final String input) {
// https://prometheus.io/docs/instrumenting/writing_exporters/
Expand All @@ -91,6 +99,7 @@ public void open(MetricConfig config) {
int port = ports.next();
try {
httpServer = new HTTPServer(port);
this.port = port;
LOG.info("Started PrometheusReporter HTTP server on port {}.", port);
break;
} catch (IOException ioe) { //assume port conflict
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.mashape.unirest.http.exceptions.UnirestException;
import io.prometheus.client.CollectorRegistry;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
Expand All @@ -62,7 +63,6 @@ public class PrometheusReporterTaskScopeTest {
private static final int SUBTASK_INDEX_1 = 0;
private static final int SUBTASK_INDEX_2 = 1;

private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9429")));

private final JobID jobId = new JobID();
private final JobVertexID taskId1 = new JobVertexID();
Expand All @@ -72,10 +72,29 @@ public class PrometheusReporterTaskScopeTest {
private final AbstractID taskAttemptId2 = new AbstractID();
private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};

private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
private TaskMetricGroup taskMetricGroup1;
private TaskMetricGroup taskMetricGroup2;

private MetricRegistry registry;
private PrometheusReporter reporter;

@Before
public void setupReporter() {
registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
reporter = (PrometheusReporter) registry.getReporters().get(0);

TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
}

@After
public void shutdownRegistry() {
if (registry != null) {
registry.shutdown();
}
}

@Test
public void countersCanBeAddedSeveralTimesIfTheyDifferInLabels() throws UnirestException {
Expand Down Expand Up @@ -137,7 +156,7 @@ public void histogramsCanBeAddedSeveralTimesIfTheyDifferInLabels() throws Unires
taskMetricGroup1.histogram("my_histogram", histogram);
taskMetricGroup2.histogram("my_histogram", histogram);

final String exportedMetrics = pollMetrics().getBody();
final String exportedMetrics = pollMetrics(reporter.getPort()).getBody();
assertThat(exportedMetrics, containsString("subtask_index=\"0\",quantile=\"0.5\",} 0.5")); // histogram
assertThat(exportedMetrics, containsString("subtask_index=\"1\",quantile=\"0.5\",} 0.5")); // histogram

Expand Down Expand Up @@ -179,10 +198,4 @@ private String[] addToArray(String[] array, String element) {
labelNames[LABEL_NAMES.length] = element;
return labelNames;
}

@After
public void shutdownRegistry() {
registry.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
Expand All @@ -40,6 +39,7 @@
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -56,7 +56,6 @@
* Basic test for {@link PrometheusReporter}.
*/
public class PrometheusReporterTest extends TestLogger {
private static final int NON_DEFAULT_PORT = 9429;

private static final String HOST_NAME = "hostname";
private static final String TASK_MANAGER = "tm";
Expand All @@ -70,9 +69,23 @@ public class PrometheusReporterTest extends TestLogger {
@Rule
public ExpectedException thrown = ExpectedException.none();

private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "" + NON_DEFAULT_PORT)));
private final FrontMetricGroup<TaskManagerMetricGroup> metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
private final MetricReporter reporter = registry.getReporters().get(0);
private MetricRegistry registry;
private FrontMetricGroup<TaskManagerMetricGroup> metricGroup;
private PrometheusReporter reporter;

@Before
public void setupReporter() {
registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
reporter = (PrometheusReporter) registry.getReporters().get(0);
}

@After
public void shutdownRegistry() {
if (registry != null) {
registry.shutdown();
}
}

/**
* {@link io.prometheus.client.Counter} may not decrease, so report {@link Counter} as {@link io.prometheus.client.Gauge}.
Expand Down Expand Up @@ -145,9 +158,11 @@ public void histogramIsReportedAsPrometheusSummary() throws UnirestException {

@Test
public void endpointIsUnavailableAfterReporterIsClosed() throws UnirestException {
MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0);
reporter.close();
thrown.expect(UnirestException.class);
pollMetrics();
pollMetrics(reporter.getPort());
}

@Test
Expand Down Expand Up @@ -229,10 +244,12 @@ class SomeMetricType implements Metric{}

@Test
public void cannotStartTwoReportersOnSamePort() {
final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "12345")));
final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "12345")));

final MetricRegistry fixedPort1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
assertThat(fixedPort1.getReporters(), hasSize(1));

PrometheusReporter firstReporter = (PrometheusReporter) fixedPort1.getReporters().get(0);

final MetricRegistry fixedPort2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
assertThat(fixedPort2.getReporters(), hasSize(0));

fixedPort1.shutdown();
Expand All @@ -241,8 +258,8 @@ public void cannotStartTwoReportersOnSamePort() {

@Test
public void canStartTwoReportersWhenUsingPortRange() {
final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9249-9252")));
final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9249-9252")));
final MetricRegistry portRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9200-9300")));
final MetricRegistry portRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9200-9300")));

assertThat(portRange1.getReporters(), hasSize(1));
assertThat(portRange2.getReporters(), hasSize(1));
Expand All @@ -251,28 +268,13 @@ public void canStartTwoReportersWhenUsingPortRange() {
portRange2.shutdown();
}

@Test
public void cannotStartThreeReportersWhenPortRangeIsTooSmall() {
final MetricRegistry smallPortRange1 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9253-9254")));
final MetricRegistry smallPortRange2 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", "9253-9254")));
final MetricRegistry smallPortRange3 = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test3", "9253-9254")));

assertThat(smallPortRange1.getReporters(), hasSize(1));
assertThat(smallPortRange2.getReporters(), hasSize(1));
assertThat(smallPortRange3.getReporters(), hasSize(0));

smallPortRange1.shutdown();
smallPortRange2.shutdown();
smallPortRange3.shutdown();
}

private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
return pollMetrics().getBody();
return pollMetrics(reporter.getPort()).getBody();
}

static HttpResponse<String> pollMetrics() throws UnirestException {
return Unirest.get("http:https://localhost:" + NON_DEFAULT_PORT + "/metrics").asString();
static HttpResponse<String> pollMetrics(int port) throws UnirestException {
return Unirest.get("http:https://localhost:" + port + "/metrics").asString();
}

static Configuration createConfigWithOneReporter(String reporterName, String portString) {
Expand All @@ -285,7 +287,6 @@ static Configuration createConfigWithOneReporter(String reporterName, String por

@After
public void closeReporterAndShutdownRegistry() {
reporter.close();
registry.shutdown();
}
}

0 comments on commit e22c777

Please sign in to comment.