From e29ec0fbd2cb03a42b98142f63ce73b97dc2e915 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 16 Feb 2018 08:12:04 +0100 Subject: [PATCH] [FLINK-8670] Make MetricRegistryImpl#shutdown non blocking This commit makes the MetricRegistryImpl#shutdown method non blocking. Instead of waiting for the completion of the shutdown procedure, the method returns a future which is completed once the metric registry has completed the shut down. This closes #5504. --- .../org/apache/flink/util/ExecutorUtils.java | 21 +++ .../MesosApplicationMasterRunner.java | 2 +- .../ScheduledDropwizardReporterTest.java | 4 +- .../DropwizardFlinkHistogramWrapperTest.java | 2 +- .../flink/metrics/jmx/JMXReporterTest.java | 8 +- .../PrometheusReporterTaskScopeTest.java | 4 +- .../prometheus/PrometheusReporterTest.java | 20 +-- .../metrics/slf4j/Slf4jReporterTest.java | 4 +- .../metrics/statsd/StatsDReporterTest.java | 8 +- .../apache/flink/runtime/akka/ActorUtils.java | 89 +++++++++++++ .../runtime/metrics/MetricRegistryImpl.java | 122 ++++++++++-------- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../minicluster/FlinkMiniCluster.scala | 2 +- .../runtime/taskmanager/TaskManager.scala | 2 +- .../metrics/MetricRegistryImplTest.java | 42 +++--- .../metrics/TaskManagerMetricsTest.java | 2 +- .../groups/AbstractMetricGroupTest.java | 10 +- .../metrics/groups/JobManagerGroupTest.java | 16 +-- .../groups/JobManagerJobGroupTest.java | 12 +- .../groups/MetricGroupRegistrationTest.java | 8 +- .../metrics/groups/MetricGroupTest.java | 4 +- .../metrics/groups/OperatorGroupTest.java | 37 +++--- .../metrics/groups/TaskManagerGroupTest.java | 30 +++-- .../groups/TaskManagerJobGroupTest.java | 28 ++-- .../metrics/groups/TaskMetricGroupTest.java | 34 +++-- .../yarn/YarnApplicationMasterRunner.java | 2 +- 26 files changed, 334 insertions(+), 181 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java index d98bdd29cbd80..0a7f1613a8089 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java @@ -21,6 +21,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -74,4 +75,24 @@ public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService } } } + + /** + * Shuts the given {@link ExecutorService} down in a non-blocking fashion. The shut down will + * be executed by a thread from the common fork-join pool. + * + *

