Skip to content

Commit

Permalink
[FLINK-27206][metrics] Remove reflection annotations from reporters
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Apr 20, 2022
1 parent e77cd23 commit 200d0c1
Show file tree
Hide file tree
Showing 19 changed files with 20 additions and 111 deletions.
6 changes: 3 additions & 3 deletions docs/content.zh/docs/deployment/metric_reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num
metrics.reporter.my_jmx_reporter.scope.variables.additional: cluster_name:my_test_cluster,tag_name:tag_value

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
```
Expand Down Expand Up @@ -180,7 +180,7 @@ Parameters:
Example configuration:

```yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
```

Flink metric types are mapped to Prometheus metric types as follows:
Expand All @@ -206,7 +206,7 @@ Parameters:
Example configuration:

```yaml
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http:https://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
Expand Down
6 changes: 3 additions & 3 deletions docs/content/docs/deployment/metric_reporters.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes: job_id;task_attempt_num
metrics.reporter.my_jmx_reporter.scope.variables.additional: cluster_name:my_test_cluster,tag_name:tag_value

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
```
Expand Down Expand Up @@ -180,7 +180,7 @@ Parameters:
Example configuration:

```yaml
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
```

Flink metric types are mapped to Prometheus metric types as follows:
Expand All @@ -206,7 +206,7 @@ Parameters:
Example configuration:

```yaml
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.factory.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
metrics.reporter.promgateway.hostUrl: http:https://localhost:9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.prometheus.PrometheusReporter;
import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
Expand Down Expand Up @@ -60,8 +59,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.TestParams.InstantiationType.FACTORY;
import static org.apache.flink.metrics.prometheus.tests.PrometheusReporterEndToEndITCase.TestParams.InstantiationType.REFLECTION;
import static org.apache.flink.tests.util.AutoClosableProcess.runBlocking;
import static org.apache.flink.tests.util.AutoClosableProcess.runNonBlocking;

Expand Down Expand Up @@ -131,65 +128,37 @@ public static Collection<TestParams> testParameters() {
builder.moveJar(
PROMETHEUS_JAR_PREFIX,
JarLocation.PLUGINS,
JarLocation.LIB),
REFLECTION),
TestParams.from(
"Jar in 'lib'",
builder ->
builder.moveJar(
PROMETHEUS_JAR_PREFIX,
JarLocation.PLUGINS,
JarLocation.LIB),
FACTORY),
TestParams.from("Jar in 'plugins'", builder -> {}, REFLECTION),
TestParams.from("Jar in 'plugins'", builder -> {}, FACTORY),
JarLocation.LIB)),
TestParams.from("Jar in 'plugins'", builder -> {}),
TestParams.from(
"Jar in 'lib' and 'plugins'",
builder -> {
builder.copyJar(
PROMETHEUS_JAR_PREFIX, JarLocation.PLUGINS, JarLocation.LIB);
},
REFLECTION),
TestParams.from(
"Jar in 'lib' and 'plugins'",
builder -> {
builder.copyJar(
PROMETHEUS_JAR_PREFIX, JarLocation.PLUGINS, JarLocation.LIB);
},
FACTORY));
}));
}

@Rule public final FlinkResource dist;

public PrometheusReporterEndToEndITCase(TestParams params) {
final FlinkResourceSetup.FlinkResourceSetupBuilder builder = FlinkResourceSetup.builder();
params.getBuilderSetup().accept(builder);
builder.addConfiguration(getFlinkConfig(params.getInstantiationType()));
builder.addConfiguration(getFlinkConfig());
dist = new LocalStandaloneFlinkResourceFactory().create(builder.build());
}

@Rule public final TemporaryFolder tmp = new TemporaryFolder();

@Rule public final DownloadCache downloadCache = DownloadCache.get();

private static Configuration getFlinkConfig(TestParams.InstantiationType instantiationType) {
private static Configuration getFlinkConfig() {
final Configuration config = new Configuration();

switch (instantiationType) {
case FACTORY:
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "prom."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
PrometheusReporterFactory.class.getName());
break;
case REFLECTION:
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "prom."
+ MetricOptions.REPORTER_CLASS.key(),
PrometheusReporter.class.getCanonicalName());
}
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "prom."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
PrometheusReporterFactory.class.getName());

