Skip to content

Commit

Permalink
[hotfix] Remove unused method MiniCluster#waitUntilTaskManagerRegistr…
Browse files Browse the repository at this point in the history
…ationsComplete
  • Loading branch information
tillrohrmann committed Feb 23, 2018
1 parent c131546 commit 1897e23
Showing 1 changed file with 0 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
Expand Down Expand Up @@ -508,52 +507,6 @@ private void shutdownInternally() throws Exception {
}
}

public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
LeaderRetrievalService rmMasterListener = null;
CompletableFuture<LeaderAddressAndId> addressAndIdFuture;

try {
synchronized (lock) {
checkState(running, "FlinkMiniCluster is not running");

OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
rmMasterListener = haServices.getResourceManagerLeaderRetriever();
rmMasterListener.start(listenerFuture);
addressAndIdFuture = listenerFuture.future();
}

final LeaderAddressAndId addressAndId = addressAndIdFuture.get();

final ResourceManagerGateway resourceManager = commonRpcService
.connect(
addressAndId.leaderAddress(),
new ResourceManagerId(addressAndId.leaderId()),
ResourceManagerGateway.class)
.get();

final int numTaskManagersToWaitFor = taskManagers.length;

// poll and wait until enough TaskManagers are available
while (true) {
int numTaskManagersAvailable = resourceManager.getNumberOfRegisteredTaskManagers().get();

if (numTaskManagersAvailable >= numTaskManagersToWaitFor) {
break;
}
Thread.sleep(2);
}
}
finally {
try {
if (rmMasterListener != null) {
rmMasterListener.stop();
}
} catch (Exception e) {
LOG.warn("Error shutting down leader listener for ResourceManager");
}
}
}

// ------------------------------------------------------------------------
// running jobs
// ------------------------------------------------------------------------
Expand Down

0 comments on commit 1897e23

Please sign in to comment.