Skip to content

Commit

Permalink
[FLINK-11922][metrics] Add utils for backwards compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed May 15, 2019
1 parent e63e4b9 commit 81b0464
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics.reporter;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation for {@link MetricReporter MetricReporters} that support factories but want to maintain
* backwards-compatibility with existing reflection-based configurations.
*
* <p>When an annotated reporter is configured to be used via reflection the given factory will be used instead.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface InstantiateViaFactory {
String factoryClassName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;

Expand Down Expand Up @@ -206,8 +207,7 @@ private static Optional<MetricReporter> loadReporter(
}

if (reporterClassName != null) {
final Class<?> reporterClass = Class.forName(reporterClassName);
return Optional.of((MetricReporter) reporterClass.newInstance());
return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories);
}

LOG.warn("No reporter class nor factory set for reporter {}. Metrics might not be exposed/reported.", reporterName);
Expand All @@ -232,4 +232,29 @@ private static Optional<MetricReporter> loadViaFactory(
return Optional.of(factory.createMetricReporter(metricConfig));
}
}

private static Optional<MetricReporter> loadViaReflection(
final String reporterClassName,
final String reporterName,
final Configuration reporterConfig,
final Map<String, MetricReporterFactory> reporterFactories) throws ClassNotFoundException, IllegalAccessException, InstantiationException {

final Class<?> reporterClass = Class.forName(reporterClassName);

final InstantiateViaFactory alternativeFactoryAnnotation = reporterClass.getAnnotation(InstantiateViaFactory.class);
if (alternativeFactoryAnnotation != null) {
final String alternativeFactoryClassName = alternativeFactoryAnnotation.factoryClassName();
LOG.info("The reporter configuration of {} is out-dated (but still supported)." +
" Please configure a factory class instead: '{}{}.{}: {}' to ensure that the configuration" +
" continues to work with future versions.",
reporterName,
ConfigConstants.METRICS_REPORTER_PREFIX,
reporterName,
ConfigConstants.METRICS_REPORTER_FACTORY_CLASS_SUFFIX,
alternativeFactoryClassName);
return loadViaFactory(alternativeFactoryClassName, reporterName, reporterConfig, reporterFactories);
}

return Optional.of((MetricReporter) reporterClass.newInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.InstantiateViaFactory;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.runtime.metrics.util.TestReporter;
Expand Down Expand Up @@ -302,6 +303,24 @@ public void testFactoryArgumentForwarding() throws Exception {
assertEquals("hello", passedConfig.getProperty("arg"));
}

/**
* Verifies that the factory approach is used if the factory is annotated with {@link InstantiateViaFactory}.
*/
@Test
public void testFactoryAnnotation() {
final Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter2.class.getName());

final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);

assertEquals(1, reporterSetups.size());

final ReporterSetup reporterSetup = reporterSetups.get(0);
final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter();

assertTrue(metricReporter.createdByFactory);
}

/**
* Factory that exposed the last provided metric config.
*/
Expand Down Expand Up @@ -370,4 +389,11 @@ public boolean isCreatedByFactory() {
return createdByFactory;
}
}

/**
* Annotated reporter that exposes which constructor was called.
*/
@InstantiateViaFactory(factoryClassName = "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory")
protected static class InstantiationTypeTrackingTestReporter2 extends InstantiationTypeTrackingTestReporter {
}
}

0 comments on commit 81b0464

Please sign in to comment.