Skip to content

Commit

Permalink
[FLINK-7387] [rpc] Require RpcEndpoints to directly implement RpcGateways
Browse files Browse the repository at this point in the history

This commit changes the relation between RpcEndpoints and RpcGateways. From now on,
RpcEndpoints have to implement the RpcGateways they want to support instead of
being loosely coupled to them via a type parameter. In order to obtain the self gateway, a new
method RpcEndpoint#getSelfGateway(Class) has been introduced. This method can be used
to obtain a gateway of the given RpcGateway type at run time to talk to the RpcEndpoint asynchronously.

All existing RpcEndpoints have been adapted to the new model. This basically means
that their RPC methods which return a value now return a CompletableFuture<X> instead of X.

Add RpcEndpointTest

This closes apache#4498.
  • Loading branch information
tillrohrmann committed Aug 11, 2017
1 parent 9f790d3 commit d95d20e
Show file tree
Hide file tree
Showing 35 changed files with 887 additions and 1,120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public class MesosResourceManagerTest extends TestLogger {

private static ActorSystem system;

private static final Time timeout = Time.seconds(10L);

@Before
public void setup() {
system = AkkaUtils.createLocalActorSystem(flinkConfig);
Expand Down Expand Up @@ -415,7 +417,7 @@ public void startResourceManager() throws Exception {
*/
public void registerJobMaster(MockJobMaster jobMaster) throws Exception {
CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager(
rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID);
rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout);
assertTrue(registration.get() instanceof JobMasterRegistrationSuccess);
}

Expand Down Expand Up @@ -589,7 +591,7 @@ public void testWorkerStarted() throws Exception {

// send registration message
CompletableFuture<RegistrationResponse> successfulFuture =
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport);
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport, timeout);
RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
assertTrue(response instanceof TaskExecutorRegistrationSuccess);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
Expand All @@ -37,7 +39,6 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
Expand All @@ -47,14 +48,15 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Base class for the Dispatcher component. The Dispatcher component is responsible
* for receiving job submissions, persisting them, spawning JobManagers to execute
* the jobs and to recover them in case of a master failure. Furthermore, it knows
* about the state of the Flink session cluster.
*/
public abstract class Dispatcher extends RpcEndpoint<DispatcherGateway> {
public abstract class Dispatcher extends RpcEndpoint implements DispatcherGateway {

public static final String DISPATCHER_NAME = "dispatcher";

Expand Down Expand Up @@ -131,8 +133,8 @@ public void postStop() throws Exception {
// RPCs
//------------------------------------------------------

@RpcMethod
public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
final JobID jobId = jobGraph.getJobID();

log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
Expand All @@ -143,15 +145,17 @@ public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
jobSchedulingStatus = runningJobsRegistry.getJobSchedulingStatus(jobId);
} catch (IOException e) {
log.warn("Cannot retrieve job status for {}.", jobId, e);
throw new JobSubmissionException(jobId, "Could not retrieve the job status.", e);
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, "Could not retrieve the job status.", e));
}

if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.PENDING) {
try {
submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph, null));
} catch (Exception e) {
log.warn("Cannot persist JobGraph.", e);
throw new JobSubmissionException(jobId, "Could not persist JobGraph.", e);
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, "Could not persist JobGraph.", e));
}

final JobManagerRunner jobManagerRunner;
Expand Down Expand Up @@ -180,22 +184,24 @@ public Acknowledge submitJob(JobGraph jobGraph) throws JobSubmissionException {
e.addSuppressed(t);
}

throw new JobSubmissionException(jobId, "Could not start JobManager.", e);
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, "Could not start JobManager.", e));
}

jobManagerRunners.put(jobId, jobManagerRunner);

return Acknowledge.get();
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
throw new JobSubmissionException(jobId, "Job has already been submitted and " +
"is currently in state " + jobSchedulingStatus + '.');
return FutureUtils.completedExceptionally(
new JobSubmissionException(jobId, "Job has already been submitted and " +
"is currently in state " + jobSchedulingStatus + '.'));
}
}

