Skip to content

Commit

Permalink
[FLINK-4516] update leadership information in ResourceManager
Browse files Browse the repository at this point in the history
The leadership information remained static for connected
JobMasters. This updates it to remove stale JobMasters when they lose
leadership status.

This closes apache#2624
  • Loading branch information
mxm authored and StephanEwen committed Dec 23, 2016
1 parent 5a63697 commit 66f1529
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
Expand All @@ -40,6 +41,7 @@
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
Expand All @@ -53,17 +55,14 @@
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import scala.concurrent.duration.FiniteDuration;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -85,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
protected static final int EXIT_CODE_FATAL_ERROR = -13;

/** All currently registered JobMasterGateways scoped by JobID. */
private final Map<JobID, JobMasterGateway> jobMasterGateways;
private final Map<JobID, JobMasterRegistration> jobMasters;

/** LeaderListeners for all registered JobMasters. */
private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
/** LeaderListeners for all registered JobIDs. */
private final Map<JobID, JobIdLeaderListener> leaderListeners;

/** All currently registered TaskExecutors with their framework specific worker information. */
private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
Expand All @@ -106,7 +105,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
private LeaderElectionService leaderElectionService;

/** ResourceManager's leader session id which is updated on leader election. */
private UUID leaderSessionID;
private volatile UUID leaderSessionID;

/** All registered listeners for status updates of the ResourceManager. */
private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
Expand All @@ -121,8 +120,8 @@ public ResourceManager(
super(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.slotManagerFactory = checkNotNull(slotManagerFactory);
this.jobMasterGateways = new HashMap<>();
this.jobMasterLeaderRetrievalListeners = new HashMap<>();
this.jobMasters = new HashMap<>();
this.leaderListeners = new HashMap<>();
this.taskExecutors = new HashMap<>();
this.leaderSessionID = new UUID(0, 0);
infoMessageListeners = new HashMap<>();
Expand All @@ -149,9 +148,7 @@ public void start() {
public void shutDown() {
try {
leaderElectionService.stop();
for (JobID jobID : jobMasterGateways.keySet()) {
highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
}
clearState();
super.shutDown();
} catch (Throwable e) {
log.error("A fatal error happened when shutdown the ResourceManager", e);
Expand Down Expand Up @@ -185,6 +182,24 @@ public Future<RegistrationResponse> registerJobMaster(
checkNotNull(jobMasterAddress);
checkNotNull(jobID);

// create a leader retriever in case it doesn't exist
final JobIdLeaderListener jobIdLeaderListener;
if (leaderListeners.containsKey(jobID)) {
jobIdLeaderListener = leaderListeners.get(jobID);
} else {
try {
LeaderRetrievalService jobMasterLeaderRetriever =
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
FlinkCompletableFuture<RegistrationResponse> responseFuture = new FlinkCompletableFuture<>();
responseFuture.complete(new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
return responseFuture;
}
leaderListeners.put(jobID, jobIdLeaderListener);
}

return getRpcService()
.execute(new Callable<JobMasterGateway>() {
@Override
Expand All @@ -197,21 +212,13 @@ public JobMasterGateway call() throws Exception {
throw new Exception("Invalid leader session id");
}

final LeaderConnectionInfo jobMasterLeaderInfo;
try {
jobMasterLeaderInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new FiniteDuration(5, TimeUnit.SECONDS));
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
throw new Exception("Failed to retrieve JobMasterLeaderRetriever");
}

if (!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
log.info("Declining registration request from non-leading JobManager {}", jobMasterAddress);
throw new Exception("JobManager is not leading");
if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
.equals(jobMasterLeaderId)) {
throw new Exception("Leader Id did not match");
}

return getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, TimeUnit.SECONDS);
return getRpcService().connect(jobMasterAddress, JobMasterGateway.class)
.get(timeout.getSize(), timeout.getUnit());
}
})
.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
Expand All @@ -220,24 +227,34 @@ public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable t

if (throwable != null) {
return new RegistrationResponse.Decline(throwable.getMessage());
} else {
if (!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
JobMasterLeaderListener jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
try {
LeaderRetrievalService jobMasterLeaderRetriever = highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
} catch (Exception e) {
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
return new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
}
jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
}
final JobMasterGateway existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
if (existingGateway != null) {
log.info("Replacing gateway for registered JobID {}.", jobID);
}

if (!leaderSessionID.equals(resourceManagerLeaderId)) {
log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
" did not equal the received leader session ID {}",
jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
return new RegistrationResponse.Decline("Invalid leader session id");
}

try {
// LeaderID should be available now, but if not we fail the registration
UUID currentJobMasterLeaderId = jobIdLeaderListener.getLeaderID().getNow(null);
if (currentJobMasterLeaderId == null || !currentJobMasterLeaderId.equals(jobMasterLeaderId)) {
throw new Exception("Leader Id did not match");
}
return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
} catch (Exception e) {
return new RegistrationResponse.Decline(e.getMessage());
}

final JobMasterRegistration registration =
new JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway);

final JobMasterRegistration existingRegistration = jobMasters.put(jobID, registration);
if (existingRegistration != null) {
log.info("Replacing JobMaster registration for newly registered JobMaster with JobID {}.", jobID);
}
return new JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);

}
}, getMainThreadExecutor());
}
Expand Down Expand Up @@ -305,13 +322,10 @@ public RMSlotRequestReply requestSlot(
SlotRequest slotRequest) {

JobID jobId = slotRequest.getJobId();
JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId);
JobMasterLeaderListener jobMasterLeaderListener = jobMasterLeaderRetrievalListeners.get(jobId);
JobMasterRegistration jobMasterRegistration = jobMasters.get(jobId);