config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100");
return config;
Expand Down Expand Up @@ -324,42 +293,27 @@ private static void checkMetricAvailability(final OkHttpClient client, final Str
static class TestParams {
private final String jarLocationDescription;
private final Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup;
private final InstantiationType instantiationType;

private TestParams(
String jarLocationDescription,
Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup,
InstantiationType instantiationType) {
Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup) {
this.jarLocationDescription = jarLocationDescription;
this.builderSetup = builderSetup;
this.instantiationType = instantiationType;
}

public static TestParams from(
String jarLocationDesription,
Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup,
InstantiationType instantiationType) {
return new TestParams(jarLocationDesription, builderSetup, instantiationType);
Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> builderSetup) {
return new TestParams(jarLocationDesription, builderSetup);
}

public Consumer<FlinkResourceSetup.FlinkResourceSetupBuilder> getBuilderSetup() {
return builderSetup;
}

public InstantiationType getInstantiationType() {
return instantiationType;
}

@Override
public String toString() {
return jarLocationDescription
+ ", instantiated via "
+ instantiationType.name().toLowerCase();
}

public enum InstantiationType {
REFLECTION,
FACTORY
return jarLocationDescription;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

Expand All @@ -44,8 +43,6 @@
*
* <p>Variables in metrics scope will be sent to Datadog as tags.
*/
@InstantiateViaFactory(
factoryClassName = "org.apache.flink.metrics.datadog.DatadogHttpReporterFactory")
public class DatadogHttpReporter implements MetricReporter, Scheduled {
private static final Logger LOGGER = LoggerFactory.getLogger(DatadogHttpReporter.class);
private static final String HOST_VARIABLE = "<host>";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

package org.apache.flink.metrics.datadog;

import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/** {@link MetricReporterFactory} for {@link DatadogHttpReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.datadog.DatadogHttpReporter")
public class DatadogHttpReporterFactory implements MetricReporterFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;

import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.graphite.Graphite;
Expand All @@ -34,8 +33,6 @@
* allows using it as a Flink reporter.
*/
@PublicEvolving
@InstantiateViaFactory(
factoryClassName = "org.apache.flink.metrics.graphite.GraphiteReporterFactory")
public class GraphiteReporter extends ScheduledDropwizardReporter {

public static final String ARG_PROTOCOL = "protocol";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

package org.apache.flink.metrics.graphite;

import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/** {@link MetricReporterFactory} for {@link GraphiteReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.graphite.GraphiteReporter")
public class GraphiteReporterFactory implements MetricReporterFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.NetUtils;
Expand Down Expand Up @@ -57,8 +56,6 @@
import static org.apache.flink.metrics.influxdb.InfluxdbReporterOptions.getString;

/** {@link MetricReporter} that exports {@link Metric Metrics} via InfluxDB. */
@InstantiateViaFactory(
factoryClassName = "org.apache.flink.metrics.influxdb.InfluxdbReporterFactory")
public class InfluxdbReporter extends AbstractReporter<MeasurementInfo> implements Scheduled {

private String database;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

package org.apache.flink.metrics.influxdb;

import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/** {@link MetricReporterFactory} for {@link InfluxdbReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.influxdb.InfluxdbReporter")
public class InfluxdbReporterFactory implements MetricReporterFactory {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;

import org.slf4j.Logger;
Expand All @@ -55,7 +54,6 @@
* <p>Largely based on the JmxReporter class of the dropwizard metrics library
* https://github.com/dropwizard/metrics/blob/master/metrics-core/src/main/java/io/dropwizard/metrics/JmxReporter.java
*/
@InstantiateViaFactory(factoryClassName = "org.apache.flink.metrics.jmx.JMXReporterFactory")
public class JMXReporter implements MetricReporter {

static final String JMX_DOMAIN_PREFIX = "org.apache.flink.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@

package org.apache.flink.metrics.jmx;

import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import java.util.Properties;

/** {@link MetricReporterFactory} for {@link JMXReporter}. */
@InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.metrics.jmx.JMXReporter")
public class JMXReporterFactory implements MetricReporterFactory {

static final String ARG_PORT = "port";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.util.Preconditions;
Expand All @@ -37,9 +36,6 @@
* {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus {@link PushGateway}.
*/
@PublicEvolving
@InstantiateViaFactory(
factoryClassName =
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory")
public class PrometheusPushGatewayReporter extends AbstractPrometheusReporter implements Scheduled {

private final PushGateway pushGateway;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.StringUtils;
Expand All @@ -43,8 +42,6 @@
import static org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterOptions.RANDOM_JOB_NAME_SUFFIX;

/** {@link MetricReporterFactory} for {@link PrometheusPushGatewayReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter")
public class PrometheusPushGatewayReporterFactory implements MetricReporterFactory {

private static final Logger LOG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.util.Preconditions;

Expand All @@ -32,8 +31,6 @@

/** {@link MetricReporter} that exports {@link Metric Metrics} via Prometheus. */
@PublicEvolving
@InstantiateViaFactory(
factoryClassName = "org.apache.flink.metrics.prometheus.PrometheusReporterFactory")
public class PrometheusReporter extends AbstractPrometheusReporter {

private HTTPServer httpServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package org.apache.flink.metrics.prometheus;

import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.util.NetUtils;

import java.util.Iterator;
import java.util.Properties;

/** {@link MetricReporterFactory} for {@link PrometheusReporter}. */
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.prometheus.PrometheusReporter")
public class PrometheusReporterFactory implements MetricReporterFactory {

static final String ARG_PORT = "port";
Expand Down
Loading

0 comments on commit 200d0c1

Please sign in to comment.