Skip to content

Commit

Permalink
[hotfix] Replace RpcEndpoint#shutDown with RpcEndpoint#closeAsync method
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Feb 12, 2019
1 parent 3d20007 commit dbb1755
Show file tree
Hide file tree
Showing 26 changed files with 72 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ public CompletableFuture<String> triggerSavepoint(

@Override
public CompletableFuture<Acknowledge> shutDownCluster() {
shutDown();
closeAsync();
return CompletableFuture.completedFuture(Acknowledge.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,11 @@ public DispatcherResourceManagerComponent<T> create(
}

if (resourceManager != null) {
resourceManager.shutDown();
terminationFutures.add(resourceManager.getTerminationFuture());
terminationFutures.add(resourceManager.closeAsync());
}

if (dispatcher != null) {
dispatcher.shutDown();
terminationFutures.add(dispatcher.getTerminationFuture());
terminationFutures.add(dispatcher.closeAsync());
}

final FutureUtils.ConjunctFuture<Void> terminationFuture = FutureUtils.completeAll(terminationFutures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,9 @@ private CompletableFuture<Void> closeAsyncInternal() {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

dispatcher.shutDown();
terminationFutures.add(dispatcher.getTerminationFuture());
terminationFutures.add(dispatcher.closeAsync());

resourceManager.shutDown();
terminationFutures.add(resourceManager.getTerminationFuture());
terminationFutures.add(resourceManager.closeAsync());

if (exception != null) {
terminationFutures.add(FutureUtils.completedExceptionally(exception));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public CompletableFuture<Void> onStop() {
suspendExecution(new FlinkException("JobManager is shutting down."));

// shut down will internally release all registered slots
slotPool.shutDown();
final CompletableFuture<Void> slotPoolTerminationFuture = slotPool.closeAsync();

final CompletableFuture<Void> disposeInternalSavepointFuture;

Expand All @@ -376,8 +376,6 @@ public CompletableFuture<Void> onStop() {
disposeInternalSavepointFuture = CompletableFuture.completedFuture(null);
}

final CompletableFuture<Void> slotPoolTerminationFuture = slotPool.getTerminationFuture();

return FutureUtils.completeAll(Arrays.asList(disposeInternalSavepointFuture, slotPoolTerminationFuture));
}

Expand Down Expand Up @@ -1511,12 +1509,6 @@ public JobMasterGateway getGateway() {
return getSelfGateway(JobMasterGateway.class);
}

@Override
public CompletableFuture<Void> closeAsync() {
shutDown();
return getTerminationFuture();
}

//----------------------------------------------------------------------------------------------
// Utility classes
//----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,7 @@ public CompletableFuture<Void> closeAsync() {
if (taskManagers != null) {
for (TaskExecutor tm : taskManagers) {
if (tm != null) {
tm.shutDown();
componentTerminationFutures.add(tm.getTerminationFuture());
componentTerminationFutures.add(tm.closeAsync());
}
}
taskManagers = null;
Expand Down Expand Up @@ -931,7 +930,7 @@ public void onFatalError(Throwable exception) {

if (currentTaskManagers != null) {
// the shutDown is asynchronous
currentTaskManagers[index].shutDown();
currentTaskManagers[index].closeAsync();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class ResourceManagerRunner implements FatalErrorHandler, AutoCloseableAs

private static final Logger LOG = LoggerFactory.getLogger(ResourceManagerRunner.class);

private final Object lock = new Object();

private final ResourceManagerRuntimeServices resourceManagerRuntimeServices;

private final ResourceManager<?> resourceManager;
Expand Down Expand Up @@ -88,10 +86,6 @@ public ResourceManagerRunner(
jobManagerMetricGroup);
}

public ResourceManagerGateway getResourceManageGateway() {
return resourceManager.getSelfGateway(ResourceManagerGateway.class);
}

//-------------------------------------------------------------------------------------
// Lifecycle management
//-------------------------------------------------------------------------------------
Expand All @@ -102,11 +96,7 @@ public void start() throws Exception {

@Override
public CompletableFuture<Void> closeAsync() {
synchronized (lock) {
resourceManager.shutDown();

return resourceManager.getTerminationFuture();
}
return resourceManager.closeAsync();
}

//-------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledFutureAdapter;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -56,7 +57,7 @@
* <p>The RPC endpoint provides {@link #runAsync(Runnable)}, {@link #callAsync(Callable, Time)}
* and the {@link #getMainThreadExecutor()} to execute code in the RPC endpoint's main thread.
*/
public abstract class RpcEndpoint implements RpcGateway {
public abstract class RpcEndpoint implements RpcGateway, AutoCloseableAsync {

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -157,11 +158,8 @@ public CompletableFuture<Void> onStop() {
* <p>In order to wait on the completion of the shut down, obtain the termination future
* via {@link #getTerminationFuture()}} and wait on its completion.
*/
public final void shutDown() {
rpcService.stopServer(rpcServer);
}

public final CompletableFuture<Void> terminate() {
@Override
public final CompletableFuture<Void> closeAsync() {
rpcService.stopServer(rpcServer);
return getTerminationFuture();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public static Set<Class<? extends RpcGateway>> extractImplementedRpcGateways(Cla
* @throws TimeoutException if a timeout occurred
*/
public static void terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout) throws ExecutionException, InterruptedException, TimeoutException {
rpcEndpoint.shutDown();
rpcEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
rpcEndpoint.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ public CompletableFuture<Void> closeAsync() {
if (!shutdown) {
shutdown = true;

taskManager.shutDown();
final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.getTerminationFuture();
final CompletableFuture<Void> taskManagerTerminationFuture = taskManager.closeAsync();

final CompletableFuture<Void> serviceTerminationFuture = FutureUtils.composeAfterwards(
taskManagerTerminationFuture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,7 @@ public void setup() throws Exception {
@After
public void teardown() throws Exception {
if (dispatcher != null) {
dispatcher.shutDown();
dispatcher.getTerminationFuture().get();
dispatcher.close();
}

if (fatalErrorHandler != null) {
Expand Down Expand Up @@ -294,7 +293,7 @@ public void testBlobServerCleanupWhenJobSubmissionFails() throws Exception {
public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
submitJob();

dispatcher.shutDown();
dispatcher.closeAsync();
terminationFuture.complete(null);
dispatcher.getTerminationFuture().get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,7 @@ public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
submissionFuture.get();
assertThat(dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));

dispatcher.shutDown();
dispatcher.getTerminationFuture().get();
dispatcher.close();

assertThat(submittedJobGraphStore.contains(jobGraph.getJobID()), Matchers.is(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,10 @@ public void testStandbyDispatcherJobRecovery() throws Exception {
dispatcherGateway.submitJob(nonEmptyJobGraph, TIMEOUT).get();

if (dispatcher1.getAddress().equals(leaderConnectionInfo.getAddress())) {
dispatcher1.shutDown();
dispatcher1.closeAsync();
assertThat(jobGraphFuture2.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
} else {
dispatcher2.shutDown();
dispatcher2.closeAsync();
assertThat(jobGraphFuture1.get().getJobID(), is(equalTo(nonEmptyJobGraph.getJobID())));
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand All @@ -37,6 +38,10 @@ public TestingHeartbeatServices(long heartbeatInterval, long heartbeatTimeout, S
this.scheduledExecutorToUse = Preconditions.checkNotNull(scheduledExecutorToUse);
}

public TestingHeartbeatServices() {
this(1000L, 10000L, TestingUtils.defaultScheduledExecutor());
}

@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@

import javax.annotation.Nonnull;

import java.util.concurrent.CompletableFuture;

/**
* {@link ExternalResource} which provides a {@link SlotPool}.
*/
Expand Down Expand Up @@ -102,8 +100,6 @@ protected void after() {
}

private void terminateSlotPool() {
slotPool.shutDown();
CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture();
terminationFuture.join();
slotPool.closeAsync().join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public void testAllocateWithFreeSlot() throws Exception {
assertEquals(slot1.getTaskManagerLocation(), slot2.getTaskManagerLocation());
assertEquals(slot1.getPhysicalSlotNumber(), slot2.getPhysicalSlotNumber());
} finally {
slotPool.shutDown();
RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}

Expand Down Expand Up @@ -336,7 +336,7 @@ public void testOfferSlot() throws Exception {
assertFalse(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, anotherSlotOfferWithSameAllocationId).get());
assertFalse(slotPoolGateway.offerSlot(anotherTaskManagerLocation, taskManagerGateway, slotOffer).get());
} finally {
slotPool.shutDown();
RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}

Expand Down Expand Up @@ -393,7 +393,7 @@ public void testReleaseResource() throws Exception {
Thread.sleep(10);
assertFalse(future2.isDone());
} finally {
slotPool.shutDown();
RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}

Expand Down Expand Up @@ -577,8 +577,7 @@ public void testShutdownReleasesAllSlots() throws Exception {
assertThat(acceptedSlotOffers, Matchers.equalTo(slotOffers));

// shut down the slot pool
slotPool.shutDown();
slotPool.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
slotPool.closeAsync().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);

// the shut down operation should have freed all registered slots
ArrayList<AllocationID> freedSlots = new ArrayList<>(numSlotOffers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerConfiguration;
import org.apache.flink.runtime.rpc.RpcService;
Expand All @@ -40,10 +40,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static org.mockito.Mockito.mock;

/**
* resourceManager HA test, including grant leadership and revoke leadership
* ResourceManager HA test, including grant leadership and revoke leadership.
*/
public class ResourceManagerHATest extends TestLogger {

Expand All @@ -64,7 +62,7 @@ public void confirmLeaderSessionID(UUID leaderId) {
TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);

HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
TestingHeartbeatServices heartbeatServices = new TestingHeartbeatServices();

ResourceManagerRuntimeServicesConfiguration resourceManagerRuntimeServicesConfiguration = new ResourceManagerRuntimeServicesConfiguration(
Time.seconds(5L),
Expand All @@ -77,8 +75,6 @@ public void confirmLeaderSessionID(UUID leaderId) {
highAvailabilityServices,
rpcService.getScheduledExecutor());

MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class);

TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();

CompletableFuture<ResourceManagerId> revokedLeaderIdFuture = new CompletableFuture<>();
Expand All @@ -91,7 +87,7 @@ public void confirmLeaderSessionID(UUID leaderId) {
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
metricRegistry,
NoOpMetricRegistry.INSTANCE,
resourceManagerRuntimeServices.getJobLeaderIdService(),
new ClusterInformation("localhost", 1234),
testingFatalErrorHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private void runScheduleWithNoDelayTest(RpcEndpointFactory factory) throws Excep
// validate that no concurrent access happened
assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get());
} finally {
rpcEndpoint.shutDown();
RpcUtils.terminateRpcEndpoint(rpcEndpoint, timeout);
}
}

Expand Down Expand Up @@ -320,8 +320,7 @@ private static <T> CompletableFuture<T> testRunAsync(Function<FencedTestEndpoint

return result;
} finally {
fencedTestEndpoint.shutDown();
fencedTestEndpoint.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
RpcUtils.terminateRpcEndpoint(fencedTestEndpoint, timeout);
}
}

Expand Down
Loading

0 comments on commit dbb1755

Please sign in to comment.