UUID leaderID = jobMasterLeaderListener.getLeaderID();

if (jobMasterGateway != null
&& jobMasterLeaderID.equals(leaderID)
if (jobMasterRegistration != null
&& jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID())
&& resourceManagerLeaderID.equals(leaderSessionID)) {
return slotManager.requestSlot(slotRequest);
} else {
Expand Down Expand Up @@ -371,8 +385,6 @@ public void run() {
log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), leaderSessionID);
// confirming the leader session ID might be blocking,
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
// notify SlotManager
slotManager.setLeaderUUID(leaderSessionID);
ResourceManager.this.leaderSessionID = leaderSessionID;
}
});
Expand All @@ -387,10 +399,7 @@ public void revokeLeadership() {
@Override
public void run() {
log.info("ResourceManager {} was revoked leadership.", getAddress());
jobMasterGateways.clear();
taskExecutors.clear();
slotManager.clearState();
leaderSessionID = new UUID(0, 0);
clearState();
}
});
}
Expand Down Expand Up @@ -576,6 +585,11 @@ protected ResourceManagerServices createResourceManagerServices() {

private class DefaultResourceManagerServices implements ResourceManagerServices {

/**
 * Returns the leader session ID currently held by the enclosing ResourceManager.
 */
@Override
public UUID getLeaderID() {
return ResourceManager.this.leaderSessionID;
}

@Override
public void allocateResource(ResourceProfile resourceProfile) {
ResourceManager.this.startNewWorker(resourceProfile);
Expand All @@ -592,33 +606,95 @@ public Executor getMainThreadExecutor() {
}
}

private static class JobMasterLeaderListener implements LeaderRetrievalListener {
/**
* Leader retrieval listener instantiated for each JobID with a connected JobMaster
*/
private class JobIdLeaderListener implements LeaderRetrievalListener {

private final JobID jobID;
private UUID leaderID;
private final LeaderRetrievalService retrievalService;

private JobMasterLeaderListener(JobID jobID) {
private final FlinkCompletableFuture<UUID> initialLeaderIdFuture;

private volatile UUID leaderID;

/**
 * Creates the listener for the given job and starts the retrieval service with this
 * listener registered.
 *
 * @param jobID job whose leader is tracked by this listener
 * @param retrievalService service that is started here and stopped again by stopService()
 * @throws Exception presumably raised when starting the retrieval service fails
 */
private JobIdLeaderListener(
JobID jobID,
LeaderRetrievalService retrievalService) throws Exception {
this.jobID = jobID;
this.retrievalService = retrievalService;
this.initialLeaderIdFuture = new FlinkCompletableFuture<>();
// started last: this passes a reference to this instance to the service,
// so all final fields have to be assigned before
this.retrievalService.start(this);
}

/**
 * Returns a future holding the leader ID known for this job.
 *
 * <p>As long as the initial future has not been completed, that pending future is handed
 * out. Once it is done, an already completed future carrying the most recently stored
 * leader ID is returned instead.
 */
public Future<UUID> getLeaderID() {
	if (initialLeaderIdFuture.isDone()) {
		return FlinkCompletableFuture.completed(leaderID);
	}
	return initialLeaderIdFuture;
}

/**
 * Returns the JobID this listener was created for.
 */
public JobID getJobID() {
return jobID;
}

public UUID getLeaderID() {
return leaderID;

/**
 * Stops the leader retrieval service that was started in the constructor.
 *
 * @throws Exception propagated from the retrieval service's stop() call
 */
public void stopService() throws Exception {
retrievalService.stop();
}

/**
 * Records the newly announced leader ID for this job and schedules a cleanup of a
 * registration that does not belong to that leader.
 *
 * <p>The first notification also completes the initial leader ID future. The cleanup is
 * submitted through ResourceManager.runAsync (presumably so that it executes in the
 * ResourceManager's main thread — confirm): if no JobMaster is registered for the job, or
 * the registered one carries a different leader ID, the registration and this job's
 * leader listener are removed and the listener's retrieval service is stopped.
 *
 * @param leaderAddress address of the new leader; not used by this method
 * @param leaderSessionID leader ID of the new leader
 */
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
// store the latest leader ID first so that getLeaderID() can serve it once the initial future is done
this.leaderID = leaderSessionID;

// only the first notification completes the initial future
if (!initialLeaderIdFuture.isDone()) {
initialLeaderIdFuture.complete(leaderSessionID);
}

ResourceManager.this.runAsync(new Runnable() {
@Override
public void run() {
JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID);
if (jobMasterRegistration == null || !jobMasterRegistration.getLeaderID().equals(leaderSessionID)) {
// registration is not valid anymore, remove registration
ResourceManager.this.jobMasters.remove(jobID);
// leader listener is not necessary anymore
JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID);
if (listener != null) {
try {
listener.stopService();
} catch (Exception e) {
// a failure to stop the retrieval service is reported to the ResourceManager's error handler
ResourceManager.this.handleError(e);
}
}
}
}
});
}

/**
 * Forwards an error reported to this listener to the enclosing ResourceManager's
 * handleError method.
 *
 * @param exception the reported error
 */
@Override
public void handleError(final Exception exception) {
// TODO
ResourceManager.this.handleError(exception);
}
}

/**
 * Resets the ResourceManager's registration state.
 *
 * <p>Drops all JobMaster and TaskExecutor registrations, clears the SlotManager state,
 * stops and removes every job leader listener, and finally resets the leader session ID
 * to the all-zero UUID. A listener that fails to stop is reported via handleError and is
 * still removed.
 */
private void clearState() {
	jobMasters.clear();
	taskExecutors.clear();
	slotManager.clearState();

	for (Iterator<JobIdLeaderListener> it = leaderListeners.values().iterator(); it.hasNext(); ) {
		final JobIdLeaderListener leaderListener = it.next();
		try {
			leaderListener.stopService();
		} catch (Exception e) {
			handleError(e);
		}
		// removed through the iterator so the map can be modified while iterating
		it.remove();
	}

	leaderSessionID = new UUID(0, 0);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ Future<RegistrationResponse> registerJobMaster(
/**
* Requests a slot from the resource manager.
*
* @param jobMasterLeaderID leader id of the JobMaster
* @param resourceManagerLeaderID leader id of the ResourceManager
* @param jobMasterLeaderID leader id of the JobMaster
* @param slotRequest The slot to request
* @return The confirmation that the slot gets allocated
*/
Future<RMSlotRequestReply> requestSlot(
UUID jobMasterLeaderID,
UUID resourceManagerLeaderID,
UUID jobMasterLeaderID,
SlotRequest slotRequest,
@RpcTimeout Time timeout);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

import org.apache.flink.runtime.clusterframework.types.ResourceProfile;

import java.util.UUID;
import java.util.concurrent.Executor;

/**
* Interface which provides access to services of the ResourceManager.
*/
public interface ResourceManagerServices {

/**
* Gets the current leader id assigned at the ResourceManager.
*/
UUID getLeaderID();

/**
* Allocates a resource according to the resource profile.
*/
Expand Down
Loading

0 comments on commit 66f1529

Please sign in to comment.