Skip to content

Commit

Permalink
[FLINK-4555] wait for ResourceManager to cleanly unregister application
Browse files Browse the repository at this point in the history
This ensures that the ResourceManager has enough time to unregister the
application before shutting down.

This closes apache#2514
  • Loading branch information
mxm committed Sep 24, 2016
1 parent 40c978b commit 1e4b7eb
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@
import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -253,6 +254,7 @@ else if (message instanceof RegisterResourceManagerSuccessful) {
else if (message instanceof StopCluster) {
StopCluster msg = (StopCluster) message;
shutdownCluster(msg.finalStatus(), msg.message());
sender().tell(decorateMessage(StopClusterSuccessful.getInstance()), ActorRef.noSender());
}

// --- miscellaneous messages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,15 @@ class JobManager(
// send resource manager the ok
currentResourceManager match {
case Some(rm) =>
// inform rm
rm ! decorateMessage(msg)
try {
// inform rm and wait for it to confirm
val waitTime = FiniteDuration(5, TimeUnit.SECONDS)
val answer = (rm ? decorateMessage(msg))(waitTime)
Await.ready(answer, waitTime)
} catch {
case e: TimeoutException =>
case e: InterruptedException =>
}
case None =>
// ResourceManager not available
// we choose not to wait here because it might block the shutdown forever
Expand Down

0 comments on commit 1e4b7eb

Please sign in to comment.