Skip to content

Commit

Permalink
[FLINK-11922][metrics] Support MetricReporter factories
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed May 15, 2019
1 parent 7534b6c commit e63e4b9
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 20 deletions.
6 changes: 4 additions & 2 deletions docs/monitoring/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,14 @@ reporters will be instantiated on each job and task manager when they are starte

- `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`.
- `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.factory.class`: The reporter factory class to use for the reporter named `<name>`.
- `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`.
- `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`.
- `metrics.reporters`: (optional) A comma-separated include list of reporter names. By default all configured reporters will be used.

All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below,
we will list more settings specific to each reporter.
All reporters must at least have either the `class` or `factory.class` property. Which property may/should be used depends on the reporter implementation. See the individual reporter configuration sections for more information.
Some reporters (referred to as `Scheduled`) allow specifying a reporting `interval`.
Below more settings specific to each reporter will be listed.

Example reporter configuration that specifies multiple reporters:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,9 @@ public final class ConfigConstants {
/** The class of the reporter to use. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_CLASS_SUFFIX = "class";

/** The class of the reporter factory to use. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_FACTORY_CLASS_SUFFIX = "factory.class";

/** The interval between reports. This is used as a suffix in an actual reporter config */
public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
/**
* Reporters are used to export {@link Metric Metrics} to an external backend.
*
* <p>Reporters are instantiated via reflection and must be public, non-abstract, and have a
* public no-argument constructor.
* <p>Reporters are instantiated either
* a) via reflection, in which case they must be public, non-abstract, and have a public no-argument constructor.
* b) via a {@link MetricReporterFactory}, in which case no restrictions apply. (recommended)
*
* <p>Reporters are neither required nor encouraged to support both instantiation paths.
*/
public interface MetricReporter {

Expand All @@ -35,8 +38,11 @@ public interface MetricReporter {
// ------------------------------------------------------------------------

/**
* Configures this reporter. Since reporters are instantiated generically and hence parameter-less,
* this method is the place where the reporters set their basic fields based on configuration values.
* Configures this reporter.
*
* <p>If the reporter was instantiated generically and hence parameter-less,
* this method is the place where the reporter sets it's basic fields based on configuration values.
* Otherwise, this method will typically be a no-op since resources can be acquired in the constructor.
*
* <p>This method is always called first on a newly instantiated reporter.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.reporter;

import java.util.Properties;

/**
* {@link MetricReporter} factory.
*/
public interface MetricReporterFactory {

/**
* Creates a new metric reporter.
*
* @param properties configured properties for the reporter
* @return created metric reporter
*/
MetricReporter createMetricReporter(final Properties properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
Expand All @@ -54,7 +60,7 @@ public final class ReporterSetup {
Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
// [\S&&[^.]] = intersection of non-whitespace and non-period character classes
"([\\S&&[^.]]*)\\." +
Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX));
'(' + Pattern.quote(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX) + '|' + Pattern.quote(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX) + ')');

private final String name;
private final MetricConfig configuration;
Expand Down Expand Up @@ -142,33 +148,88 @@ public static List<ReporterSetup> fromConfiguration(final Configuration configur
configuration,
ConfigConstants.METRICS_REPORTER_PREFIX + namedReporter + '.');

reporterConfigurations.add(Tuple2.of(namedReporter, (Configuration) delegatingConfiguration));
reporterConfigurations.add(Tuple2.of(namedReporter, delegatingConfiguration));
}

final Map<String, MetricReporterFactory> reporterFactories = loadReporterFactories();
List<ReporterSetup> reporterArguments = new ArrayList<>(reporterConfigurations.size());
for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
String reporterName = reporterConfiguration.f0;
Configuration reporterConfig = reporterConfiguration.f1;

try {
final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
if (reporterClassName == null) {
LOG.error("No reporter class set for reporter " + reporterName + ". Metrics might not be exposed/reported.");
continue;
}

Class<?> reporterClass = Class.forName(reporterClassName);
MetricReporter reporter = (MetricReporter) reporterClass.newInstance();

MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);
Optional<MetricReporter> metricReporterOptional = loadReporter(reporterName, reporterConfig, reporterFactories);
metricReporterOptional.ifPresent(reporter -> {
MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);

reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
reporterArguments.add(createReporterSetup(reporterName, metricConfig, reporter));
});
}
catch (Throwable t) {
LOG.error("Could not instantiate metrics reporter {}. Metrics might not be exposed/reported.", reporterName, t);
}
}
return reporterArguments;
}

private static Map<String, MetricReporterFactory> loadReporterFactories() {
final ServiceLoader<MetricReporterFactory> serviceLoader = ServiceLoader.load(MetricReporterFactory.class);

final Map<String, MetricReporterFactory> reporterFactories = new HashMap<>(2);
final Iterator<MetricReporterFactory> factoryIterator = serviceLoader.iterator();
// do not use streams or for-each loops here because they do not allow catching individual ServiceConfigurationErrors
// such an error might be caused if the META-INF/services contains an entry to a non-existing factory class
while (factoryIterator.hasNext()) {
try {
MetricReporterFactory factory = factoryIterator.next();
reporterFactories.put(factory.getClass().getName(), factory);
} catch (Exception | ServiceConfigurationError e) {
LOG.warn("Error while loading reporter factory.", e);
}
}

return Collections.unmodifiableMap(reporterFactories);
}

private static Optional<MetricReporter> loadReporter(
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories)
throws ClassNotFoundException, IllegalAccessException, InstantiationException {

final String reporterClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
final String factoryClassName = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX, null);

if (factoryClassName != null) {
return loadViaFactory(factoryClassName, reporterName, reporterConfig, reporterFactories);
}

if (reporterClassName != null) {
final Class<?> reporterClass = Class.forName(reporterClassName);
return Optional.of((MetricReporter) reporterClass.newInstance());
}

LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
return Optional.empty();
}

private static Optional<MetricReporter> loadViaFactory(
final String factoryClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) {

MetricReporterFactory factory = reporterFactories.get(factoryClassName);

if (factory == null) {
LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: ", factoryClassName, reporterName, reporterFactories.keySet());
return Optional.empty();
} else {
final MetricConfig metricConfig = new MetricConfig();
reporterConfig.addAllToProperties(metricConfig);

return Optional.of(factory.createMetricReporter(metricConfig));
}
}
}
Loading

0 comments on commit e63e4b9

Please sign in to comment.