diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
index d98bdd29cbd80..0a7f1613a8089 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -74,4 +75,24 @@ public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService
}
}
}
+
+ /**
+ * Shuts the given {@link ExecutorService} down in a non-blocking fashion. The shut down will
+ * be executed by a thread from the common fork-join pool.
+ *
+ *
The executor services will be shut down gracefully for the given timeout period. Afterwards
+ * {@link ExecutorService#shutdownNow()} will be called.
+ *
+ * @param timeout before {@link ExecutorService#shutdownNow()} is called
+ * @param unit time unit of the timeout
+ * @param executorServices to shut down
+ * @return Future which is completed once the {@link ExecutorService} are shut down
+ */
+ public static CompletableFuture nonBlockingShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
+ return CompletableFuture.supplyAsync(
+ () -> {
+ gracefulShutdown(timeout, unit, executorServices);
+ return null;
+ });
+ }
}
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 94804aceaf60f..630fa8390003a 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -433,7 +433,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
if (metricRegistry != null) {
try {
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
} catch (Throwable t) {
LOG.error("Could not shut down metric registry.", t);
}
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 4a2ca3ab34d54..b69b8d8e3fa51 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -74,7 +74,7 @@ public ScheduledReporter getReporter(MetricConfig config) {
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
- public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException {
+ public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "test\"Ta\"..sk";
String jobName = "testJ\"ob:-!ax..?";
@@ -131,7 +131,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept
assertEquals(expectedCounterName, counters.get(myCounter));
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
}
/**
diff --git a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
index fb21a75b86ddc..d23a22ca30df4 100644
--- a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
+++ b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
@@ -150,7 +150,7 @@ public void testDropwizardHistogramWrapperReporting() throws Exception {
assertEquals(0, testingReporter.getMetrics().size());
} finally {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
}
}
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 40b7f158639fb..6e4564671c595 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -145,7 +145,7 @@ public Integer getValue() {
rep1.notifyOfRemovedMetric(g2, "rep2", null);
mg.close();
- reg.shutdown();
+ reg.shutdown().get();
}
/**
@@ -219,7 +219,7 @@ public Integer getValue() {
rep1.close();
rep2.close();
mg.close();
- reg.shutdown();
+ reg.shutdown().get();
}
/**
@@ -266,7 +266,7 @@ public void testHistogramReporting() throws Exception {
} finally {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
}
}
@@ -306,7 +306,7 @@ public void testMeterReporting() throws Exception {
} finally {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
}
}
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
index d4ad1f9ea0d14..724a79b86eb85 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java
@@ -90,9 +90,9 @@ public void setupReporter() {
}
@After
- public void shutdownRegistry() {
+ public void shutdownRegistry() throws Exception {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
}
diff --git a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
index 6833a0661b931..e9fd985e4ac2c 100644
--- a/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
+++ b/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java
@@ -84,9 +84,9 @@ public void setupReporter() {
}
@After
- public void shutdownRegistry() {
+ public void shutdownRegistry() throws Exception {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
}
@@ -237,7 +237,7 @@ class SomeMetricType implements Metric{}
}
@Test
- public void cannotStartTwoReportersOnSamePort() {
+ public void cannotStartTwoReportersOnSamePort() throws Exception {
final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next())));
assertThat(fixedPort1.getReporters(), hasSize(1));
@@ -246,12 +246,12 @@ public void cannotStartTwoReportersOnSamePort() {
final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
assertThat(fixedPort2.getReporters(), hasSize(0));
- fixedPort1.shutdown();
- fixedPort2.shutdown();
+ fixedPort1.shutdown().get();
+ fixedPort2.shutdown().get();
}
@Test
- public void canStartTwoReportersWhenUsingPortRange() {
+ public void canStartTwoReportersWhenUsingPortRange() throws Exception {
String portRange = portRangeProvider.next();
final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRange)));
final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", portRange)));
@@ -259,8 +259,8 @@ public void canStartTwoReportersWhenUsingPortRange() {
assertThat(portRange1.getReporters(), hasSize(1));
assertThat(portRange2.getReporters(), hasSize(1));
- portRange1.shutdown();
- portRange2.shutdown();
+ portRange1.shutdown().get();
+ portRange2.shutdown().get();
}
private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
@@ -280,8 +280,8 @@ static Configuration createConfigWithOneReporter(String reporterName, String por
}
@After
- public void closeReporterAndShutdownRegistry() {
- registry.shutdown();
+ public void closeReporterAndShutdownRegistry() throws Exception {
+ registry.shutdown().get();
}
/**
diff --git a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
index b344f45bca608..172c79c775f50 100644
--- a/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
+++ b/flink-metrics/flink-metrics-slf4j/src/test/java/org/apache/flink/metrics/slf4j/Slf4jReporterTest.java
@@ -76,8 +76,8 @@ public static void setUp() {
}
@AfterClass
- public static void tearDown() {
- registry.shutdown();
+ public static void tearDown() throws Exception {
+ registry.shutdown().get();
}
@Test
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 08d4998c859f2..c9f5af07a72a1 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -73,7 +73,7 @@ public void testReplaceInvalidChars() throws NoSuchMethodException, InvocationTa
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
- public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException {
+ public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "testTask";
String jobName = "testJob:-!ax..?";
@@ -124,7 +124,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept
assertEquals(expectedCounterName, counters.get(myCounter));
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
}
/**
@@ -187,7 +187,7 @@ public void testStatsDHistogramReporting() throws Exception {
} finally {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
if (receiver != null) {
@@ -247,7 +247,7 @@ public void testStatsDMetersReporting() throws Exception {
} finally {
if (registry != null) {
- registry.shutdown();
+ registry.shutdown().get();
}
if (receiver != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
new file mode 100644
index 0000000000000..f2f905971c332
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/ActorUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+
+import akka.actor.ActorRef;
+import akka.actor.Kill;
+import akka.pattern.Patterns;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Utility functions for the interaction with Akka {@link akka.actor.Actor}.
+ */
+public class ActorUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);
+
+ /**
+ * Shuts the given {@link akka.actor.Actor} down in a non blocking fashion. The method first tries to
+ * gracefully shut them down. If this is not successful, then the actors will be terminated by sending
+ * a {@link akka.actor.Kill} message.
+ *
+ * @param gracePeriod for the graceful shutdown
+ * @param timeUnit time unit of the grace period
+ * @param actors to shut down
+ * @return Future which is completed once all actors have been shut down gracefully or forceful
+ * kill messages have been sent to all actors. Occurring errors will be suppressed into one error.
+ */
+ public static CompletableFuture nonBlockingShutDown(long gracePeriod, TimeUnit timeUnit, ActorRef... actors) {
+ final Collection> terminationFutures = new ArrayList<>(actors.length);
+ final FiniteDuration timeout = new FiniteDuration(gracePeriod, timeUnit);
+
+ for (ActorRef actor : actors) {
+ try {
+ final Future booleanFuture = Patterns.gracefulStop(actor, timeout);
+ final CompletableFuture terminationFuture = FutureUtils.toJava(booleanFuture)
+ .thenApply(ignored -> null)
+ .exceptionally((Throwable throwable) -> {
+ if (throwable instanceof TimeoutException) {
+ // the actor did not gracefully stop within the grace period --> Let's kill him
+ actor.tell(Kill.getInstance(), ActorRef.noSender());
+ return null;
+ } else {
+ throw new CompletionException(throwable);
+ }
+ });
+
+ terminationFutures.add(terminationFuture);
+ } catch (IllegalStateException ignored) {
+ // this can happen if the underlying actor system has been stopped before shutting
+ // the actor down
+ LOG.debug("The actor {} has already been stopped because the " +
+ "underlying ActorSystem has already been shut down.", actor.path());
+ }
+ }
+
+ return FutureUtils.completeAll(terminationFutures);
+ }
+
+ private ActorUtils() {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
index c8f449022b285..6b3770907a94a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.metrics;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
@@ -28,35 +29,36 @@
import org.apache.flink.metrics.View;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.scope.ScopeFormats;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Kill;
-import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
/**
* A MetricRegistry keeps track of all registered {@link Metric Metrics}. It serves as the
* connection between {@link MetricGroup MetricGroups} and {@link MetricReporter MetricReporters}.
@@ -66,8 +68,14 @@ public class MetricRegistryImpl implements MetricRegistry {
private final Object lock = new Object();
- private List reporters;
- private ScheduledExecutorService executor;
+ private final List reporters;
+ private final ScheduledExecutorService executor;
+
+ private final ScopeFormats scopeFormats;
+ private final char globalDelimiter;
+ private final List delimiters;
+
+ private final CompletableFuture terminationFuture;
@Nullable
private ActorRef queryService;
@@ -77,9 +85,7 @@ public class MetricRegistryImpl implements MetricRegistry {
private ViewUpdater viewUpdater;
- private final ScopeFormats scopeFormats;
- private final char globalDelimiter;
- private final List delimiters = new ArrayList<>();
+ private boolean isShutdown;
/**
* Creates a new MetricRegistry and starts the configured reporter.
@@ -87,9 +93,12 @@ public class MetricRegistryImpl implements MetricRegistry {
public MetricRegistryImpl(MetricRegistryConfiguration config) {
this.scopeFormats = config.getScopeFormats();
this.globalDelimiter = config.getDelimiter();
+ this.delimiters = new ArrayList<>(10);
+ this.terminationFuture = new CompletableFuture<>();
+ this.isShutdown = false;
// second, instantiate any custom configured reporters
- this.reporters = new ArrayList<>();
+ this.reporters = new ArrayList<>(4);
List> reporterConfigurations = config.getReporterConfigurations();
@@ -226,71 +235,72 @@ public List getReporters() {
*/
public boolean isShutdown() {
synchronized (lock) {
- return reporters == null && executor.isShutdown();
+ return isShutdown;
}
}
/**
* Shuts down this registry and the associated {@link MetricReporter}.
+ *
+ * NOTE: This operation is asynchronous and returns a future which is completed
+ * once the shutdown operation has been completed.
+ *
+ * @return Future which is completed once the {@link MetricRegistryImpl}
+ * is shut down.
*/
- public void shutdown() {
+ public CompletableFuture shutdown() {
synchronized (lock) {
- Future stopFuture = null;
- FiniteDuration stopTimeout = null;
+ if (isShutdown) {
+ return terminationFuture;
+ } else {
+ isShutdown = true;
+ final Collection> terminationFutures = new ArrayList<>(3);
+ final Time gracePeriod = Time.seconds(1L);
- if (queryService != null) {
- stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS);
+ if (queryService != null) {
+ final CompletableFuture queryServiceTerminationFuture = ActorUtils.nonBlockingShutDown(
+ gracePeriod.toMilliseconds(),
+ TimeUnit.MILLISECONDS,
+ queryService);
- try {
- stopFuture = Patterns.gracefulStop(queryService, stopTimeout);
- } catch (IllegalStateException ignored) {
- // this can happen if the underlying actor system has been stopped before shutting
- // the metric registry down
- // TODO: Pull the MetricQueryService actor out of the MetricRegistry
- LOG.debug("The metric query service actor has already been stopped because the " +
- "underlying ActorSystem has already been shut down.");
+ terminationFutures.add(queryServiceTerminationFuture);
}
- }
- if (reporters != null) {
+ Throwable throwable = null;
for (MetricReporter reporter : reporters) {
try {
reporter.close();
} catch (Throwable t) {
- LOG.warn("Metrics reporter did not shut down cleanly", t);
+ throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
}
}
- reporters = null;
- }
- shutdownExecutor();
-
- if (stopFuture != null) {
- boolean stopped = false;
-
- try {
- stopped = Await.result(stopFuture, stopTimeout);
- } catch (Exception e) {
- LOG.warn("Query actor did not properly stop.", e);
- }
+ reporters.clear();
- if (!stopped) {
- // the query actor did not stop in time, let's kill him
- queryService.tell(Kill.getInstance(), ActorRef.noSender());
+ if (throwable != null) {
+ terminationFutures.add(
+ FutureUtils.completedExceptionally(
+ new FlinkException("Could not shut down the metric reporters properly.", throwable)));
}
- }
- }
- }
- private void shutdownExecutor() {
- if (executor != null) {
- executor.shutdown();
+ final CompletableFuture executorShutdownFuture = ExecutorUtils.nonBlockingShutdown(
+ gracePeriod.toMilliseconds(),
+ TimeUnit.MILLISECONDS,
+ executor);
+
+ terminationFutures.add(executorShutdownFuture);
+
+ FutureUtils
+ .completeAll(terminationFutures)
+ .whenComplete(
+ (Void ignored, Throwable error) -> {
+ if (error != null) {
+ terminationFuture.completeExceptionally(error);
+ } else {
+ terminationFuture.complete(null);
+ }
+ });
- try {
- if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
- executor.shutdownNow();
- }
- } catch (InterruptedException e) {
- executor.shutdownNow();
+ return terminationFuture;
}
}
}
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d0e401cea5db4..1dfaa5d344bf7 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2054,7 +2054,7 @@ object JobManager {
}
try {
- metricRegistry.shutdown()
+ metricRegistry.shutdown().get()
} catch {
case t: Throwable =>
LOG.warn("Could not properly shut down the metric registry.", t)
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 7948ba177e22a..6c9ee5ba6d6ae 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -468,7 +468,7 @@ abstract class FlinkMiniCluster(
Await.ready(Future.sequence(jmFutures ++ tmFutures ++ rmFutures), timeout)
- metricRegistryOpt.foreach(_.shutdown())
+ metricRegistryOpt.foreach(_.shutdown().get())
if (!useSingleActorSystem) {
taskManagerActorSystems foreach {
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 106bea14ff8c1..485add57c6cfb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1900,7 +1900,7 @@ object TaskManager {
// shut down the metric query service
try {
- metricRegistry.shutdown()
+ metricRegistry.shutdown().get()
} catch {
case t: Throwable =>
LOG.error("Could not properly shut down the metric registry.", t)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
index 2eccc0cb7a04d..adb622d7e1c19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -59,12 +59,12 @@ public class MetricRegistryImplTest extends TestLogger {
private static final char GLOBAL_DEFAULT_DELIMITER = '.';
@Test
- public void testIsShutdown() {
+ public void testIsShutdown() throws Exception {
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
Assert.assertFalse(metricRegistry.isShutdown());
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
Assert.assertTrue(metricRegistry.isShutdown());
}
@@ -73,7 +73,7 @@ public void testIsShutdown() {
* Verifies that the reporter name list is correctly used to determine which reporters should be instantiated.
*/
@Test
- public void testReporterInclusion() {
+ public void testReporterInclusion() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.REPORTERS_LIST, "test");
@@ -87,7 +87,7 @@ public void testReporterInclusion() {
Assert.assertTrue(TestReporter1.wasOpened);
Assert.assertFalse(TestReporter11.wasOpened);
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
}
/**
@@ -106,7 +106,7 @@ public void open(MetricConfig config) {
* Verifies that multiple reporters are instantiated correctly.
*/
@Test
- public void testMultipleReporterInstantiation() {
+ public void testMultipleReporterInstantiation() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
@@ -121,7 +121,7 @@ public void testMultipleReporterInstantiation() {
Assert.assertTrue(TestReporter12.wasOpened);
Assert.assertTrue(TestReporter13.wasOpened);
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
}
/**
@@ -164,14 +164,14 @@ public void open(MetricConfig config) {
* Verifies that configured arguments are properly forwarded to the reporter.
*/
@Test
- public void testReporterArgumentForwarding() {
+ public void testReporterArgumentForwarding() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
- new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+ new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown().get();
Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
@@ -190,11 +190,9 @@ public void open(MetricConfig config) {
/**
* Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
- *
- * @throws InterruptedException
*/
@Test
- public void testReporterScheduling() throws InterruptedException {
+ public void testReporterScheduling() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
@@ -225,7 +223,7 @@ public void testReporterScheduling() throws InterruptedException {
}
Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
- registry.shutdown();
+ registry.shutdown().get();
}
/**
@@ -244,7 +242,7 @@ public void report() {
* Verifies that reporters are notified of added/removed metrics.
*/
@Test
- public void testReporterNotifications() {
+ public void testReporterNotifications() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
@@ -268,7 +266,7 @@ public void testReporterNotifications() {
assertTrue(TestReporter7.removedMetric instanceof Counter);
assertEquals("rootCounter", TestReporter7.removedMetricName);
- registry.shutdown();
+ registry.shutdown().get();
}
/**
@@ -338,7 +336,7 @@ public void testScopeConfig() {
}
@Test
- public void testConfigurableDelimiter() {
+ public void testConfigurableDelimiter() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_DELIMITER, "_");
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
@@ -348,11 +346,11 @@ public void testConfigurableDelimiter() {
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testConfigurableDelimiterForReporters() {
+ public void testConfigurableDelimiterForReporters() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
@@ -370,11 +368,11 @@ public void testConfigurableDelimiterForReporters() {
assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3));
assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testConfigurableDelimiterForReportersInGroup() {
+ public void testConfigurableDelimiterForReportersInGroup() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
@@ -395,7 +393,7 @@ public void testConfigurableDelimiterForReportersInGroup() {
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
group.counter("C");
group.close();
- registry.shutdown();
+ registry.shutdown().get();
assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
}
@@ -415,7 +413,7 @@ public void testQueryActorShutdown() throws Exception {
ActorRef queryServiceActor = registry.getQueryService();
- registry.shutdown();
+ registry.shutdown().get();
try {
Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
@@ -471,7 +469,7 @@ public void testExceptionIsolation() throws Exception {
assertEquals(metric, TestReporter7.removedMetric);
assertEquals("counter", TestReporter7.removedMetricName);
- registry.shutdown();
+ registry.shutdown().get();
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index fe22095fdb276..179885112b12b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -162,7 +162,7 @@ protected void run() {
highAvailabilityServices.closeAndCleanupAllData();
}
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 325982baa2b3a..f8ed3c6a6d8e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -44,7 +44,7 @@ public class AbstractMetricGroupTest {
* called and the parent is null.
*/
@Test
- public void testGetAllVariables() {
+ public void testGetAllVariables() throws Exception {
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
AbstractMetricGroup group = new AbstractMetricGroup>(registry, new String[0], null) {
@@ -60,7 +60,7 @@ protected String getGroupName(CharacterFilter filter) {
};
assertTrue(group.getAllVariables().isEmpty());
- registry.shutdown();
+ registry.shutdown().get();
}
// ========================================================================
@@ -101,7 +101,7 @@ public void testScopeCachingForMultipleReporters() throws Exception {
}
}
} finally {
- testRegistry.shutdown();
+ testRegistry.shutdown().get();
}
}
@@ -176,7 +176,7 @@ public String filterCharacters(String input) {
}
@Test
- public void testScopeGenerationWithoutReporters() {
+ public void testScopeGenerationWithoutReporters() throws Exception {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -193,7 +193,7 @@ public void testScopeGenerationWithoutReporters() {
assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, -1));
assertEquals("A.X.C.D.1", group.getMetricIdentifier("1", FILTER_B, 2));
} finally {
- testRegistry.shutdown();
+ testRegistry.shutdown().get();
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 05a72acb4d1e4..cb5ec67c97c92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -44,7 +44,7 @@ public class JobManagerGroupTest extends TestLogger {
// ------------------------------------------------------------------------
@Test
- public void addAndRemoveJobs() {
+ public void addAndRemoveJobs() throws Exception {
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
@@ -72,11 +72,11 @@ public void addAndRemoveJobs() {
assertTrue(jmJobGroup21.isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testCloseClosesAll() {
+ public void testCloseClosesAll() throws Exception {
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
@@ -94,7 +94,7 @@ public void testCloseClosesAll() {
assertTrue(jmJobGroup11.isClosed());
assertTrue(jmJobGroup21.isClosed());
- registry.shutdown();
+ registry.shutdown().get();
}
// ------------------------------------------------------------------------
@@ -102,18 +102,18 @@ public void testCloseClosesAll() {
// ------------------------------------------------------------------------
@Test
- public void testGenerateScopeDefault() {
+ public void testGenerateScopeDefault() throws Exception {
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
assertEquals("localhost.jobmanager.name", group.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant..foo.");
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -123,7 +123,7 @@ public void testGenerateScopeCustom() {
assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index 4373f80c37d9b..6f4751b07d9b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -38,7 +38,7 @@
public class JobManagerJobGroupTest extends TestLogger {
@Test
- public void testGenerateScopeDefault() {
+ public void testGenerateScopeDefault() throws Exception {
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
@@ -52,11 +52,11 @@ public void testGenerateScopeDefault() {
"theHostName.jobmanager.myJobName.name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.");
@@ -75,11 +75,11 @@ public void testGenerateScopeCustom() {
"some-constant.myJobName.name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testGenerateScopeCustomWildcard() {
+ public void testGenerateScopeCustomWildcard() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.");
@@ -98,7 +98,7 @@ public void testGenerateScopeCustomWildcard() {
"peter.some-constant." + jid + ".name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index bcdcd63ac6dc9..22148db75a776 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -44,7 +44,7 @@ public class MetricGroupRegistrationTest extends TestLogger {
* Verifies that group methods instantiate the correct metric with the given name.
*/
@Test
- public void testMetricInstantiation() {
+ public void testMetricInstantiation() throws Exception {
Configuration config = new Configuration();
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
@@ -85,7 +85,7 @@ public HistogramStatistics getStatistics() {
Assert.assertEquals(histogram, TestReporter1.lastPassedMetric);
assertEquals("histogram", TestReporter1.lastPassedName);
- registry.shutdown();
+ registry.shutdown().get();
}
/**
@@ -107,7 +107,7 @@ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup gr
* Verifies that when attempting to create a group with the name of an existing one the existing one will be returned instead.
*/
@Test
- public void testDuplicateGroupName() {
+ public void testDuplicateGroupName() throws Exception {
Configuration config = new Configuration();
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
@@ -119,6 +119,6 @@ public void testDuplicateGroupName() {
MetricGroup group3 = root.addGroup("group");
Assert.assertTrue(group1 == group2 && group2 == group3);
- registry.shutdown();
+ registry.shutdown().get();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 4dc5edf8a17b6..71ae7f18ecb7e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -62,8 +62,8 @@ public void createRegistry() {
}
@After
- public void shutdownRegistry() {
- this.registry.shutdown();
+ public void shutdownRegistry() throws Exception {
+ this.registry.shutdown().get();
this.registry = null;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 820b73efc0010..58198e4f92313 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -32,6 +32,8 @@
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.util.Map;
@@ -45,10 +47,22 @@
*/
public class OperatorGroupTest extends TestLogger {
- @Test
- public void testGenerateScopeDefault() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ private MetricRegistryImpl registry;
+
+ @Before
+ public void setup() {
+ registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (registry != null) {
+ registry.shutdown().get();
+ }
+ }
+ @Test
+ public void testGenerateScopeDefault() throws Exception {
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
@@ -62,12 +76,10 @@ public void testGenerateScopeDefault() {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.myOpName.11.name",
opGroup.getMetricIdentifier("name"));
-
- registry.shutdown();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "....");
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -91,14 +103,12 @@ public void testGenerateScopeCustom() {
String.format("%s.%s.%s.%s.%s.name", tmID, jid, vertexId, operatorName, operatorID),
operatorGroup.getMetricIdentifier("name"));
} finally {
- registry.shutdown();
+ registry.shutdown().get();
}
}
@Test
- public void testIOMetricGroupInstantiation() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
+ public void testIOMetricGroupInstantiation() throws Exception {
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
TaskMetricGroup taskGroup = new TaskMetricGroup(
@@ -108,14 +118,10 @@ public void testIOMetricGroupInstantiation() {
assertNotNull(opGroup.getIOMetricGroup());
assertNotNull(opGroup.getIOMetricGroup().getNumRecordsInCounter());
assertNotNull(opGroup.getIOMetricGroup().getNumRecordsOutCounter());
-
- registry.shutdown();
}
@Test
public void testVariables() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
JobID jid = new JobID();
JobVertexID tid = new JobVertexID();
AbstractID eid = new AbstractID();
@@ -140,8 +146,6 @@ public void testVariables() {
testVariable(variables, ScopeFormat.SCOPE_TASK_ATTEMPT_NUM, "0");
testVariable(variables, ScopeFormat.SCOPE_OPERATOR_ID, oid.toString());
testVariable(variables, ScopeFormat.SCOPE_OPERATOR_NAME, "myOpName");
-
- registry.shutdown();
}
private static void testVariable(Map variables, String key, String expectedValue) {
@@ -156,7 +160,6 @@ public void testCreateQueryServiceMetricInfo() {
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
OperatorID oid = new OperatorID();
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 3272f73765a2b..addbcbde66a73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -30,6 +30,8 @@
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -44,14 +46,26 @@
*/
public class TaskManagerGroupTest extends TestLogger {
+ private MetricRegistryImpl registry;
+
+ @Before
+ public void setup() {
+ registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (registry != null) {
+ registry.shutdown().get();
+ }
+ }
+
// ------------------------------------------------------------------------
// adding and removing jobs
// ------------------------------------------------------------------------
@Test
public void addAndRemoveJobs() throws IOException {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
registry, "localhost", new AbstractID().toString());
@@ -106,13 +120,10 @@ public void addAndRemoveJobs() throws IOException {
assertTrue(tmGroup13.parent().isClosed());
assertEquals(0, group.numRegisteredJobMetricGroups());
-
- registry.shutdown();
}
@Test
public void testCloseClosesAll() throws IOException {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
registry, "localhost", new AbstractID().toString());
@@ -142,8 +153,6 @@ public void testCloseClosesAll() throws IOException {
assertTrue(tmGroup11.isClosed());
assertTrue(tmGroup12.isClosed());
assertTrue(tmGroup21.isClosed());
-
- registry.shutdown();
}
// ------------------------------------------------------------------------
@@ -152,16 +161,14 @@ public void testCloseClosesAll() throws IOException {
@Test
public void testGenerateScopeDefault() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
assertEquals("localhost.taskmanager.id.name", group.getMetricIdentifier("name"));
- registry.shutdown();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant..foo.");
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -169,12 +176,11 @@ public void testGenerateScopeCustom() {
assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
assertEquals("constant.host.foo.host.name", group.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
public void testCreateQueryServiceMetricInfo() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
index b6be31c2dbfb3..52ee578ad5bfe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerJobGroupTest.java
@@ -27,6 +27,8 @@
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -37,10 +39,22 @@
*/
public class TaskManagerJobGroupTest extends TestLogger {
+ private MetricRegistryImpl registry;
+
+ @Before
+ public void setup() {
+ registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (registry != null) {
+ registry.shutdown().get();
+ }
+ }
+
@Test
public void testGenerateScopeDefault() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
JobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -51,11 +65,10 @@ public void testGenerateScopeDefault() {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "some-constant.");
@@ -73,11 +86,11 @@ public void testGenerateScopeCustom() {
assertEquals(
"some-constant.myJobName.name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testGenerateScopeCustomWildcard() {
+ public void testGenerateScopeCustomWildcard() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TM, "peter.");
cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "*.some-constant.");
@@ -95,13 +108,12 @@ public void testGenerateScopeCustomWildcard() {
assertEquals(
"peter.test-tm-id.some-constant." + jid + ".name",
jmGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index 47ee1a91db113..d9e6158ccc869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -31,7 +31,9 @@
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertArrayEquals;
@@ -43,13 +45,26 @@
*/
public class TaskMetricGroupTest extends TestLogger {
+ private MetricRegistryImpl registry;
+
+ @Before
+ public void setup() {
+ registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (registry != null) {
+ registry.shutdown().get();
+ }
+ }
+
// ------------------------------------------------------------------------
// scope tests
// -----------------------------------------------------------------------
@Test
public void testGenerateScopeDefault() {
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobVertexID vertexId = new JobVertexID();
AbstractID executionId = new AbstractID();
@@ -64,11 +79,10 @@ public void testGenerateScopeDefault() {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName.aTaskName.13.name",
taskGroup.getMetricIdentifier("name"));
- registry.shutdown();
}
@Test
- public void testGenerateScopeCustom() {
+ public void testGenerateScopeCustom() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TM, "abc");
cfg.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "def");
@@ -91,11 +105,11 @@ public void testGenerateScopeCustom() {
assertEquals(
String.format("test-tm-id.%s.%s.%s.name", jid, vertexId, executionId),
taskGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testGenerateScopeWilcard() {
+ public void testGenerateScopeWilcard() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TASK, "*..");
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -115,7 +129,7 @@ public void testGenerateScopeWilcard() {
assertEquals(
"theHostName.taskmanager.test-tm-id.myJobName." + executionId + ".13.name",
taskGroup.getMetricIdentifier("name"));
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
@@ -123,7 +137,6 @@ public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
- MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
@@ -136,7 +149,7 @@ public void testCreateQueryServiceMetricInfo() {
}
@Test
- public void testTaskMetricGroupCleanup() {
+ public void testTaskMetricGroupCleanup() throws Exception {
CountingMetricRegistry registry = new CountingMetricRegistry(new Configuration());
TaskManagerMetricGroup taskManagerMetricGroup = new TaskManagerMetricGroup(registry, "localhost", "0");
TaskManagerJobMetricGroup taskManagerJobMetricGroup = new TaskManagerJobMetricGroup(registry, taskManagerMetricGroup, new JobID(), "job");
@@ -150,11 +163,11 @@ public void testTaskMetricGroupCleanup() {
// now all registered metrics should have been unregistered
assertEquals(0, registry.getNumberRegisteredMetrics());
- registry.shutdown();
+ registry.shutdown().get();
}
@Test
- public void testOperatorNameTruncation() {
+ public void testOperatorNameTruncation() throws Exception {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, ScopeFormat.SCOPE_OPERATOR_NAME);
MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
@@ -168,6 +181,7 @@ public void testOperatorNameTruncation() {
String storedName = operatorMetricGroup.getScopeComponents()[0];
Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
Assert.assertEquals(originalName.substring(0, TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH), storedName);
+ registry.shutdown().get();
}
private static class CountingMetricRegistry extends MetricRegistryImpl {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 9d1af35c38b3e..ed1aad35d7494 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -457,7 +457,7 @@ protected int runApplicationMaster(Configuration config) {
if (metricRegistry != null) {
try {
- metricRegistry.shutdown();
+ metricRegistry.shutdown().get();
} catch (Throwable t) {
LOG.error("Could not properly shut down the metric registry.", t);
}