Skip to content

Commit

Permalink
[FLINK-8670] Make MetricRegistryImpl#shutdown non blocking
Browse files Browse the repository at this point in the history
This commit makes the MetricRegistryImpl#shutdown method non blocking. Instead
of waiting for the completion of the shutdown procedure, the method returns a
future which is completed once the metric registry has completed the shut down.

This closes apache#5504.
  • Loading branch information
tillrohrmann committed Feb 23, 2018
1 parent d9b28e8 commit e29ec0f
Show file tree
Hide file tree
Showing 26 changed files with 334 additions and 181 deletions.
21 changes: 21 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -74,4 +75,24 @@ public static void gracefulShutdown(long timeout, TimeUnit unit, ExecutorService
}
}
}

/**
* Shuts the given {@link ExecutorService} down in a non-blocking fashion. The shut down will
* be executed by a thread from the common fork-join pool.
*
* <p>The executor services will be shut down gracefully for the given timeout period. Afterwards
* {@link ExecutorService#shutdownNow()} will be called.
*
* @param timeout before {@link ExecutorService#shutdownNow()} is called
* @param unit time unit of the timeout
* @param executorServices to shut down
* @return Future which is completed once the {@link ExecutorService} are shut down
*/
public static CompletableFuture<Void> nonBlockingShutdown(long timeout, TimeUnit unit, ExecutorService... executorServices) {
return CompletableFuture.supplyAsync(
() -> {
gracefulShutdown(timeout, unit, executorServices);
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie

if (metricRegistry != null) {
try {
metricRegistry.shutdown();
metricRegistry.shutdown().get();
} catch (Throwable t) {
LOG.error("Could not shut down metric registry.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public ScheduledReporter getReporter(MetricConfig config) {
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException {
public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "test\"Ta\"..sk";
String jobName = "testJ\"ob:-!ax..?";
Expand Down Expand Up @@ -131,7 +131,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept

assertEquals(expectedCounterName, counters.get(myCounter));

metricRegistry.shutdown();
metricRegistry.shutdown().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void testDropwizardHistogramWrapperReporting() throws Exception {
assertEquals(0, testingReporter.getMetrics().size());
} finally {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public Integer getValue() {
rep1.notifyOfRemovedMetric(g2, "rep2", null);

mg.close();
reg.shutdown();
reg.shutdown().get();
}

/**
Expand Down Expand Up @@ -219,7 +219,7 @@ public Integer getValue() {
rep1.close();
rep2.close();
mg.close();
reg.shutdown();
reg.shutdown().get();
}

/**
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testHistogramReporting() throws Exception {

} finally {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ public void testMeterReporting() throws Exception {

} finally {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ public void setupReporter() {
}

@After
public void shutdownRegistry() {
public void shutdownRegistry() throws Exception {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ public void setupReporter() {
}

@After
public void shutdownRegistry() {
public void shutdownRegistry() throws Exception {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}
}

Expand Down Expand Up @@ -237,7 +237,7 @@ class SomeMetricType implements Metric{}
}

@Test
public void cannotStartTwoReportersOnSamePort() {
public void cannotStartTwoReportersOnSamePort() throws Exception {
final MetricRegistryImpl fixedPort1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRangeProvider.next())));
assertThat(fixedPort1.getReporters(), hasSize(1));

Expand All @@ -246,21 +246,21 @@ public void cannotStartTwoReportersOnSamePort() {
final MetricRegistryImpl fixedPort2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", String.valueOf(firstReporter.getPort()))));
assertThat(fixedPort2.getReporters(), hasSize(0));

fixedPort1.shutdown();
fixedPort2.shutdown();
fixedPort1.shutdown().get();
fixedPort2.shutdown().get();
}

@Test
public void canStartTwoReportersWhenUsingPortRange() {
public void canStartTwoReportersWhenUsingPortRange() throws Exception {
String portRange = portRangeProvider.next();
final MetricRegistryImpl portRange1 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", portRange)));
final MetricRegistryImpl portRange2 = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test2", portRange)));

assertThat(portRange1.getReporters(), hasSize(1));
assertThat(portRange2.getReporters(), hasSize(1));

portRange1.shutdown();
portRange2.shutdown();
portRange1.shutdown().get();
portRange2.shutdown().get();
}

private String addMetricAndPollResponse(Metric metric, String metricName) throws UnirestException {
Expand All @@ -280,8 +280,8 @@ static Configuration createConfigWithOneReporter(String reporterName, String por
}

@After
public void closeReporterAndShutdownRegistry() {
registry.shutdown();
public void closeReporterAndShutdownRegistry() throws Exception {
registry.shutdown().get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public static void setUp() {
}

@AfterClass
public static void tearDown() {
registry.shutdown();
public static void tearDown() throws Exception {
registry.shutdown().get();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testReplaceInvalidChars() throws NoSuchMethodException, InvocationTa
* Tests that the registered metrics' names don't contain invalid characters.
*/
@Test
public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessException {
public void testAddingMetrics() throws Exception {
Configuration configuration = new Configuration();
String taskName = "testTask";
String jobName = "testJob:-!ax..?";
Expand Down Expand Up @@ -124,7 +124,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept

assertEquals(expectedCounterName, counters.get(myCounter));

metricRegistry.shutdown();
metricRegistry.shutdown().get();
}

/**
Expand Down Expand Up @@ -187,7 +187,7 @@ public void testStatsDHistogramReporting() throws Exception {

} finally {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}

if (receiver != null) {
Expand Down Expand Up @@ -247,7 +247,7 @@ public void testStatsDMetersReporting() throws Exception {

} finally {
if (registry != null) {
registry.shutdown();
registry.shutdown().get();
}

if (receiver != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.akka;

import org.apache.flink.runtime.concurrent.FutureUtils;

import akka.actor.ActorRef;
import akka.actor.Kill;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
* Utility functions for the interaction with Akka {@link akka.actor.Actor}.
*/
public class ActorUtils {

private static final Logger LOG = LoggerFactory.getLogger(ActorUtils.class);

/**
* Shuts the given {@link akka.actor.Actor} down in a non blocking fashion. The method first tries to
* gracefully shut them down. If this is not successful, then the actors will be terminated by sending
* a {@link akka.actor.Kill} message.
*
* @param gracePeriod for the graceful shutdown
* @param timeUnit time unit of the grace period
* @param actors to shut down
* @return Future which is completed once all actors have been shut down gracefully or forceful
* kill messages have been sent to all actors. Occurring errors will be suppressed into one error.
*/
public static CompletableFuture<Void> nonBlockingShutDown(long gracePeriod, TimeUnit timeUnit, ActorRef... actors) {
final Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(actors.length);
final FiniteDuration timeout = new FiniteDuration(gracePeriod, timeUnit);

for (ActorRef actor : actors) {
try {
final Future<Boolean> booleanFuture = Patterns.gracefulStop(actor, timeout);
final CompletableFuture<Void> terminationFuture = FutureUtils.toJava(booleanFuture)
.<Void>thenApply(ignored -> null)
.exceptionally((Throwable throwable) -> {
if (throwable instanceof TimeoutException) {
// the actor did not gracefully stop within the grace period --> Let's kill him
actor.tell(Kill.getInstance(), ActorRef.noSender());
return null;
} else {
throw new CompletionException(throwable);
}
});

terminationFutures.add(terminationFuture);
} catch (IllegalStateException ignored) {
// this can happen if the underlying actor system has been stopped before shutting
// the actor down
LOG.debug("The actor {} has already been stopped because the " +
"underlying ActorSystem has already been shut down.", actor.path());
}
}

return FutureUtils.completeAll(terminationFutures);
}

private ActorUtils() {}
}
Loading

0 comments on commit e29ec0f

Please sign in to comment.