The executor services will be shut down gracefully for the given timeout period. Afterwards + * {@link ExecutorService#shutdownNow()} will be called. + * + * @param timeout before {@link ExecutorService#shutdownNow()} is called + * @param unit time unit of the timeout + * @param executorServices to shut down + * @return Future which is completed once the {@link ExecutorService} are shut down + */ + public static CompletableFuture nonBlockingShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) { + return CompletableFuture.supplyAsync( + () -> { + gracefulShutdown(timeout, unit, executorServices); + return null; + }); + } } diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 94804aceaf60f..630fa8390003a 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -433,7 +433,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie if (metricRegistry != null) { try { - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } catch (Throwable t) { LOG.error("Could not shut down metric registry.", t); } diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java index 4a2ca3ab34d54..b69b8d8e3fa51 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java @@ -74,7 +74,7 @@ public ScheduledReporter getReporter(MetricConfig config) { * Tests that the registered metrics' names don't contain invalid characters. */ @Test - public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException { + public void testAddingMetrics() throws Exception { Configuration configuration = new Configuration(); String taskName = "test\"Ta\"..sk"; String jobName = "testJ\"ob:-!ax..?"; @@ -131,7 +131,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept assertEquals(expectedCounterName, counters.get(myCounter)); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java index fb21a75b86ddc..d23a22ca30df4 100644 --- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java +++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java @@ -150,7 +150,7 @@ public void testDropwizardHistogramWrapperReporting() throws Exception { assertEquals(0, testingReporter.getMetrics().size()); } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java index 40b7f158639fb..6e4564671c595 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java @@ -145,7 +145,7 @@ public Integer getValue() { rep1.notifyOfRemovedMetric(g2, "rep2", null); mg.close(); - reg.shutdown(); + reg.shutdown().get(); } /** @@ -219,7 +219,7 @@ public Integer getValue() { rep1.close(); rep2.close(); mg.close(); - reg.shutdown(); + reg.shutdown().get(); } /** @@ -266,7 +266,7 @@ public void testHistogramReporting() throws Exception { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } @@ -306,7 +306,7 @@ public void testMeterReporting() throws Exception { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } } diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java index d4ad1f9ea0d14..724a79b86eb85 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java @@ -90,9 +90,9 @@ public void setupReporter() { } @After - public void shutdownRegistry() { + public void shutdownRegistry() throws Exception { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java index 6833a0661b931..e9fd985e4ac2c 100644 --- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java +++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java @@ -84,9 +84,9 @@ public void setupReporter() { } @After - public void shutdownRegistry() { + public void shutdownRegistry() throws Exception { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } } @@ -237,7 +237,7 @@ class SomeMetricType implements Metric{} } @Test - public void cannotStartTwoReportersOnSamePort() { + public void cannotStartTwoReportersOnSamePort() throws Exception { final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next()))); assertThat(fixedPort1.getReporters(), hasSize(1)); @@ -246,12 +246,12 @@ public void cannotStartTwoReportersOnSamePort() { final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort())))); assertThat(fixedPort2.getReporters(), hasSize(0)); - fixedPort1.shutdown(); - fixedPort2.shutdown(); + fixedPort1.shutdown().get(); + fixedPort2.shutdown().get(); } @Test - public void canStartTwoReportersWhenUsingPortRange() { + public void canStartTwoReportersWhenUsingPortRange() throws Exception { String portRange = portRangeProvider.next(); final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRange))); final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", portRange))); @@ -259,8 +259,8 @@ public void canStartTwoReportersWhenUsingPortRange() { assertThat(portRange1.getReporters(), hasSize(1)); assertThat(portRange2.getReporters(), hasSize(1)); - portRange1.shutdown(); - portRange2.shutdown(); + portRange1.shutdown().get(); + portRange2.shutdown().get(); } private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException { @@ -280,8 +280,8 @@ static Configuration createConfigWithOneReporter(String reporterName, String por } @After - public void closeReporterAndShutdownRegistry() { - registry.shutdown(); + public void closeReporterAndShutdownRegistry() throws Exception { + registry.shutdown().get(); } /** diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java index b344f45bca608..172c79c775f50 100644 --- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java +++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java @@ -76,8 +76,8 @@ public static void setUp() { } @AfterClass - public static void tearDown() { - registry.shutdown(); + public static void tearDown() throws Exception { + registry.shutdown().get(); } @Test diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 08d4998c859f2..c9f5af07a72a1 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -73,7 +73,7 @@ public void testReplaceInvalidChars() throws NoSuchMethodException, InvocationTa * Tests that the registered metrics' names don't contain invalid characters. */ @Test - public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException { + public void testAddingMetrics() throws Exception { Configuration configuration = new Configuration(); String taskName = "testTask"; String jobName = "testJob:-!ax..?"; @@ -124,7 +124,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept assertEquals(expectedCounterName, counters.get(myCounter)); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -187,7 +187,7 @@ public void testStatsDHistogramReporting() throws Exception { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } if (receiver != null) { @@ -247,7 +247,7 @@ public void testStatsDMetersReporting() throws Exception { } finally { if (registry != null) { - registry.shutdown(); + registry.shutdown().get(); } if (receiver != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java new file mode 100644 index 0000000000000..f2f905971c332 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.akka; + +import org.apache.flink.runtime.concurrent.FutureUtils; + +import akka.actor.ActorRef; +import akka.actor.Kill; +import akka.pattern.Patterns; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utility functions for the interaction with Akka {@link akka.actor.Actor}. + */ +public class ActorUtils { + + private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class); + + /** + * Shuts the given {@link akka.actor.Actor} down in a non blocking fashion. The method first tries to + * gracefully shut them down. If this is not successful, then the actors will be terminated by sending + * a {@link akka.actor.Kill} message. + * + * @param gracePeriod for the graceful shutdown + * @param timeUnit time unit of the grace period + * @param actors to shut down + * @return Future which is completed once all actors have been shut down gracefully or forceful + * kill messages have been sent to all actors. Occurring errors will be suppressed into one error. + */ + public static CompletableFuture nonBlockingShutDown(long gracePeriod, TimeUnit timeUnit, ActorRef... actors) { + final Collection> terminationFutures = new ArrayList<>(actors.length); + final FiniteDuration timeout = new FiniteDuration(gracePeriod, timeUnit); + + for (ActorRef actor : actors) { + try { + final Future booleanFuture = Patterns.gracefulStop(actor, timeout); + final CompletableFuture terminationFuture = FutureUtils.toJava(booleanFuture) + .thenApply(ignored -> null) + .exceptionally((Throwable throwable) -> { + if (throwable instanceof TimeoutException) { + // the actor did not gracefully stop within the grace period --> Let's kill him + actor.tell(Kill.getInstance(), ActorRef.noSender()); + return null; + } else { + throw new CompletionException(throwable); + } + }); + + terminationFutures.add(terminationFuture); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the actor down + LOG.debug("The actor {} has already been stopped because the " + + "underlying ActorSystem has already been shut down.", actor.path()); + } + } + + return FutureUtils.completeAll(terminationFutures); + } + + private ActorUtils() {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java index c8f449022b285..6b3770907a94a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.metrics; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -28,35 +29,36 @@ import org.apache.flink.metrics.View; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; +import org.apache.flink.runtime.akka.ActorUtils; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import akka.actor.ActorRef; import akka.actor.ActorSystem; -import akka.actor.Kill; -import akka.pattern.Patterns; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - /** * A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the * connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}. @@ -66,8 +68,14 @@ public class MetricRegistryImpl implements MetricRegistry { private final Object lock = new Object(); - private List reporters; - private ScheduledExecutorService executor; + private final List reporters; + private final ScheduledExecutorService executor; + + private final ScopeFormats scopeFormats; + private final char globalDelimiter; + private final List delimiters; + + private final CompletableFuture terminationFuture; @Nullable private ActorRef queryService; @@ -77,9 +85,7 @@ public class MetricRegistryImpl implements MetricRegistry { private ViewUpdater viewUpdater; - private final ScopeFormats scopeFormats; - private final char globalDelimiter; - private final List delimiters = new ArrayList<>(); + private boolean isShutdown; /** * Creates a new MetricRegistry and starts the configured reporter. @@ -87,9 +93,12 @@ public class MetricRegistryImpl implements MetricRegistry { public MetricRegistryImpl(MetricRegistryConfiguration config) { this.scopeFormats = config.getScopeFormats(); this.globalDelimiter = config.getDelimiter(); + this.delimiters = new ArrayList<>(10); + this.terminationFuture = new CompletableFuture<>(); + this.isShutdown = false; // second, instantiate any custom configured reporters - this.reporters = new ArrayList<>(); + this.reporters = new ArrayList<>(4); List> reporterConfigurations = config.getReporterConfigurations(); @@ -226,71 +235,72 @@ public List getReporters() { */ public boolean isShutdown() { synchronized (lock) { - return reporters == null && executor.isShutdown(); + return isShutdown; } } /** * Shuts down this registry and the associated {@link MetricReporter}. + * + *

NOTE: This operation is asynchronous and returns a future which is completed + * once the shutdown operation has been completed. + * + * @return Future which is completed once the {@link MetricRegistryImpl} + * is shut down. */ - public void shutdown() { + public CompletableFuture shutdown() { synchronized (lock) { - Future stopFuture = null; - FiniteDuration stopTimeout = null; + if (isShutdown) { + return terminationFuture; + } else { + isShutdown = true; + final Collection> terminationFutures = new ArrayList<>(3); + final Time gracePeriod = Time.seconds(1L); - if (queryService != null) { - stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); + if (queryService != null) { + final CompletableFuture queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown( + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + queryService); - try { - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); - } catch (IllegalStateException ignored) { - // this can happen if the underlying actor system has been stopped before shutting - // the metric registry down - // TODO: Pull the MetricQueryService actor out of the MetricRegistry - LOG.debug("The metric query service actor has already been stopped because the " + - "underlying ActorSystem has already been shut down."); + terminationFutures.add(queryServiceTerminationFuture); } - } - if (reporters != null) { + Throwable throwable = null; for (MetricReporter reporter : reporters) { try { reporter.close(); } catch (Throwable t) { - LOG.warn("Metrics reporter did not shut down cleanly", t); + throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } } - reporters = null; - } - shutdownExecutor(); - - if (stopFuture != null) { - boolean stopped = false; - - try { - stopped = Await.result(stopFuture, stopTimeout); - } catch (Exception e) { - LOG.warn("Query actor did not properly stop.", e); - } + reporters.clear(); - if (!stopped) { - // the query actor did not stop in time, let's kill him - queryService.tell(Kill.getInstance(), ActorRef.noSender()); + if (throwable != null) { + terminationFutures.add( + FutureUtils.completedExceptionally( + new FlinkException("Could not shut down the metric reporters properly.", throwable))); } - } - } - } - private void shutdownExecutor() { - if (executor != null) { - executor.shutdown(); + final CompletableFuture executorShutdownFuture = ExecutorUtils.nonBlockingShutdown( + gracePeriod.toMilliseconds(), + TimeUnit.MILLISECONDS, + executor); + + terminationFutures.add(executorShutdownFuture); + + FutureUtils + .completeAll(terminationFutures) + .whenComplete( + (Void ignored, Throwable error) -> { + if (error != null) { + terminationFuture.completeExceptionally(error); + } else { + terminationFuture.complete(null); + } + }); - try { - if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) { - executor.shutdownNow(); - } - } catch (InterruptedException e) { - executor.shutdownNow(); + return terminationFuture; } } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d0e401cea5db4..1dfaa5d344bf7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -2054,7 +2054,7 @@ object JobManager { } try { - metricRegistry.shutdown() + metricRegistry.shutdown().get() } catch { case t: Throwable => LOG.warn("Could not properly shut down the metric registry.", t) diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index 7948ba177e22a..6c9ee5ba6d6ae 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -468,7 +468,7 @@ abstract class FlinkMiniCluster( Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout) - metricRegistryOpt.foreach(_.shutdown()) + metricRegistryOpt.foreach(_.shutdown().get()) if (!useSingleActorSystem) { taskManagerActorSystems foreach { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 106bea14ff8c1..485add57c6cfb 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1900,7 +1900,7 @@ object TaskManager { // shut down the metric query service try { - metricRegistry.shutdown() + metricRegistry.shutdown().get() } catch { case t: Throwable => LOG.error("Could not properly shut down the metric registry.", t) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java index 2eccc0cb7a04d..adb622d7e1c19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java @@ -59,12 +59,12 @@ public class MetricRegistryImplTest extends TestLogger { private static final char GLOBAL_DEFAULT_DELIMITER = '.'; @Test - public void testIsShutdown() { + public void testIsShutdown() throws Exception { MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); Assert.assertFalse(metricRegistry.isShutdown()); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); Assert.assertTrue(metricRegistry.isShutdown()); } @@ -73,7 +73,7 @@ public void testIsShutdown() { * Verifies that the reporter name list is correctly used to determine which reporters should be instantiated. */ @Test - public void testReporterInclusion() { + public void testReporterInclusion() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.REPORTERS_LIST, "test"); @@ -87,7 +87,7 @@ public void testReporterInclusion() { Assert.assertTrue(TestReporter1.wasOpened); Assert.assertFalse(TestReporter11.wasOpened); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -106,7 +106,7 @@ public void open(MetricConfig config) { * Verifies that multiple reporters are instantiated correctly. */ @Test - public void testMultipleReporterInstantiation() { + public void testMultipleReporterInstantiation() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName()); @@ -121,7 +121,7 @@ public void testMultipleReporterInstantiation() { Assert.assertTrue(TestReporter12.wasOpened); Assert.assertTrue(TestReporter13.wasOpened); - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } /** @@ -164,14 +164,14 @@ public void open(MetricConfig config) { * Verifies that configured arguments are properly forwarded to the reporter. */ @Test - public void testReporterArgumentForwarding() { + public void testReporterArgumentForwarding() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world"); - new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown(); + new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown().get(); Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null)); Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null)); @@ -190,11 +190,9 @@ public void open(MetricConfig config) { /** * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics. - * - * @throws InterruptedException */ @Test - public void testReporterScheduling() throws InterruptedException { + public void testReporterScheduling() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName()); @@ -225,7 +223,7 @@ public void testReporterScheduling() throws InterruptedException { } Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -244,7 +242,7 @@ public void report() { * Verifies that reporters are notified of added/removed metrics. */ @Test - public void testReporterNotifications() { + public void testReporterNotifications() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName()); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName()); @@ -268,7 +266,7 @@ public void testReporterNotifications() { assertTrue(TestReporter7.removedMetric instanceof Counter); assertEquals("rootCounter", TestReporter7.removedMetricName); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -338,7 +336,7 @@ public void testScopeConfig() { } @Test - public void testConfigurableDelimiter() { + public void testConfigurableDelimiter() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_DELIMITER, "_"); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E"); @@ -348,11 +346,11 @@ public void testConfigurableDelimiter() { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id"); assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testConfigurableDelimiterForReporters() { + public void testConfigurableDelimiterForReporters() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); @@ -370,11 +368,11 @@ public void testConfigurableDelimiterForReporters() { assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3)); assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1)); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testConfigurableDelimiterForReportersInGroup() { + public void testConfigurableDelimiterForReportersInGroup() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); @@ -395,7 +393,7 @@ public void testConfigurableDelimiterForReportersInGroup() { TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); group.counter("C"); group.close(); - registry.shutdown(); + registry.shutdown().get(); assertEquals(4, TestReporter8.numCorrectDelimitersForRegister); assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); } @@ -415,7 +413,7 @@ public void testQueryActorShutdown() throws Exception { ActorRef queryServiceActor = registry.getQueryService(); - registry.shutdown(); + registry.shutdown().get(); try { Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout); @@ -471,7 +469,7 @@ public void testExceptionIsolation() throws Exception { assertEquals(metric, TestReporter7.removedMetric); assertEquals("counter", TestReporter7.removedMetricName); - registry.shutdown(); + registry.shutdown().get(); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java index fe22095fdb276..179885112b12b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java @@ -162,7 +162,7 @@ protected void run() { highAvailabilityServices.closeAndCleanupAllData(); } - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java index 325982baa2b3a..f8ed3c6a6d8e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java @@ -44,7 +44,7 @@ public class AbstractMetricGroupTest { * called and the parent is null. */ @Test - public void testGetAllVariables() { + public void testGetAllVariables() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); AbstractMetricGroup group = new AbstractMetricGroup>(registry, new String[0], null) { @@ -60,7 +60,7 @@ protected String getGroupName(CharacterFilter filter) { }; assertTrue(group.getAllVariables().isEmpty()); - registry.shutdown(); + registry.shutdown().get(); } // ======================================================================== @@ -101,7 +101,7 @@ public void testScopeCachingForMultipleReporters() throws Exception { } } } finally { - testRegistry.shutdown(); + testRegistry.shutdown().get(); } } @@ -176,7 +176,7 @@ public String filterCharacters(String input) { } @Test - public void testScopeGenerationWithoutReporters() { + public void testScopeGenerationWithoutReporters() throws Exception { Configuration config = new Configuration(); config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D"); MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); @@ -193,7 +193,7 @@ public void testScopeGenerationWithoutReporters() { assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1)); assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2)); } finally { - testRegistry.shutdown(); + testRegistry.shutdown().get(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java index 05a72acb4d1e4..cb5ec67c97c92 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java @@ -44,7 +44,7 @@ public class JobManagerGroupTest extends TestLogger { // ------------------------------------------------------------------------ @Test - public void addAndRemoveJobs() { + public void addAndRemoveJobs() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); @@ -72,11 +72,11 @@ public void addAndRemoveJobs() { assertTrue(jmJobGroup21.isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testCloseClosesAll() { + public void testCloseClosesAll() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); @@ -94,7 +94,7 @@ public void testCloseClosesAll() { assertTrue(jmJobGroup11.isClosed()); assertTrue(jmJobGroup21.isClosed()); - registry.shutdown(); + registry.shutdown().get(); } // ------------------------------------------------------------------------ @@ -102,18 +102,18 @@ public void testCloseClosesAll() { // ------------------------------------------------------------------------ @Test - public void testGenerateScopeDefault() { + public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost"); assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents()); assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant..foo."); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -123,7 +123,7 @@ public void testGenerateScopeCustom() { assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java index 4373f80c37d9b..6f4751b07d9b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java @@ -38,7 +38,7 @@ public class JobManagerJobGroupTest extends TestLogger { @Test - public void testGenerateScopeDefault() { + public void testGenerateScopeDefault() throws Exception { MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName"); @@ -52,11 +52,11 @@ public void testGenerateScopeDefault() { "theHostName.jobmanager.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant."); @@ -75,11 +75,11 @@ public void testGenerateScopeCustom() { "some-constant.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustomWildcard() { + public void testGenerateScopeCustomWildcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter"); cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant."); @@ -98,7 +98,7 @@ public void testGenerateScopeCustomWildcard() { "peter.some-constant." + jid + ".name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java index bcdcd63ac6dc9..22148db75a776 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java @@ -44,7 +44,7 @@ public class MetricGroupRegistrationTest extends TestLogger { * Verifies that group methods instantiate the correct metric with the given name. */ @Test - public void testMetricInstantiation() { + public void testMetricInstantiation() throws Exception { Configuration config = new Configuration(); config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName()); @@ -85,7 +85,7 @@ public HistogramStatistics getStatistics() { Assert.assertEquals(histogram, TestReporter1.lastPassedMetric); assertEquals("histogram", TestReporter1.lastPassedName); - registry.shutdown(); + registry.shutdown().get(); } /** @@ -107,7 +107,7 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr * Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead. */ @Test - public void testDuplicateGroupName() { + public void testDuplicateGroupName() throws Exception { Configuration config = new Configuration(); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)); @@ -119,6 +119,6 @@ public void testDuplicateGroupName() { MetricGroup group3 = root.addGroup("group"); Assert.assertTrue(group1 == group2 && group2 == group3); - registry.shutdown(); + registry.shutdown().get(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 4dc5edf8a17b6..71ae7f18ecb7e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -62,8 +62,8 @@ public void createRegistry() { } @After - public void shutdownRegistry() { - this.registry.shutdown(); + public void shutdownRegistry() throws Exception { + this.registry.shutdown().get(); this.registry = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java index 820b73efc0010..58198e4f92313 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java @@ -32,6 +32,8 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.util.Map; @@ -45,10 +47,22 @@ */ public class OperatorGroupTest extends TestLogger { - @Test - public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + @Test + public void testGenerateScopeDefault() throws Exception { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( @@ -62,12 +76,10 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name", opGroup.getMetricIdentifier("name")); - - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "...."); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -91,14 +103,12 @@ public void testGenerateScopeCustom() { String.format("%s.%s.%s.%s.%s.name", tmID, jid, vertexId, operatorName, operatorID), operatorGroup.getMetricIdentifier("name")); } finally { - registry.shutdown(); + registry.shutdown().get(); } } @Test - public void testIOMetricGroupInstantiation() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - + public void testIOMetricGroupInstantiation() throws Exception { TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); TaskMetricGroup taskGroup = new TaskMetricGroup( @@ -108,14 +118,10 @@ public void testIOMetricGroupInstantiation() { assertNotNull(opGroup.getIOMetricGroup()); assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter()); assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter()); - - registry.shutdown(); } @Test public void testVariables() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - JobID jid = new JobID(); JobVertexID tid = new JobVertexID(); AbstractID eid = new AbstractID(); @@ -140,8 +146,6 @@ public void testVariables() { testVariable(variables, ScopeFormat.SCOPE_TASK_ATTEMPT_NUM, "0"); testVariable(variables, ScopeFormat.SCOPE_OPERATOR_ID, oid.toString()); testVariable(variables, ScopeFormat.SCOPE_OPERATOR_NAME, "myOpName"); - - registry.shutdown(); } private static void testVariable(Map variables, String key, String expectedValue) { @@ -156,7 +160,6 @@ public void testCreateQueryServiceMetricInfo() { JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); OperatorID oid = new OperatorID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java index 3272f73765a2b..addbcbde66a73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java @@ -30,6 +30,8 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import java.io.IOException; @@ -44,14 +46,26 @@ */ public class TaskManagerGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + // ------------------------------------------------------------------------ // adding and removing jobs // ------------------------------------------------------------------------ @Test public void addAndRemoveJobs() throws IOException { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -106,13 +120,10 @@ public void addAndRemoveJobs() throws IOException { assertTrue(tmGroup13.parent().isClosed()); assertEquals(0, group.numRegisteredJobMetricGroups()); - - registry.shutdown(); } @Test public void testCloseClosesAll() throws IOException { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); final TaskManagerMetricGroup group = new TaskManagerMetricGroup( registry, "localhost", new AbstractID().toString()); @@ -142,8 +153,6 @@ public void testCloseClosesAll() throws IOException { assertTrue(tmGroup11.isClosed()); assertTrue(tmGroup12.isClosed()); assertTrue(tmGroup21.isClosed()); - - registry.shutdown(); } // ------------------------------------------------------------------------ @@ -152,16 +161,14 @@ public void testCloseClosesAll() throws IOException { @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id"); assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents()); assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name")); - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant..foo."); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -169,12 +176,11 @@ public void testGenerateScopeCustom() { assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents()); assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test public void testCreateQueryServiceMetricInfo() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java index b6be31c2dbfb3..52ee578ad5bfe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.metrics.util.DummyCharacterFilter; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -37,10 +39,22 @@ */ public class TaskManagerJobGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); - TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id"); JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName"); @@ -51,11 +65,10 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant."); @@ -73,11 +86,11 @@ public void testGenerateScopeCustom() { assertEquals( "some-constant.myJobName.name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeCustomWildcard() { + public void testGenerateScopeCustomWildcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter."); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant."); @@ -95,13 +108,12 @@ public void testGenerateScopeCustomWildcard() { assertEquals( "peter.test-tm-id.some-constant." + jid + ".name", jmGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index 47ee1a91db113..d9e6158ccc869 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -31,7 +31,9 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.util.TestLogger; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import static org.junit.Assert.assertArrayEquals; @@ -43,13 +45,26 @@ */ public class TaskMetricGroupTest extends TestLogger { + private MetricRegistryImpl registry; + + @Before + public void setup() { + registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); + } + + @After + public void teardown() throws Exception { + if (registry != null) { + registry.shutdown().get(); + } + } + // ------------------------------------------------------------------------ // scope tests // ----------------------------------------------------------------------- @Test public void testGenerateScopeDefault() { - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); JobVertexID vertexId = new JobVertexID(); AbstractID executionId = new AbstractID(); @@ -64,11 +79,10 @@ public void testGenerateScopeDefault() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name", taskGroup.getMetricIdentifier("name")); - registry.shutdown(); } @Test - public void testGenerateScopeCustom() { + public void testGenerateScopeCustom() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc"); cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def"); @@ -91,11 +105,11 @@ public void testGenerateScopeCustom() { assertEquals( String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId), taskGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testGenerateScopeWilcard() { + public void testGenerateScopeWilcard() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*.."); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -115,7 +129,7 @@ public void testGenerateScopeWilcard() { assertEquals( "theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13.name", taskGroup.getMetricIdentifier("name")); - registry.shutdown(); + registry.shutdown().get(); } @Test @@ -123,7 +137,6 @@ public void testCreateQueryServiceMetricInfo() { JobID jid = new JobID(); JobVertexID vid = new JobVertexID(); AbstractID eid = new AbstractID(); - MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()); TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id"); TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname"); TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5); @@ -136,7 +149,7 @@ public void testCreateQueryServiceMetricInfo() { } @Test - public void testTaskMetricGroupCleanup() { + public void testTaskMetricGroupCleanup() throws Exception { CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration()); TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0"); TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job"); @@ -150,11 +163,11 @@ public void testTaskMetricGroupCleanup() { // now all registered metrics should have been unregistered assertEquals(0, registry.getNumberRegisteredMetrics()); - registry.shutdown(); + registry.shutdown().get(); } @Test - public void testOperatorNameTruncation() { + public void testOperatorNameTruncation() throws Exception { Configuration cfg = new Configuration(); cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME); MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg)); @@ -168,6 +181,7 @@ public void testOperatorNameTruncation() { String storedName = operatorMetricGroup.getScopeComponents()[0]; Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length()); Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName); + registry.shutdown().get(); } private static class CountingMetricRegistry extends MetricRegistryImpl { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 9d1af35c38b3e..ed1aad35d7494 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -457,7 +457,7 @@ protected int runApplicationMaster(Configuration config) { if (metricRegistry != null) { try { - metricRegistry.shutdown(); + metricRegistry.shutdown().get(); } catch (Throwable t) { LOG.error("Could not properly shut down the metric registry.", t); }