@RpcMethod
public Collection<JobID> listJobs() {
@Override
public CompletableFuture<Collection<JobID>> listJobs(Time timeout) {
// TODO: return proper list of running jobs
return jobManagerRunners.keySet();
return CompletableFuture.completedFuture(jobManagerRunners.keySet());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
Expand All @@ -35,9 +36,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
Expand All @@ -46,15 +45,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -73,7 +76,7 @@
* TODO : Make pending requests location preference aware
* TODO : Pass location preferences to ResourceManager when sending a slot request
*/
public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
public class SlotPool extends RpcEndpoint implements SlotPoolGateway {

/** The log for the pool - shared also with the internal classes */
static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
Expand Down Expand Up @@ -154,7 +157,7 @@ public SlotPool(
this.pendingRequests = new HashMap<>();
this.waitingForResourceManager = new HashMap<>();

this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -187,12 +190,12 @@ public void start(UUID newJobManagerLeaderId, String newJobManagerAddress) throw
/**
* Suspends this pool, meaning it has lost its authority to accept and distribute slots.
*/
@RpcMethod
@Override
public void suspend() {
validateRunsInMainThread();

// suspend this RPC endpoint
((StartStoppable) getSelf()).stop();
stop();

// do not accept any requests
jobManagerLeaderId = null;
Expand Down Expand Up @@ -236,7 +239,7 @@ public SlotProvider getSlotProvider() {
// Resource Manager Connection
// ------------------------------------------------------------------------

@RpcMethod
@Override
public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
Expand All @@ -250,7 +253,7 @@ public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManag
waitingForResourceManager.clear();
}

@RpcMethod
@Override
public void disconnectResourceManager() {
this.resourceManagerLeaderId = null;
this.resourceManagerGateway = null;
Expand All @@ -260,16 +263,17 @@ public void disconnectResourceManager() {
// Slot Allocation
// ------------------------------------------------------------------------

@RpcMethod
@Override
public CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
ResourceProfile resources,
Iterable<TaskManagerLocation> locationPreferences) {
Iterable<TaskManagerLocation> locationPreferences,
Time timeout) {

return internalAllocateSlot(task, resources, locationPreferences);
}

@RpcMethod
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
Expand Down Expand Up @@ -457,18 +461,39 @@ private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
return null;
}

@RpcMethod
public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers) {
@Override
public CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers) {
validateRunsInMainThread();

final ArrayList<SlotOffer> result = new ArrayList<>();
for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) {
if (offerSlot(offer.f0)) {
result.add(offer.f1);
List<CompletableFuture<Optional<SlotOffer>>> acceptedSlotOffers = offers.stream().map(
offer -> {
CompletableFuture<Optional<SlotOffer>> acceptedSlotOffer = offerSlot(offer.f0).thenApply(
(acceptedSlot) -> {
if (acceptedSlot) {
return Optional.of(offer.f1);
} else {
return Optional.empty();
}
});

return acceptedSlotOffer;
}
}
).collect(Collectors.toList());

CompletableFuture<Collection<Optional<SlotOffer>>> optionalSlotOffers = FutureUtils.combineAll(acceptedSlotOffers);

CompletableFuture<Collection<SlotOffer>> resultingSlotOffers = optionalSlotOffers.thenApply(
collection -> {
Collection<SlotOffer> slotOffers = collection
.stream()
.flatMap(
opt -> opt.map(Stream::of).orElseGet(Stream::empty))
.collect(Collectors.toList());

return result.isEmpty() ? Collections.<SlotOffer>emptyList() : result;
return slotOffers;
});

return resultingSlotOffers;
}

/**
Expand All @@ -480,8 +505,8 @@ public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>>
* @param slot The offered slot
* @return True if we accept the offering
*/
@RpcMethod
public boolean offerSlot(final AllocatedSlot slot) {
@Override
public CompletableFuture<Boolean> offerSlot(final AllocatedSlot slot) {
validateRunsInMainThread();

// check if this TaskManager is valid
Expand All @@ -491,7 +516,7 @@ public boolean offerSlot(final AllocatedSlot slot) {
if (!registeredTaskManagers.contains(resourceID)) {
LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
slot.getSlotAllocationId(), slot);
return false;
return CompletableFuture.completedFuture(false);
}

// check whether we are already using this slot
Expand All @@ -500,7 +525,7 @@ public boolean offerSlot(final AllocatedSlot slot) {

// return true here so that the sender will get a positive acknowledgement to the retry
// and mark the offering as a success
return true;
return CompletableFuture.completedFuture(true);
}

// check whether we have a request waiting for this slot
Expand All @@ -520,7 +545,7 @@ public boolean offerSlot(final AllocatedSlot slot) {

// we accepted the request in any case. slot will be released after it idled for
// too long and timed out
return true;
return CompletableFuture.completedFuture(true);
}


Expand All @@ -541,7 +566,7 @@ public boolean offerSlot(final AllocatedSlot slot) {
* @param allocationID Represents the allocation which should be failed
* @param cause The cause of the failure
*/
@RpcMethod
@Override
public void failAllocation(final AllocationID allocationID, final Exception cause) {
final PendingRequest pendingRequest = pendingRequests.remove(allocationID);
if (pendingRequest != null) {
Expand Down Expand Up @@ -576,7 +601,7 @@ else if (availableSlots.tryRemove(allocationID)) {
*
* @param resourceID The id of the TaskManager
*/
@RpcMethod
@Override
public void registerTaskManager(final ResourceID resourceID) {
registeredTaskManagers.add(resourceID);
}
Expand All @@ -587,7 +612,7 @@ public void registerTaskManager(final ResourceID resourceID) {
*
* @param resourceID The id of the TaskManager
*/
@RpcMethod
@Override
public void releaseTaskManager(final ResourceID resourceID) {
if (registeredTaskManagers.remove(resourceID)) {
availableSlots.removeAllForTaskManager(resourceID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -77,7 +78,7 @@ public interface SlotPoolGateway extends RpcGateway {

CompletableFuture<Boolean> offerSlot(AllocatedSlot slot);

CompletableFuture<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
CompletableFuture<Collection<SlotOffer>> offerSlots(Collection<Tuple2<AllocatedSlot, SlotOffer>> offers);

void failAllocation(AllocationID allocationID, Exception cause);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ public void revokeLeadership() {
log.info("JobManager for job {} ({}) was revoked leadership at {}.",
jobGraph.getName(), jobGraph.getJobID(), getAddress());

jobManager.getSelf().suspendExecution(new Exception("JobManager is no longer the leader."));
jobManager.getSelfGateway(JobMasterGateway.class).suspendExecution(new Exception("JobManager is no longer the leader."));
}
}

Expand Down
Loading

0 comments on commit d95d20e

Please sign in to comment.