diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index e23e0cbebd326..2b38b8587b69a 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -510,7 +510,7 @@ public Protos.Offer.Operation launch(Protos.TaskInfo... taskInfo) { @Override public void close() throws Exception { - rpcService.stopService(); + rpcService.stopService().get(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index ccb3ae4ae7338..f347d05881c7a 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -437,7 +437,7 @@ protected void stopClusterServices(boolean cleanupHaData) throws FlinkException if (commonRpcService != null) { try { - commonRpcService.stopService(); + commonRpcService.stopService().get(); } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 5046ae7b0e4c6..3f019f44326d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -756,7 +756,7 @@ private void initializeIOFormatClasses(Configuration configuration) { private static Throwable shutDownRpc(RpcService rpcService, Throwable priorException) { if (rpcService != null) { try { - rpcService.stopService(); + rpcService.stopService().get(); } catch (Throwable t) { return ExceptionUtils.firstOrSuppressed(t, priorException); @@ -773,7 +773,7 @@ private static Throwable shutDownRpcs(RpcService[] rpcServices, Throwable priorE for (RpcService service : rpcServices) { try { if (service != null) { - service.stopService(); + service.stopService().get(); } } catch (Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index 089e4b0161b56..9aa31195fa946 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -116,9 +116,12 @@ > CompletableFuture con void stopServer(RpcServer selfGateway); /** - * Stop the rpc service shutting down all started rpc servers. + * Trigger the asynchronous stopping of the {@link RpcService}. + * + * @return Future which is completed once the {@link RpcService} has been + * fully stopped. */ - void stopService(); + CompletableFuture stopService(); /** * Returns a future indicating when the RPC service has been shut down. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java index f87d33cf5b991..c90a8b5bbbc6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java @@ -50,7 +50,7 @@ public static Set> extractImplementedRpcGateways(Cla while (clazz != null) { for (Class interfaze : clazz.getInterfaces()) { if (RpcGateway.class.isAssignableFrom(interfaze)) { - interfaces.add((Class)interfaze); + interfaces.add((Class) interfaze); } } @@ -65,7 +65,7 @@ public static Set> extractImplementedRpcGateways(Cla * * @param rpcEndpoint to terminate * @param timeout for this operation - * @throws ExecutionException if a problem occurs + * @throws ExecutionException if a problem occurred * @throws InterruptedException if the operation has been interrupted * @throws TimeoutException if a timeout occurred */ @@ -74,6 +74,19 @@ public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout) t rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } + /** + * Shuts the given rpc service down and waits for its termination. + * + * @param rpcService to shut down + * @param timeout for this operation + * @throws InterruptedException if the operation has been interrupted + * @throws ExecutionException if a problem occurred + * @throws TimeoutException if a timeout occurred + */ + public static void terminateRpcService(RpcService rpcService, Time timeout) throws InterruptedException, ExecutionException, TimeoutException { + rpcService.stopService().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + } + // We don't want this class to be instantiable private RpcUtils() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index d2d4bf2322bc9..a65fe4611846d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -42,6 +42,7 @@ import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.Patterns; @@ -98,6 +99,8 @@ public class AkkaRpcService implements RpcService { private final ScheduledExecutor internalScheduledExecutor; + private final CompletableFuture terminationFuture; + private volatile boolean stopped; public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { @@ -127,6 +130,8 @@ public AkkaRpcService(final ActorSystem actorSystem, final Time timeout) { internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem); + terminationFuture = new CompletableFuture<>(); + stopped = false; } @@ -311,33 +316,40 @@ public void stopServer(RpcServer selfGateway) { } @Override - public void stopService() { - LOG.info("Stopping Akka RPC service."); - + public CompletableFuture stopService() { synchronized (lock) { if (stopped) { - return; + return terminationFuture; } stopped = true; - } - actorSystem.shutdown(); - actorSystem.awaitTermination(); + LOG.info("Stopping Akka RPC service."); - synchronized (lock) { - actors.clear(); - } + final CompletableFuture actorSytemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + actorSytemTerminationFuture.whenComplete( + (Terminated ignored, Throwable throwable) -> { + synchronized (lock) { + actors.clear(); + } + + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(null); + } + + LOG.info("Stopped Akka RPC service."); + }); - LOG.info("Stopped Akka RPC service."); + return terminationFuture; } @Override public CompletableFuture getTerminationFuture() { - return CompletableFuture.runAsync( - actorSystem::awaitTermination, - getExecutor()); + return terminationFuture; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 4cb1beb5d7d11..46205855945e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -173,7 +173,15 @@ protected void shutDownInternally() throws Exception { exception = ExceptionUtils.firstOrSuppressed(e, exception); } - rpcService.stopService(); + try { + rpcService.stopService().get(); + } catch (InterruptedException ie) { + exception = ExceptionUtils.firstOrSuppressed(ie, exception); + + Thread.currentThread().interrupt(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } try { highAvailabilityServices.close(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 096ba5e85aefd..241da8f1bd6db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -53,6 +53,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; @@ -578,7 +579,7 @@ public void testHeartbeatTimeoutWithTaskExecutor() throws Exception { verify(taskExecutorGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(any(TimeoutException.class)); } finally { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } } @@ -680,7 +681,7 @@ public void testHeartbeatTimeoutWithJobManager() throws Exception { verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId), any(TimeoutException.class)); } finally { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index 46cd9e26773e0..5d264e2010f99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -134,9 +134,9 @@ public static void setupClass() { } @AfterClass - public static void teardownClass() { + public static void teardownClass() throws Exception { if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, TIMEOUT); rpcService = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java index dfabc611f301e..4291ef21d7e28 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/MiniDispatcherTest.java @@ -61,6 +61,8 @@ import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -143,13 +145,13 @@ public void teardown() throws Exception { } @AfterClass - public static void teardownClass() throws IOException { + public static void teardownClass() throws IOException, InterruptedException, ExecutionException, TimeoutException { if (blobServer != null) { blobServer.close(); } if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index 949ff9621adef..76a5642ec0313 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -117,7 +117,7 @@ public void teardown() throws Exception { } if (rpcService != null) { - rpcService.stopService(); + rpcService.stopService().get(); rpcService = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java index 172876d202a49..b2be97ef29c85 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java @@ -92,9 +92,9 @@ public static void setup() { } @AfterClass - public static void shutdown() { + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); rpcService = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java index 4cd7782a722b9..b9036c19ced96 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSchedulingTestBase.java @@ -64,9 +64,9 @@ public static void setup() { } @AfterClass - public static void teardown() { + public static void teardown() throws ExecutionException, InterruptedException { if (testingRpcService != null) { - testingRpcService.stopService(); + testingRpcService.stopService().get(); testingRpcService = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index e6446ad9e48d5..d6e05217f47a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -104,7 +104,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index 650a0f2112b9f..25f976a1e08a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.slf4j.LoggerFactory; @@ -46,6 +48,20 @@ */ public class RegisteredRpcConnectionTest extends TestLogger { + private TestingRpcService rpcService; + + @Before + public void setup() { + rpcService = new TestingRpcService(); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException { + if (rpcService != null) { + rpcService.stopService().get(); + } + } + @Test public void testSuccessfulRpcConnection() throws Exception { final String testRpcConnectionEndpointAddress = ""; @@ -54,7 +70,6 @@ public void testSuccessfulRpcConnection() throws Exception { // an endpoint that immediately returns success TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); - TestingRpcService rpcService = new TestingRpcService(); try { rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -74,7 +89,6 @@ public void testSuccessfulRpcConnection() throws Exception { } finally { testGateway.stop(); - rpcService.stopService(); } } @@ -84,37 +98,30 @@ public void testRpcConnectionFailures() throws Exception { final String testRpcConnectionEndpointAddress = ""; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpcService = new TestingRpcService(); + // gateway that upon calls Throw an exception + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( + registrationException); - try { - // gateway that upon calls Throw an exception - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - final RuntimeException registrationException = new RuntimeException(connectionFailureMessage); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenThrow( - registrationException); + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - - TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); - connection.start(); - - //wait for connection failure - try { - connection.getConnectionFuture().get(); - fail("expected failure."); - } catch (ExecutionException ee) { - assertEquals(registrationException, ee.getCause()); - } + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); - // validate correct invocation and result - assertFalse(connection.isConnected()); - assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); - assertEquals(leaderId, connection.getTargetLeaderId()); - assertNull(connection.getTargetGateway()); - } - finally { - rpcService.stopService(); + //wait for connection failure + try { + connection.getConnectionFuture().get(); + fail("expected failure."); + } catch (ExecutionException ee) { + assertEquals(registrationException, ee.getCause()); } + + // validate correct invocation and result + assertFalse(connection.isConnected()); + assertEquals(testRpcConnectionEndpointAddress, connection.getTargetAddress()); + assertEquals(leaderId, connection.getTargetLeaderId()); + assertNull(connection.getTargetGateway()); } @Test @@ -124,7 +131,6 @@ public void testRpcConnectionClose() throws Exception { final String connectionID = "Test RPC Connection ID"; TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID)); - TestingRpcService rpcService = new TestingRpcService(); try { rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); @@ -141,7 +147,6 @@ public void testRpcConnectionClose() throws Exception { } finally { testGateway.stop(); - rpcService.stopService(); } } @@ -149,31 +154,26 @@ public void testRpcConnectionClose() throws Exception { public void testReconnect() throws Exception { final String connectionId1 = "Test RPC Connection ID 1"; final String connectionId2 = "Test RPC Connection ID 2"; - final TestingRpcService rpcService = new TestingRpcService(); final String testRpcConnectionEndpointAddress = ""; final UUID leaderId = UUID.randomUUID(); final TestRegistrationGateway testGateway = new TestRegistrationGateway( new RetryingRegistrationTest.TestRegistrationSuccess(connectionId1), new RetryingRegistrationTest.TestRegistrationSuccess(connectionId2)); - try { - rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); + rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway); - TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); - connection.start(); + TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService); + connection.start(); - final String actualConnectionId1 = connection.getConnectionFuture().get(); + final String actualConnectionId1 = connection.getConnectionFuture().get(); - assertEquals(actualConnectionId1, connectionId1); + assertEquals(actualConnectionId1, connectionId1); - assertTrue(connection.tryReconnect()); + assertTrue(connection.tryReconnect()); - final String actualConnectionId2 = connection.getConnectionFuture().get(); + final String actualConnectionId2 = connection.getConnectionFuture().get(); - assertEquals(actualConnectionId2, connectionId2); - } finally { - rpcService.stopService(); - } + assertEquals(actualConnectionId2, connectionId2); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index 885a7f5dd8548..ff5a74818b68b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.slf4j.LoggerFactory; @@ -53,6 +55,20 @@ */ public class RetryingRegistrationTest extends TestLogger { + private TestingRpcService rpcService; + + @Before + public void setup() { + rpcService = new TestingRpcService(); + } + + @After + public void tearDown() throws ExecutionException, InterruptedException { + if (rpcService != null) { + rpcService.stopService().get(); + } + } + @Test public void testSimpleSuccessfulRegistration() throws Exception { final String testId = "laissez les bon temps roulez"; @@ -61,12 +77,11 @@ public void testSimpleSuccessfulRegistration() throws Exception { // an endpoint that immediately returns success TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); - TestingRpcService rpc = new TestingRpcService(); try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); registration.startRegistration(); CompletableFuture> future = registration.getFuture(); @@ -84,7 +99,6 @@ public void testSimpleSuccessfulRegistration() throws Exception { } finally { testGateway.stop(); - rpc.stopService(); } } @@ -173,14 +187,12 @@ public void testRetriesOnTimeouts() throws Exception { new TestRegistrationSuccess(testId) // success ); - TestingRpcService rpc = new TestingRpcService(); - try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); final long initialTimeout = 20L; TestRetryingRegistration registration = new TestRetryingRegistration( - rpc, + rpcService, testEndpointAddress, leaderId, initialTimeout, @@ -206,7 +218,6 @@ public void testRetriesOnTimeouts() throws Exception { assertTrue("retries did not properly back off", elapsedMillis >= 3 * initialTimeout); } finally { - rpc.stopService(); testGateway.stop(); } } @@ -217,8 +228,6 @@ public void testDecline() throws Exception { final String testEndpointAddress = ""; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); - TestRegistrationGateway testGateway = new TestRegistrationGateway( null, // timeout new RegistrationResponse.Decline("no reason "), @@ -227,9 +236,9 @@ public void testDecline() throws Exception { ); try { - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); long started = System.nanoTime(); registration.startRegistration(); @@ -251,7 +260,6 @@ public void testDecline() throws Exception { } finally { testGateway.stop(); - rpc.stopService(); } } @@ -262,39 +270,32 @@ public void testRetryOnError() throws Exception { final String testEndpointAddress = ""; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); - - try { - // gateway that upon calls first responds with a failure, then with a success - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + // gateway that upon calls first responds with a failure, then with a success + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( - FutureUtils.completedExceptionally(new Exception("test exception")), - CompletableFuture.completedFuture(new TestRegistrationSuccess(testId))); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn( + FutureUtils.completedExceptionally(new Exception("test exception")), + CompletableFuture.completedFuture(new TestRegistrationSuccess(testId))); - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); - long started = System.nanoTime(); - registration.startRegistration(); + long started = System.nanoTime(); + registration.startRegistration(); - CompletableFuture> future = registration.getFuture(); - Tuple2 success = - future.get(10, TimeUnit.SECONDS); + CompletableFuture> future = registration.getFuture(); + Tuple2 success = + future.get(10, TimeUnit.SECONDS); - long finished = System.nanoTime(); - long elapsedMillis = (finished - started) / 1000000; + long finished = System.nanoTime(); + long elapsedMillis = (finished - started) / 1000000; - assertEquals(testId, success.f1.getCorrelationId()); + assertEquals(testId, success.f1.getCorrelationId()); - // validate that some retry-delay / back-off behavior happened - assertTrue("retries did not properly back off", - elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); - } - finally { - rpc.stopService(); - } + // validate that some retry-delay / back-off behavior happened + assertTrue("retries did not properly back off", + elapsedMillis >= TestRetryingRegistration.DELAY_ON_ERROR); } @Test @@ -302,29 +303,22 @@ public void testCancellation() throws Exception { final String testEndpointAddress = "my-test-address"; final UUID leaderId = UUID.randomUUID(); - TestingRpcService rpc = new TestingRpcService(); + CompletableFuture result = new CompletableFuture<>(); - try { - CompletableFuture result = new CompletableFuture<>(); - - TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); - when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); + TestRegistrationGateway testGateway = mock(TestRegistrationGateway.class); + when(testGateway.registrationCall(any(UUID.class), anyLong())).thenReturn(result); - rpc.registerGateway(testEndpointAddress, testGateway); + rpcService.registerGateway(testEndpointAddress, testGateway); - TestRetryingRegistration registration = new TestRetryingRegistration(rpc, testEndpointAddress, leaderId); - registration.startRegistration(); + TestRetryingRegistration registration = new TestRetryingRegistration(rpcService, testEndpointAddress, leaderId); + registration.startRegistration(); - // cancel and fail the current registration attempt - registration.cancel(); - result.completeExceptionally(new TimeoutException()); + // cancel and fail the current registration attempt + registration.cancel(); + result.completeExceptionally(new TimeoutException()); - // there should not be a second registration attempt - verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); - } - finally { - rpc.stopService(); - } + // there should not be a second registration attempt + verify(testGateway, atMost(1)).registrationCall(any(UUID.class), anyLong()); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index 57ce3497a965c..a3d17f2433489 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -126,7 +126,7 @@ public void revokeLeadership() { testingFatalErrorHandler.rethrowError(); } } finally { - rpcService.stopService(); + rpcService.stopService().get(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 4440e380cc1de..acd87748f7e68 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -73,7 +74,7 @@ public void setup() throws Exception { @After public void teardown() throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 2af6632395580..a0c4b43efa863 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -105,7 +106,7 @@ public void setup() throws Exception { @After public void teardown() throws Exception { - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java index 8f35b1310ef6b..f5fa89992e152 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java @@ -46,6 +46,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; @Category(Flip6.class) public class ResourceManagerTest extends TestLogger { @@ -54,18 +55,13 @@ public class ResourceManagerTest extends TestLogger { @Before public void setUp() { - if (rpcService != null) { - rpcService.stopService(); - rpcService = null; - } - rpcService = new TestingRpcService(); } @After - public void tearDown() { + public void tearDown() throws ExecutionException, InterruptedException { if (rpcService != null) { - rpcService.stopService(); + rpcService.stopService().get(); rpcService = null; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 9b72cb497b981..8ba0ccdb5d96e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.exceptions.FencingTokenException; @@ -30,10 +31,12 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -60,9 +63,13 @@ public class AsyncCallsTest extends TestLogger { new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java index b8040510f8cd8..3d99c3f8127fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java @@ -60,8 +60,7 @@ public static void setup() { @AfterClass public static void teardown() throws ExecutionException, InterruptedException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); - rpcService.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + RpcUtils.terminateRpcService(rpcService, timeout); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java index 26630c824ceb6..017c1f58029c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -23,17 +23,21 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.junit.Test; import org.junit.experimental.categories.Category; import scala.Option; import scala.Tuple2; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -46,10 +50,10 @@ * connect to an RpcEndpoint. */ @Category(Flip6.class) -public class RpcConnectionTest { +public class RpcConnectionTest extends TestLogger { @Test - public void testConnectFailure() { + public void testConnectFailure() throws Exception { ActorSystem actorSystem = null; RpcService rpcService = null; try { @@ -77,12 +81,25 @@ public void testConnectFailure() { fail("wrong exception: " + t); } finally { + final CompletableFuture rpcTerminationFuture; + if (rpcService != null) { - rpcService.stopService(); + rpcTerminationFuture = rpcService.stopService(); + } else { + rpcTerminationFuture = CompletableFuture.completedFuture(null); } + + final CompletableFuture actorSystemTerminationFuture; + if (actorSystem != null) { - actorSystem.shutdown(); + actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + } else { + actorSystemTerminationFuture = CompletableFuture.completedFuture(null); } + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java index 6d60de922febc..d52aadb72c31d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java @@ -20,21 +20,22 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; +import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import scala.concurrent.duration.FiniteDuration; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -56,21 +57,13 @@ public static void setup() { @AfterClass public static void teardown() throws Exception { - if (rpcService != null) { - rpcService.stopService(); - } - - if (actorSystem != null) { - actorSystem.shutdown(); - } - if (rpcService != null) { - rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - } + final CompletableFuture rpcTerminationFuture = rpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); - if (actorSystem != null) { - actorSystem.awaitTermination(new FiniteDuration(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)); - } + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 4b9f3977fa6c1..db70a0f165f88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -74,9 +74,15 @@ public TestingRpcService(Configuration configuration) { // ------------------------------------------------------------------------ @Override - public void stopService() { - super.stopService(); - registeredConnections.clear(); + public CompletableFuture stopService() { + final CompletableFuture terminationFuture = super.stopService(); + + terminationFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + registeredConnections.clear(); + }); + + return terminationFuture; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 3ff1b80214598..1b4500691f2d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -31,14 +31,17 @@ import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import akka.actor.Terminated; import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -61,10 +64,13 @@ public class AkkaRpcActorTest extends TestLogger { new AkkaRpcService(actorSystem, timeout); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); - actorSystem.awaitTermination(); + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index c4259f402413f..d92e496f20573 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -27,10 +27,12 @@ import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; +import akka.actor.Terminated; import org.junit.AfterClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.util.Arrays; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -59,10 +61,13 @@ public class AkkaRpcServiceTest extends TestLogger { new AkkaRpcService(actorSystem, timeout); @AfterClass - public static void shutdown() { - akkaRpcService.stopService(); - actorSystem.shutdown(); - actorSystem.awaitTermination(FutureUtils.toFiniteDuration(timeout)); + public static void shutdown() throws InterruptedException, ExecutionException, TimeoutException { + final CompletableFuture rpcTerminationFuture = akkaRpcService.stopService(); + final CompletableFuture actorSystemTerminationFuture = FutureUtils.toJava(actorSystem.terminate()); + + FutureUtils + .waitForAll(Arrays.asList(rpcTerminationFuture, actorSystemTerminationFuture)) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java index 8f35c0f6afdd5..a69bd84c9bfe7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java @@ -70,7 +70,7 @@ public void failIfNotInMainThread() throws Exception { testEndpoint.shutDown(); } finally { - akkaRpcService.stopService(); + akkaRpcService.stopService().get(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java index bb46bec23a70d..061145c3f4334 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java @@ -18,16 +18,18 @@ package org.apache.flink.runtime.rpc.akka; -import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.TestLogger; + +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; import org.hamcrest.core.Is; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -35,8 +37,13 @@ import org.junit.experimental.categories.Category; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -67,15 +74,17 @@ public static void setup() { } @AfterClass - public static void teardown() { - akkaRpcService1.stopService(); - akkaRpcService2.stopService(); + public static void teardown() throws InterruptedException, ExecutionException, TimeoutException { + final Collection> terminationFutures = new ArrayList<>(4); - actorSystem1.shutdown(); - actorSystem2.shutdown(); + terminationFutures.add(akkaRpcService1.stopService()); + terminationFutures.add(FutureUtils.toJava(actorSystem1.terminate())); + terminationFutures.add(akkaRpcService2.stopService()); + terminationFutures.add(FutureUtils.toJava(actorSystem2.terminate())); - actorSystem1.awaitTermination(); - actorSystem2.awaitTermination(); + FutureUtils + .waitForAll(terminationFutures) + .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index dc1d09f3aedc3..8f4ec5d7af073 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; @@ -220,7 +221,7 @@ public void testSlotAllocation() throws Exception { testingFatalErrorHandler.rethrowError(); } - rpcService.stopService(); + RpcUtils.terminateRpcService(rpcService, timeout); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index e894e48af4511..d7a1860697147 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -189,7 +189,7 @@ public void setup() throws IOException { @After public void teardown() throws Exception { if (rpc != null) { - rpc.stopService(); + RpcUtils.terminateRpcService(rpc, timeout); rpc = null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java index 847250cf29e1e..ae530f7d19070 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcGatewayRetrieverTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcTimeout; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.util.TestLogger; @@ -58,9 +59,7 @@ public static void setup() { @AfterClass public static void teardown() throws InterruptedException, ExecutionException, TimeoutException { if (rpcService != null) { - rpcService.stopService(); - rpcService.getTerminationFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); - + RpcUtils.terminateRpcService(rpcService, TIMEOUT); rpcService = null; } } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 8743380d507ab..455abc9596e9c 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -308,7 +308,7 @@ public void startResourceManager() throws Exception { * Stop the Akka actor system. */ public void stopResourceManager() throws Exception { - rpcService.stopService(); + rpcService.stopService().get(); } }