diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 9d3e12869deea..38c382108befe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.time.Time; @@ -37,6 +38,7 @@ import org.apache.flink.runtime.io.network.ConnectionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -45,11 +47,12 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; +import org.apache.flink.util.FlinkRuntimeException; import org.slf4j.Logger; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -58,7 +61,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.runtime.execution.ExecutionState.CANCELED; @@ -96,6 +98,11 @@ public class Execution implements AccessExecution, Archiveable STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state"); + private static final AtomicReferenceFieldUpdater ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater( + Execution.class, + SimpleSlot.class, + "assignedResource"); + private static final Logger LOG = ExecutionGraph.LOG; private static final int NUM_CANCEL_CALL_TRIES = 3; @@ -134,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable assignedResource; + private volatile SimpleSlot assignedResource; private volatile Throwable failureCause; // once assigned, never changes @@ -193,7 +200,7 @@ public Execution( this.terminationFuture = new CompletableFuture<>(); this.taskManagerLocationFuture = new CompletableFuture<>(); - this.assignedResource = new AtomicReference<>(); + this.assignedResource = null; } // -------------------------------------------------------------------------------------------- @@ -234,7 +241,7 @@ public CompletableFuture getTaskManagerLocationFuture() { } public SimpleSlot getAssignedResource() { - return assignedResource.get(); + return assignedResource; } /** @@ -244,22 +251,23 @@ public SimpleSlot getAssignedResource() { * @param slot to assign to this execution * @return true if the slot could be assigned to the execution, otherwise false */ + @VisibleForTesting boolean tryAssignResource(final SimpleSlot slot) { - Preconditions.checkNotNull(slot); + checkNotNull(slot); // only allow to set the assigned resource in state 
SCHEDULED or CREATED // note: we also accept resource assignment when being in state CREATED for testing purposes if (state == SCHEDULED || state == CREATED) { - if (assignedResource.compareAndSet(null, slot)) { + if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) { // check for concurrent modification (e.g. cancelling call) if (state == SCHEDULED || state == CREATED) { - Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); + checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet."); taskManagerLocationFuture.complete(slot.getTaskManagerLocation()); return true; } else { // free assigned resource and return false - assignedResource.set(null); + ASSIGNED_SLOT_UPDATER.set(this, null); return false; } } else { @@ -275,7 +283,8 @@ boolean tryAssignResource(final SimpleSlot slot) { @Override public TaskManagerLocation getAssignedResourceLocation() { // returns non-null only when a location is already assigned - return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null; + final SimpleSlot currentAssignedResource = assignedResource; + return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null; } public Throwable getFailureCause() { @@ -333,7 +342,7 @@ public CompletableFuture getTerminationFuture() { public boolean scheduleForExecution() { SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider(); boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed(); - return scheduleForExecution(resourceProvider, allowQueued); + return scheduleForExecution(resourceProvider, allowQueued, LocationPreferenceConstraint.ANY); } /** @@ -344,12 +353,19 @@ public boolean scheduleForExecution() { * @param slotProvider The slot provider to use to allocate slot for this execution attempt. * @param queued Flag to indicate whether the scheduler may queue this task if it cannot * immediately deploy it. + * @param locationPreferenceConstraint constraint for the location preferences * * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling. */ - public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) { + public boolean scheduleForExecution( + SlotProvider slotProvider, + boolean queued, + LocationPreferenceConstraint locationPreferenceConstraint) { try { - final CompletableFuture allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued); + final CompletableFuture allocationFuture = allocateAndAssignSlotForExecution( + slotProvider, + queued, + locationPreferenceConstraint); // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so // that we directly deploy the tasks if the slot allocation future is completed. This is @@ -387,11 +403,15 @@ public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) { * * @param slotProvider to obtain a new slot from * @param queued if the allocation can be queued + * @param locationPreferenceConstraint constraint for the location preferences * @return Future which is completed with this execution once the slot has been assigned * or with an exception if an error occurred. 
* @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state */ - public CompletableFuture allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException { + public CompletableFuture allocateAndAssignSlotForExecution( + SlotProvider slotProvider, + boolean queued, + LocationPreferenceConstraint locationPreferenceConstraint) throws IllegalExecutionStateException { checkNotNull(slotProvider); @@ -411,18 +431,27 @@ public CompletableFuture allocateAndAssignSlotForExecution(SlotProvid new ScheduledUnit(this, sharingGroup) : new ScheduledUnit(this, sharingGroup, locationConstraint); - CompletableFuture slotFuture = slotProvider.allocateSlot(toSchedule, queued); - - return slotFuture.thenApply(slot -> { - if (tryAssignResource(slot)) { - return this; - } else { - // release the slot - slot.releaseSlot(); + // calculate the preferred locations + final CompletableFuture> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint); + + return preferredLocationsFuture + .thenCompose( + (Collection preferredLocations) -> + slotProvider.allocateSlot( + toSchedule, + queued, + preferredLocations)) + .thenApply( + (SimpleSlot slot) -> { + if (tryAssignResource(slot)) { + return this; + } else { + // release the slot + slot.releaseSlot(); - throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned ")); - } - }); + throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned ")); + } + }); } else { // call race, already deployed, or already done @@ -436,7 +465,7 @@ public CompletableFuture allocateAndAssignSlotForExecution(SlotProvid * @throws JobException if the execution cannot be deployed to the assigned resource */ public void deploy() throws JobException { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource."); @@ -516,7 +545,7 @@ public void deploy() throws JobException { * Sends stop RPC call. 
*/ public void stop() { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -579,7 +608,7 @@ else if (current == CREATED || current == SCHEDULED) { try { vertex.getExecutionGraph().deregisterExecution(this); - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { slot.releaseSlot(); @@ -640,8 +669,9 @@ else if (numConsumers == 0) { () -> { try { consumerVertex.scheduleForExecution( - consumerVertex.getExecutionGraph().getSlotProvider(), - consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed()); + consumerVertex.getExecutionGraph().getSlotProvider(), + consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY); // there must be at least one known location } catch (Throwable t) { consumerVertex.fail(new IllegalStateException("Could not schedule consumer " + "vertex " + consumerVertex, t)); @@ -748,7 +778,7 @@ public CompletableFuture requestStackTraceSample( int maxStrackTraceDepth, Time timeout) { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -772,7 +802,7 @@ public CompletableFuture requestStackTraceSample( * @param timestamp of the completed checkpoint */ public void notifyCheckpointComplete(long checkpointId, long timestamp) { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -792,7 +822,7 @@ public void notifyCheckpointComplete(long checkpointId, long timestamp) { * @param checkpointOptions of the checkpoint to trigger */ public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -850,7 +880,7 @@ void markFinished(Map> userAccumulators, IOMetrics met updateAccumulatorsAndMetrics(userAccumulators, metrics); - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { slot.releaseSlot(); @@ -908,7 +938,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) { if (transitionState(current, CANCELED)) { try { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { slot.releaseSlot(); @@ -1005,7 +1035,7 @@ private boolean processFail(Throwable t, boolean isCallback, Map partitionInfos) { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; if (slot != null) { final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); @@ -1151,6 +1181,46 @@ private void sendUpdatePartitionInfoRpcCall( // Miscellaneous // -------------------------------------------------------------------------------------------- + /** + * Calculates the preferred locations based on the location preference constraint. + * + * @param locationPreferenceConstraint constraint for the location preference + * @return Future containing the collection of preferred locations. This might not be completed if not all inputs + * have been a resource assigned. 
+ */ + @VisibleForTesting + public CompletableFuture> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) { + final Collection> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs(); + final CompletableFuture> preferredLocationsFuture; + + switch(locationPreferenceConstraint) { + case ALL: + preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures); + break; + case ANY: + final ArrayList completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size()); + + for (CompletableFuture preferredLocationFuture : preferredLocationFutures) { + if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) { + final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null); + + if (taskManagerLocation == null) { + throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug."); + } + + completedTaskManagerLocations.add(taskManagerLocation); + } + } + + preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations); + break; + default: + throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.'); + } + + return preferredLocationsFuture; + } + private boolean transitionState(ExecutionState currentState, ExecutionState targetState) { return transitionState(currentState, targetState, null); } @@ -1248,7 +1318,7 @@ private void updateAccumulatorsAndMetrics(Map> userAcc @Override public String toString() { - final SimpleSlot slot = assignedResource.get(); + final SimpleSlot slot = assignedResource; return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(), (slot == null ? "(unassigned)" : slot), state); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 62c6e9995e5a8..8a7400138a4df 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -60,6 +60,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.state.SharedStateRegistry; @@ -853,11 +854,14 @@ public void scheduleForExecution() throws JobException { } } - private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException { + private void scheduleLazy(SlotProvider slotProvider) { // simply take the vertices without inputs. 
for (ExecutionJobVertex ejv : verticesInCreationOrder) { if (ejv.getJobVertex().isInputVertex()) { - ejv.scheduleAll(slotProvider, allowQueuedScheduling); + ejv.scheduleAll( + slotProvider, + allowQueuedScheduling, + LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty } } } @@ -884,7 +888,10 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // allocate the slots (obtain all their futures for (ExecutionJobVertex ejv : getVerticesTopologically()) { // these calls are not blocking, they only return futures - Collection> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued); + Collection> allocationFutures = ejv.allocateResourcesForAll( + slotProvider, + queued, + LocationPreferenceConstraint.ALL); allAllocationFutures.addAll(allocationFutures); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 98191d04f8d84..fff7ce1fa30c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.types.Either; @@ -455,14 +456,24 @@ public void connectToPredecessors(Map //--------------------------------------------------------------------------------------------- // Actions //--------------------------------------------------------------------------------------------- - - public void scheduleAll(SlotProvider slotProvider, boolean queued) { + + /** + * Schedules all execution vertices of this ExecutionJobVertex. + * + * @param slotProvider to allocate the slots from + * @param queued if the allocations can be queued + * @param locationPreferenceConstraint constraint for the location preferences + */ + public void scheduleAll( + SlotProvider slotProvider, + boolean queued, + LocationPreferenceConstraint locationPreferenceConstraint) { final ExecutionVertex[] vertices = this.taskVertices; // kick off the tasks for (ExecutionVertex ev : vertices) { - ev.scheduleForExecution(slotProvider, queued); + ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint); } } @@ -474,8 +485,13 @@ public void scheduleAll(SlotProvider slotProvider, boolean queued) { *
<p>
If this method throws an exception, it makes sure to release all so far requested slots. * * @param resourceProvider The resource provider from whom the slots are requested. + * @param queued if the allocation can be queued + * @param locationPreferenceConstraint constraint for the location preferences */ - public Collection> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) { + public Collection> allocateResourcesForAll( + SlotProvider resourceProvider, + boolean queued, + LocationPreferenceConstraint locationPreferenceConstraint) { final ExecutionVertex[] vertices = this.taskVertices; final CompletableFuture[] slots = new CompletableFuture[vertices.length]; @@ -484,7 +500,10 @@ public Collection> allocateResourcesForAll(SlotProv for (int i = 0; i < vertices.length; i++) { // allocate the next slot (future) final Execution exec = vertices[i].getCurrentExecutionAttempt(); - final CompletableFuture allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued); + final CompletableFuture allocationFuture = exec.allocateAndAssignSlotForExecution( + resourceProvider, + queued, + locationPreferenceConstraint); slots[i] = allocationFuture; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index e87a5a0240ed4..6d45d06d8f952 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.util.EvictingBoundedList; @@ -487,7 +488,8 @@ public Collection> getPreferredLocationsB return Collections.emptySet(); } else { - Set> inputLocations = new HashSet<>(4); + Set> locations = new HashSet<>(getTotalNumberOfParallelSubtasks()); + Set> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks()); // go over all inputs for (int i = 0; i < inputEdges.length; i++) { @@ -497,17 +499,26 @@ public Collection> getPreferredLocationsB // go over all input sources for (int k = 0; k < sources.length; k++) { // look-up assigned slot of input source - CompletableFuture taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture(); - inputLocations.add(taskManagerLocationFuture); - + CompletableFuture locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture(); + // add input location + inputLocations.add(locationFuture); + // inputs which have too many distinct sources are not considered if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) { - return Collections.emptyList(); + inputLocations.clear(); + break; } } } + // keep the locations of the input with the least preferred locations + if (locations.isEmpty() || // nothing assigned yet + (!inputLocations.isEmpty() && inputLocations.size() < locations.size())) { + // current input has fewer preferred locations + locations.clear(); + locations.addAll(inputLocations); + } } - return inputLocations.isEmpty() ? 
Collections.emptyList() : inputLocations; + return locations.isEmpty() ? Collections.emptyList() : locations; } } @@ -587,8 +598,22 @@ public Execution resetForNewExecution(final long timestamp, final long originati } } - public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) { - return this.currentExecution.scheduleForExecution(slotProvider, queued); + /** + * Schedules the current execution of this ExecutionVertex. + * + * @param slotProvider to allocate the slots from + * @param queued if the allocation can be queued + * @param locationPreferenceConstraint constraint for the location preferences + * @return + */ + public boolean scheduleForExecution( + SlotProvider slotProvider, + boolean queued, + LocationPreferenceConstraint locationPreferenceConstraint) { + return this.currentExecution.scheduleForExecution( + slotProvider, + queued, + locationPreferenceConstraint); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java index 1919c6118df81..0b00c0e039dd9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.util.AbstractID; import org.apache.flink.util.FlinkException; @@ -216,8 +217,9 @@ private void restart(long globalModVersionOfFailover) { for (ExecutionVertex ev : connectedExecutionVertexes) { try { ev.scheduleForExecution( - executionGraph.getSlotProvider(), - executionGraph.isQueuedSchedulingAllowed()); + executionGraph.getSlotProvider(), + executionGraph.isQueuedSchedulingAllowed(), + LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover region might have failed concurrently } catch (Throwable e) { failover(globalModVersionOfFailover); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index fcf7d40414e98..1944b381639a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -1002,14 +1002,12 @@ public boolean returnAllocatedSlot(Slot slot) { } @Override - public CompletableFuture allocateSlot(ScheduledUnit task, boolean allowQueued) { - Collection> locationPreferenceFutures = - task.getTaskToExecute().getVertex().getPreferredLocations(); + public CompletableFuture allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection preferredLocations) { - CompletableFuture> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures); - - return locationPreferencesFuture.thenCompose( - locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout)); + return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, preferredLocations, timeout); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java index 23e6749a46bbf..ef988b4886a75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java @@ -19,7 +19,9 @@ package org.apache.flink.runtime.instance; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.Collection; import java.util.concurrent.CompletableFuture; /** @@ -40,7 +42,11 @@ public interface SlotProvider { * * @param task The task to allocate the slot for * @param allowQueued Whether allow the task be queued if we do not have enough resource + * @param preferredLocations preferred locations for the slot allocation * @return The future of the allocation */ - CompletableFuture allocateSlot(ScheduledUnit task, boolean allowQueued); + CompletableFuture allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection preferredLocations); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java new file mode 100644 index 0000000000000..e890512ecba28 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.scheduler; + +/** + * Defines the location preference constraint. + * + *
<p>
Currently, we support that all input locations have to be taken into consideration + * and only those which are known at scheduling time. Note that if all input locations + * are considered, then the scheduling operation can potentially take a while until all + * inputs have locations assigned. + */ +public enum LocationPreferenceConstraint { + ALL, // wait for all inputs to have a location assigned + ANY // only consider those inputs who already have a location assigned +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 9b1ffbef330ae..1995c125ca169 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -18,6 +18,26 @@ package org.apache.flink.runtime.jobmanager.scheduler; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceDiedException; +import org.apache.flink.runtime.instance.InstanceListener; +import org.apache.flink.runtime.instance.SharedSlot; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -32,31 +52,9 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; - -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.instance.SlotProvider; -import org.apache.flink.runtime.instance.SlotSharingGroupAssignment; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.instance.SharedSlot; -import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.instance.Instance; -import org.apache.flink.runtime.instance.InstanceDiedException; -import org.apache.flink.runtime.instance.InstanceListener; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.ExceptionUtils; - -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots. 
* @@ -135,31 +133,29 @@ public void shutdown() { @Override - public CompletableFuture allocateSlot(ScheduledUnit task, boolean allowQueued) { - Collection> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs(); + public CompletableFuture allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection preferredLocations) { - CompletableFuture> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures); + try { + final Object ret = scheduleTask(task, allowQueued, preferredLocations); - return preferredLocationsFuture.thenCompose( - preferredLocations -> { - try { - final Object ret = scheduleTask(task, allowQueued, preferredLocations); - - if (ret instanceof SimpleSlot) { - return CompletableFuture.completedFuture((SimpleSlot) ret); - } else if (ret instanceof CompletableFuture) { - @SuppressWarnings("unchecked") - CompletableFuture typed = (CompletableFuture) ret; - return typed; - } else { - // this should never happen, simply guard this case with an exception - throw new RuntimeException(); - } - } catch (NoResourceAvailableException e) { - throw new CompletionException(e); - } + if (ret instanceof SimpleSlot) { + return CompletableFuture.completedFuture((SimpleSlot) ret); + } + else if (ret instanceof CompletableFuture) { + @SuppressWarnings("unchecked") + CompletableFuture typed = (CompletableFuture) ret; + return typed; } - ); + else { + // this should never happen, simply guard this case with an exception + throw new RuntimeException(); + } + } catch (NoResourceAvailableException e) { + return FutureUtils.completedExceptionally(e); + } } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 5c8040579cf36..de91d788d445f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; @@ -46,14 +47,19 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import 
org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.TestLogger; @@ -67,14 +73,18 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static junit.framework.TestCase.assertTrue; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; /** * Tests for {@link ExecutionGraph} deployment. @@ -555,6 +565,103 @@ public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()); } + /** + * Tests that eager scheduling will wait until all input locations have been set before + * scheduling a task. + */ + @Test + public void testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Exception { + final int parallelism = 2; + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); + + final Time timeout = Time.hours(1L); + final JobVertexID sourceVertexId = new JobVertexID(); + final JobVertex sourceVertex = new JobVertex("Test source", sourceVertexId); + sourceVertex.setInvokableClass(NoOpInvokable.class); + sourceVertex.setParallelism(parallelism); + + final JobVertexID sinkVertexId = new JobVertexID(); + final JobVertex sinkVertex = new JobVertex("Test sink", sinkVertexId); + sinkVertex.setInvokableClass(NoOpInvokable.class); + sinkVertex.setParallelism(parallelism); + + sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); + + final Map[]> slotFutures = new HashMap<>(2); + + for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) { + CompletableFuture[] slotFutureArray = new CompletableFuture[parallelism]; + + for (int i = 0; i < parallelism; i++) { + slotFutureArray[i] = new CompletableFuture<>(); + } + + slotFutures.put(jobVertexID, slotFutureArray); + slotProvider.addSlots(jobVertexID, slotFutureArray); + } + + final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(3); + + final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createExecutionGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + scheduledExecutorService, + timeout, + sourceVertex, + sinkVertex); + + executionGraph.setScheduleMode(ScheduleMode.EAGER); + executionGraph.scheduleForExecution(); + + // all tasks should be in state SCHEDULED + for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { + assertEquals(ExecutionState.SCHEDULED, executionVertex.getCurrentExecutionAttempt().getState()); + } + + // wait until the source vertex slots have been requested + assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 0).get()); + assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 1).get()); + + // check that the sinks have not requested their slots because they need the location + // information of the sources + assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).isDone()); + 
assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).isDone()); + + final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation(); + + final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0); + + final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1); + + final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0); + + final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1); + + slotFutures.get(sourceVertexId)[0].complete(sourceSlot1); + slotFutures.get(sourceVertexId)[1].complete(sourceSlot2); + + // wait until the sink vertex slots have been requested after we completed the source slots + assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).get()); + assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).get()); + + slotFutures.get(sinkVertexId)[0].complete(sinkSlot1); + slotFutures.get(sinkVertexId)[1].complete(sinkSlot2); + + for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) { + ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 5000L); + } + } + + private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) { + return new SimpleSlot( + jobId, + mock(SlotOwner.class), + taskManagerLocation, + index, + new SimpleAckingTaskManagerGateway()); + } + @SuppressWarnings("serial") public static class FailingFinalizeJobVertex extends JobVertex { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index f4e8b30ca7f3b..d3cec30b7baf9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -53,6 +53,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -115,7 +116,7 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti CompletableFuture future = new CompletableFuture<>(); future.complete(simpleSlot); - when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future); + when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future); when(rootSlot.getSlotNumber()).thenReturn(0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 69a679acf3975..90136a69fe3fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -306,7 +306,7 @@ public void testOneSlotFailureAbortsDeploy() throws Exception { for (int i = 0; i < parallelism; i += 2) { sourceFutures[i].complete(sourceSlots[i]); - targetFutures[i + 1].complete(targetSlots[i + 1]); + targetFutures[i].complete(targetSlots[i]); } // @@ -331,7 +331,7 @@ public void testOneSlotFailureAbortsDeploy() throws Exception 
{ // all completed futures must have been returns for (int i = 0; i < parallelism; i += 2) { assertTrue(sourceSlots[i].isCanceled()); - assertTrue(targetSlots[i + 1].isCanceled()); + assertTrue(targetSlots[i].isCanceled()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 5feeabcd542c8..42a63ec481797 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -320,9 +320,21 @@ public static ExecutionGraph createExecutionGraph( ScheduledExecutorService executor, JobVertex... vertices) throws Exception { + return createExecutionGraph(jid, slotProvider, restartStrategy, executor, Time.seconds(10L), vertices); + } + + public static ExecutionGraph createExecutionGraph( + JobID jid, + SlotProvider slotProvider, + RestartStrategy restartStrategy, + ScheduledExecutorService executor, + Time timeout, + JobVertex... vertices) throws Exception { + checkNotNull(jid); checkNotNull(restartStrategy); checkNotNull(vertices); + checkNotNull(timeout); return ExecutionGraphBuilder.buildGraph( null, @@ -333,7 +345,7 @@ public static ExecutionGraph createExecutionGraph( slotProvider, ExecutionGraphTestUtils.class.getClassLoader(), new StandaloneCheckpointRecoveryFactory(), - Time.seconds(10), + timeout, restartStrategy, new UnregisteredMetricsGroup(), 1, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java new file mode 100644 index 0000000000000..fa845cff1d0ef --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; +import org.apache.flink.runtime.jobmanager.slots.SlotOwner; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link Execution}. + */ +public class ExecutionTest extends TestLogger { + + /** + * Tests that slots are released if we cannot assign the allocated resource to the + * Execution. In this case, a concurrent cancellation precedes the assignment. + */ + @Test + public void testSlotReleaseOnFailedResourceAssignment() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final CompletableFuture slotFuture = new CompletableFuture<>(); + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, slotFuture); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt(); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + CompletableFuture allocationFuture = execution.allocateAndAssignSlotForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL); + + assertFalse(allocationFuture.isDone()); + + assertEquals(ExecutionState.SCHEDULED, execution.getState()); + + // cancelling the execution should move it into state CANCELED; this happens before + // the slot future has been completed + execution.cancel(); + + assertEquals(ExecutionState.CANCELED, execution.getState()); + + // completing now the future should cause the slot to be released + slotFuture.complete(slot); + + assertEquals(slot, slotOwner.getReturnedSlotFuture().get()); + } + + /** + * Tests that the slot is released in case of a execution cancellation when having + * a slot assigned and being in state SCHEDULED. 
+ */ + @Test + public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt(); + + CompletableFuture allocationFuture = execution.allocateAndAssignSlotForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL); + + assertTrue(allocationFuture.isDone()); + + assertEquals(ExecutionState.SCHEDULED, execution.getState()); + + assertEquals(slot, execution.getAssignedResource()); + + // cancelling the execution should move it into state CANCELED + execution.cancel(); + assertEquals(ExecutionState.CANCELED, execution.getState()); + + assertEquals(slot, slotOwner.getReturnedSlotFuture().get()); + } + + /** + * Tests that the slot is released in case of a execution cancellation when being in state + * RUNNING. + */ + @Test + public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception { + final JobVertexID jobVertexId = new JobVertexID(); + final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId); + jobVertex.setInvokableClass(NoOpInvokable.class); + + final TestingSlotOwner slotOwner = new TestingSlotOwner(); + + final SimpleSlot slot = new SimpleSlot( + new JobID(), + slotOwner, + new LocalTaskManagerLocation(), + 0, + new SimpleAckingTaskManagerGateway()); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + jobVertex); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); + + final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt(); + + CompletableFuture allocationFuture = execution.allocateAndAssignSlotForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL); + + assertTrue(allocationFuture.isDone()); + + assertEquals(ExecutionState.SCHEDULED, execution.getState()); + + assertEquals(slot, execution.getAssignedResource()); + + execution.deploy(); + + execution.switchToRunning(); + + // cancelling the execution should move it into state CANCELING + execution.cancel(); + assertEquals(ExecutionState.CANCELING, execution.getState()); + + execution.cancelingComplete(); + + assertEquals(slot, slotOwner.getReturnedSlotFuture().get()); + } + + /** + * Tests that all preferred locations are calculated. 
+ */ + @Test + public void testAllPreferredLocationCalculation() throws ExecutionException, InterruptedException { + final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation(); + final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation(); + final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation(); + + final CompletableFuture locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1); + final CompletableFuture locationFuture2 = new CompletableFuture<>(); + final CompletableFuture locationFuture3 = new CompletableFuture<>(); + + final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3)); + + CompletableFuture> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL); + + assertFalse(preferredLocationsFuture.isDone()); + + locationFuture3.complete(taskManagerLocation3); + + assertFalse(preferredLocationsFuture.isDone()); + + locationFuture2.complete(taskManagerLocation2); + + assertTrue(preferredLocationsFuture.isDone()); + + final Collection preferredLocations = preferredLocationsFuture.get(); + + assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation2, taskManagerLocation3)); + } + + /** + * Tests that any preferred locations are calculated. + */ + @Test + public void testAnyPreferredLocationCalculation() throws ExecutionException, InterruptedException { + final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation(); + final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation(); + + final CompletableFuture locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1); + final CompletableFuture locationFuture2 = new CompletableFuture<>(); + final CompletableFuture locationFuture3 = CompletableFuture.completedFuture(taskManagerLocation3); + + final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3)); + + CompletableFuture> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY); + + assertTrue(preferredLocationsFuture.isDone()); + + final Collection preferredLocations = preferredLocationsFuture.get(); + + assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3)); + } + + /** + * Slot owner which records the first returned slot. 
+ */ + public static final class TestingSlotOwner implements SlotOwner { + + final CompletableFuture returnedSlot = new CompletableFuture<>(); + + public CompletableFuture getReturnedSlotFuture() { + return returnedSlot; + } + + @Override + public boolean returnAllocatedSlot(Slot slot) { + return returnedSlot.complete(slot); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 9908daeb95cc8..cb31d1509dd53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -34,6 +34,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; @@ -447,7 +448,7 @@ public void testScheduleOrDeployAfterCancel() { // it can occur as the result of races { Scheduler scheduler = mock(Scheduler.class); - vertex.scheduleForExecution(scheduler, false); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL); assertEquals(ExecutionState.CANCELED, vertex.getExecutionState()); } @@ -486,7 +487,7 @@ public void testActionsWhileCancelling() { setVertexState(vertex, ExecutionState.CANCELING); Scheduler scheduler = mock(Scheduler.class); - vertex.scheduleForExecution(scheduler, false); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL); } catch (Exception e) { fail("should not throw an exception"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java index 26cb3f155b9e0..cf0868763282b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.TestLogger; + import org.junit.Test; import java.util.Collection; @@ -52,7 +54,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class ExecutionVertexDeploymentTest { +public class ExecutionVertexDeploymentTest extends TestLogger { @Test public void testDeployCall() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index 4eac4aa628794..27f7f51032d87 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.Instance; import 
org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; @@ -32,6 +33,7 @@ import org.junit.Test; import org.mockito.Matchers; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; @@ -39,6 +41,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -62,11 +65,11 @@ public void testSlotReleasedWhenScheduledImmediately() { Scheduler scheduler = mock(Scheduler.class); CompletableFuture future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot - vertex.scheduleForExecution(scheduler, false); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL); // will have failed assertEquals(ExecutionState.FAILED, vertex.getExecutionState()); @@ -94,11 +97,11 @@ public void testSlotReleasedWhenScheduledQueued() { final CompletableFuture future = new CompletableFuture<>(); Scheduler scheduler = mock(Scheduler.class); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot - vertex.scheduleForExecution(scheduler, true); + vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL); // future has not yet a slot assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState()); @@ -128,12 +131,12 @@ public void testScheduleToDeploying() { Scheduler scheduler = mock(Scheduler.class); CompletableFuture future = new CompletableFuture<>(); future.complete(slot); - when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future); + when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future); assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot - vertex.scheduleForExecution(scheduler, false); + vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL); assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); } catch (Exception e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index 4e89d43e9ee49..f1e0f7c0d9c9e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ 
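[Reviewer note] The ExecutionVertexSchedulingTest hunks above all follow the same pattern after the signature change: the mocked Scheduler has to stub the three-argument allocateSlot (the collection of preferred TaskManagerLocations is the new third argument), and scheduleForExecution is now called with an explicit LocationPreferenceConstraint. A condensed sketch of that pattern, with the vertex/slot fixture elided (vertex and slot are placeholders provided by the enclosing test):

    // Sketch only: assumes the test fixture provides an ExecutionVertex `vertex` and a SimpleSlot `slot`.
    Scheduler scheduler = mock(Scheduler.class);
    CompletableFuture<SimpleSlot> future = CompletableFuture.completedFuture(slot);
    when(scheduler.allocateSlot(
            Matchers.any(ScheduledUnit.class),
            anyBoolean(),
            any(Collection.class)))
        .thenReturn(future);

    // The location preference constraint is now part of the scheduling call itself.
    vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
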
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -156,7 +157,7 @@ public void testMultiRegionsFailover() throws Exception { assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); - ev21.scheduleForExecution(slotProvider, true); + ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL); ev21.getCurrentExecutionAttempt().fail(new Exception("New fail")); assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); @@ -169,7 +170,7 @@ public void testMultiRegionsFailover() throws Exception { ev11.getCurrentExecutionAttempt().markFinished(); ev21.getCurrentExecutionAttempt().markFinished(); - ev22.scheduleForExecution(slotProvider, true); + ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL); ev22.getCurrentExecutionAttempt().markFinished(); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState()); assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState()); @@ -209,11 +210,11 @@ public void testNoManualRestart() throws Exception { } /** - * Tests that two failover regions failover at the same time, they will not influence each orther + * Tests that two failover regions failover at the same time, they will not influence each other * @throws Exception */ @Test - public void testMutilRegionFailoverAtSameTime() throws Exception { + public void testMultiRegionFailoverAtSameTime() throws Exception { Instance instance = ExecutionGraphTestUtils.getInstance( new ActorTaskManagerGateway( new SimpleActorGateway(TestingUtils.directExecutionContext())), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java index fef6aaafe1cdf..5d7fa1f32dea5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java @@ -22,7 +22,9 @@ import org.apache.flink.runtime.instance.SlotProvider; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -38,6 +40,8 @@ class ProgrammedSlotProvider implements SlotProvider { private final Map[]> slotFutures = new HashMap<>(); + private final Map[]> slotFutureRequested = new HashMap<>(); + private final int parallelism; public ProgrammedSlotProvider(int parallelism) { @@ -51,14 +55,20 @@ public void addSlot(JobVertexID vertex, int subtaskIndex, CompletableFuture= 0 && subtaskIndex < parallelism); CompletableFuture[] futures = slotFutures.get(vertex); + CompletableFuture[] requestedFutures = 
slotFutureRequested.get(vertex); + if (futures == null) { @SuppressWarnings("unchecked") CompletableFuture[] newArray = (CompletableFuture[]) new CompletableFuture[parallelism]; futures = newArray; slotFutures.put(vertex, futures); + + requestedFutures = new CompletableFuture[parallelism]; + slotFutureRequested.put(vertex, requestedFutures); } futures[subtaskIndex] = future; + requestedFutures[subtaskIndex] = new CompletableFuture<>(); } public void addSlots(JobVertexID vertex, CompletableFuture[] futures) { @@ -67,10 +77,25 @@ public void addSlots(JobVertexID vertex, CompletableFuture[] futures checkArgument(futures.length == parallelism); slotFutures.put(vertex, futures); + + CompletableFuture[] requestedFutures = new CompletableFuture[futures.length]; + + for (int i = 0; i < futures.length; i++) { + requestedFutures[i] = new CompletableFuture<>(); + } + + slotFutureRequested.put(vertex, requestedFutures); + } + + public CompletableFuture getSlotRequestedFuture(JobVertexID jobVertexId, int subtaskIndex) { + return slotFutureRequested.get(jobVertexId)[subtaskIndex]; } @Override - public CompletableFuture allocateSlot(ScheduledUnit task, boolean allowQueued) { + public CompletableFuture allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection preferredLocations) { JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId(); int subtask = task.getTaskToExecute().getParallelSubtaskIndex(); @@ -78,6 +103,8 @@ public CompletableFuture allocateSlot(ScheduledUnit task, boolean al if (forTask != null) { CompletableFuture future = forTask[subtask]; if (future != null) { + slotFutureRequested.get(vertexId)[subtask].complete(true); + return future; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index be5282a7da32a..a2323bfbc6c33 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -35,6 +35,7 @@ import java.net.InetAddress; import java.util.ArrayDeque; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkArgument; @@ -70,7 +71,10 @@ public SimpleSlotProvider(JobID jobId, int numSlots, TaskManagerGateway taskMana } @Override - public CompletableFuture allocateSlot(ScheduledUnit task, boolean allowQueued) { + public CompletableFuture allocateSlot( + ScheduledUnit task, + boolean allowQueued, + Collection preferredLocations) { final AllocatedSlot slot; synchronized (slots) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index 9f4a675668a2c..2d35ce29926a2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -37,6 +37,7 @@ import org.junit.Test; +import java.util.Collections; import java.util.concurrent.ExecutionException; public class ScheduleWithCoLocationHintTest extends TestLogger { @@ -66,18 +67,18 @@ public void scheduleAllSharedAndCoLocated() { CoLocationConstraint c6 = new 
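[Reviewer note] The new slotFutureRequested bookkeeping in ProgrammedSlotProvider lets a test observe that a slot was actually requested for a given subtask, independently of when (or whether) the slot future itself completes. A hedged usage sketch; jobVertexId and slotFuture are placeholders for whatever the concrete test sets up, and the Boolean type parameter is inferred from requestedFutures[subtask].complete(true) in the patch:

    // Illustrative only: jobVertexId and slotFuture come from the enclosing test.
    ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
    slotProvider.addSlot(jobVertexId, 0, slotFuture);

    // ... trigger scheduling against slotProvider ...

    // completes once allocateSlot(...) was called for subtask 0 of jobVertexId
    CompletableFuture<Boolean> requested = slotProvider.getSlotRequestedFuture(jobVertexId, 0);
    assertTrue(requested.get(10, TimeUnit.SECONDS));
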
CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false).get(); - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false).get(); - SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false).get(); - SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false).get(); - SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false).get(); - SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false).get(); - SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6), sharingGroup, c1), false, Collections.emptyList()).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6), sharingGroup, c2), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6), sharingGroup, c3), false, Collections.emptyList()).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get(); + SimpleSlot s9 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get(); + SimpleSlot s10 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6), sharingGroup, c4), false, Collections.emptyList()).get(); + SimpleSlot s11 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6), sharingGroup, c5), false, Collections.emptyList()).get(); + SimpleSlot s12 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6), sharingGroup, c6), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -140,7 +141,7 @@ public void scheduleAllSharedAndCoLocated() { assertTrue(scheduler.getNumberOfAvailableSlots() >= 1); SimpleSlot single = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1)), false).get(); + new ScheduledUnit(getTestVertex(new 
JobVertexID(), 0, 1)), false, Collections.emptyList()).get(); assertNotNull(single); s1.releaseSlot(); @@ -188,11 +189,11 @@ public void scheduleWithIntermediateRelease() { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); - SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false).get(); + SimpleSlot sSolo = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1)), false, Collections.emptyList()).get(); ResourceID taskManager = s1.getTaskManagerID(); @@ -201,7 +202,7 @@ public void scheduleWithIntermediateRelease() { sSolo.releaseSlot(); SimpleSlot sNew = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); assertEquals(taskManager, sNew.getTaskManagerID()); assertEquals(2, scheduler.getNumberOfLocalizedAssignments()); @@ -235,14 +236,14 @@ public void scheduleWithReleaseNoResource() { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); s1.releaseSlot(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false).get(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1)), false, Collections.emptyList()).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2)), false, Collections.emptyList()).get(); try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup, c1), false, Collections.emptyList()).get(); fail("Scheduled even though no resource was available."); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NoResourceAvailableException); @@ -283,35 +284,35 @@ public void scheduleMixedCoLocationSlotSharing() { SlotSharingGroup shareGroup = new SlotSharingGroup(); // first wave - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), shareGroup), false, Collections.emptyList()); // 
second wave SimpleSlot s21 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get(); SimpleSlot s22 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false).get(); + new ScheduledUnit(getTestVertex(jid2, 2, 4), shareGroup, clc2), false, Collections.emptyList()).get(); SimpleSlot s23 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false).get(); + new ScheduledUnit(getTestVertex(jid2, 1, 4), shareGroup, clc3), false, Collections.emptyList()).get(); SimpleSlot s24 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false).get(); + new ScheduledUnit(getTestVertex(jid2, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get(); // third wave SimpleSlot s31 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false).get(); + new ScheduledUnit(getTestVertex(jid3, 1, 4), shareGroup, clc2), false, Collections.emptyList()).get(); SimpleSlot s32 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false).get(); + new ScheduledUnit(getTestVertex(jid3, 2, 4), shareGroup, clc3), false, Collections.emptyList()).get(); SimpleSlot s33 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false).get(); + new ScheduledUnit(getTestVertex(jid3, 3, 4), shareGroup, clc4), false, Collections.emptyList()).get(); SimpleSlot s34 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 4), shareGroup, clc1), false, Collections.emptyList()).get(); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false); - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), shareGroup), false, Collections.emptyList()); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), shareGroup), false, Collections.emptyList()); assertEquals(s21.getTaskManagerID(), s34.getTaskManagerID()); assertEquals(s22.getTaskManagerID(), s31.getTaskManagerID()); @@ -357,25 +358,25 @@ public void testGetsNonLocalFromSharingGroupFirst() { // schedule something into the shared group so that both instances are in the sharing group SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get(); // schedule one locally to instance 1 SimpleSlot s3 = scheduler.allocateSlot( - new 
ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // schedule something into the assigned co-location constraints and check that they override the // other preferences SimpleSlot s5 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); SimpleSlot s6 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // check that each slot got three assertEquals(3, s1.getRoot().getNumberLeaves()); @@ -434,9 +435,9 @@ public void testSlotReleasedInBetween() { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -445,9 +446,9 @@ public void testSlotReleasedInBetween() { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // still preserves the previous instance mapping) assertEquals(i1.getTaskManagerID(), s3.getTaskManagerID()); @@ -495,9 +496,9 @@ public void testSlotReleasedInBetweenAndNoNewLocal() { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.singleton(loc2)).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -506,13 +507,13 @@ public void 
testSlotReleasedInBetweenAndNoNewLocal() { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfSlots()); SimpleSlot sa = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2)), false, Collections.emptyList()).get(); SimpleSlot sb = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2)), false, Collections.emptyList()).get(); try { scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup, cc1), false, Collections.singleton(loc2)).get(); fail("should not be able to find a resource"); } catch (ExecutionException e) { @@ -565,14 +566,14 @@ public void testScheduleOutOfOrder() { // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup, cc1), false, Collections.singleton(loc1)).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup, cc2), false, Collections.singleton(loc1)).get(); // check that each slot got three assertEquals(2, s1.getRoot().getNumberLeaves()); @@ -631,14 +632,14 @@ public void nonColocationFollowsCoLocation() { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); SimpleSlot s1 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup, cc1), false, Collections.emptyList()).get(); SimpleSlot s2 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup, cc2), false, Collections.emptyList()).get(); SimpleSlot s3 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.emptyList()).get(); SimpleSlot s4 = scheduler.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc1), sharingGroup), false, Collections.emptyList()).get(); // check that each slot got two assertEquals(2, s1.getRoot().getNumberLeaves()); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index a05c1a3cf5aab..7882f4a63b7fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -25,6 +25,8 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -123,17 +125,17 @@ public void testScheduleImmediately() { assertEquals(5, scheduler.getNumberOfAvailableSlots()); // schedule something into all slots - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); // the slots should all be different assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); try { - scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); fail("Scheduler accepted scheduling request without available resource."); } catch (ExecutionException e) { @@ -146,8 +148,8 @@ public void testScheduleImmediately() { assertEquals(2, scheduler.getNumberOfAvailableSlots()); // now we can schedule some more slots - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); @@ -235,7 +237,7 @@ public void run() { disposeThread.start(); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - CompletableFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true); + CompletableFuture future = scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), true, Collections.emptyList()); future.thenAcceptAsync( (SimpleSlot slot) -> { synchronized (toRelease) { @@ -284,11 +286,11 @@ public void testScheduleWithDyingInstances() { scheduler.newInstanceAvailable(i3); List slots = new ArrayList(); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), 
false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); - slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); + slots.add(scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get()); i2.markDead(); @@ -309,7 +311,7 @@ public void testScheduleWithDyingInstances() { // cannot get another slot, since all instances are dead try { - scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getDummyTask()), false, Collections.emptyList()).get(); fail("Scheduler served a slot from a dead instance"); } catch (ExecutionException e) { @@ -344,7 +346,7 @@ public void testSchedulingLocation() { scheduler.newInstanceAvailable(i3); // schedule something on an arbitrary instance - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(new Instance[0])), false, Collections.emptyList()).get(); // figure out how we use the location hints Instance first = (Instance) s1.getOwner(); @@ -352,28 +354,28 @@ public void testSchedulingLocation() { Instance third = first == i3 ? 
i2 : i3; // something that needs to go to the first instance again - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, Collections.singleton(s1.getTaskManagerLocation())).get(); assertEquals(first, s2.getOwner()); // first or second --> second, because first is full - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, Arrays.asList(first.getTaskManagerLocation(), second.getTaskManagerLocation())).get(); assertEquals(second, s3.getOwner()); // first or third --> third (because first is full) - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(third, s4.getOwner()); assertEquals(third, s5.getOwner()); // first or third --> second, because all others are full - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(second, s6.getOwner()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, Arrays.asList(first.getTaskManagerLocation(), third.getTaskManagerLocation())).get(); assertEquals(first, s7.getOwner()); assertEquals(1, scheduler.getNumberOfUnconstrainedAssignments()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java index 1f88dd8be9049..a478eb92a3552 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java @@ -27,6 +27,7 @@ import org.junit.Test; +import java.util.Collections; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -64,10 +65,10 @@ public void scheduleSingleVertexType() { scheduler.newInstanceAvailable(i2); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new 
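[Reviewer note] Throughout the scheduler tests, the locality hints that used to be derived from the ScheduledUnit's vertex are now passed explicitly as the third allocateSlot argument: Collections.emptyList() where the test has no preference, and a singleton or list of TaskManagerLocations where it wants the slot placed near earlier assignments. The minimal contrast, as a sketch (the helper method names are invented for illustration):

    // Sketch: how the tests now express "no preference" vs. "prefer a known location".
    static CompletableFuture<SimpleSlot> scheduleWithoutPreference(Scheduler scheduler, ScheduledUnit task) {
        return scheduler.allocateSlot(task, false, Collections.<TaskManagerLocation>emptyList());
    }

    static CompletableFuture<SimpleSlot> scheduleNear(Scheduler scheduler, ScheduledUnit task, TaskManagerLocation previousLocation) {
        return scheduler.allocateSlot(task, false, Collections.singleton(previousLocation));
    }
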
ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -78,7 +79,7 @@ public void scheduleSingleVertexType() { // we cannot schedule another task from the first vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -92,7 +93,7 @@ public void scheduleSingleVertexType() { s3.releaseSlot(); // allocate another slot from that group - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5); // release all old slots @@ -100,9 +101,9 @@ public void scheduleSingleVertexType() { s2.releaseSlot(); s4.releaseSlot(); - SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false).get(); - SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false).get(); - SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false).get(); + SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s7 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 6, 8), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s8 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 7, 8), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s6); assertNotNull(s7); @@ -149,10 +150,10 @@ public void allocateSlotWithSharing() { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 5), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 
5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1); assertNotNull(s2); @@ -163,7 +164,7 @@ public void allocateSlotWithSharing() { // we cannot schedule another task from the first vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -174,10 +175,10 @@ public void allocateSlotWithSharing() { } // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 5), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -186,7 +187,7 @@ public void allocateSlotWithSharing() { // we cannot schedule another task from the second vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -207,7 +208,7 @@ public void allocateSlotWithSharing() { // we can still not schedule anything from the second group of vertices try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -218,7 +219,7 @@ public void allocateSlotWithSharing() { } // we can schedule something from the first vertex group - SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); @@ -228,7 +229,7 @@ public void allocateSlotWithSharing() { // now we release a slot from the second vertex group and schedule another task from that group s2_2.releaseSlot(); - SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false).get(); + SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 5), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s5_2); // 
release all slots @@ -269,10 +270,10 @@ public void allocateSlotWithIntermediateTotallyEmptySharingGroup() { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -288,10 +289,10 @@ public void allocateSlotWithIntermediateTotallyEmptySharingGroup() { assertEquals(0, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid2)); // schedule some tasks from the second ID group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfSlots()); assertEquals(4, sharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jid1)); @@ -334,10 +335,10 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup() { scheduler.newInstanceAvailable(getRandomInstance(2)); // schedule 4 tasks from the first vertex group - SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 
4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_1); assertNotNull(s2_1); @@ -347,10 +348,10 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup() { assertTrue(areAllDistinct(s1_1, s2_1, s3_1, s4_1)); // schedule 4 tasks from the second vertex group - SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false).get(); - SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false).get(); - SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false).get(); - SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false).get(); + SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 7), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 7), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_2); assertNotNull(s2_2); @@ -360,10 +361,10 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup() { assertTrue(areAllDistinct(s1_2, s2_2, s3_2, s4_2)); // schedule 4 tasks from the third vertex group - SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false).get(); - SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false).get(); - SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false).get(); - SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false).get(); + SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 4), sharingGroup), false, Collections.emptyList()).get(); + SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 4), sharingGroup), false, Collections.emptyList()).get(); assertNotNull(s1_3); assertNotNull(s2_3); @@ -375,7 +376,7 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup() { // we cannot schedule another task from the second vertex group try { - scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false).get(); + scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 5), sharingGroup), false, Collections.emptyList()).get(); fail("Scheduler accepted too many tasks at the same time"); } catch (ExecutionException e) { @@ -391,9 +392,9 @@ public void 
allocateSlotWithTemporarilyEmptyVertexGroup() {
s3_2.releaseSlot();
s4_2.releaseSlot();
- SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false).get();
- SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false).get();
- SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false).get();
+ SimpleSlot s5_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s6_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 6, 7), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s7_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 7, 7), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s5_2);
assertNotNull(s6_2);
@@ -444,9 +445,9 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule 1 tasks from the first vertex group and 2 from the second
- SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false).get();
- SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false).get();
+ SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 2), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_1);
assertNotNull(s2_1);
@@ -462,7 +463,7 @@ public void allocateSlotWithTemporarilyEmptyVertexGroup2() {
// this should free one slot so we can allocate one non-shared
- SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false).get();
+ SimpleSlot sx = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1)), false, Collections.emptyList()).get();
assertNotNull(sx);
assertEquals(1, sharingGroup.getTaskAssignment().getNumberOfSlots());
@@ -497,28 +498,28 @@ public void scheduleMixedSharingAndNonSharing() {
scheduler.newInstanceAvailable(getRandomInstance(2));
// schedule some individual vertices
- SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false).get();
- SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false).get();
+ SimpleSlot sA1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 0, 2)), false, Collections.emptyList()).get();
+ SimpleSlot sA2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidA, 1, 2)), false, Collections.emptyList()).get();
assertNotNull(sA1);
assertNotNull(sA2);
// schedule some vertices in the sharing group
- SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
- SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
- SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+ SimpleSlot s1_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_0);
assertNotNull(s1_1);
assertNotNull(s2_0);
assertNotNull(s2_1);
// schedule another isolated vertex
- SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false).get();
+ SimpleSlot sB1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 1, 3)), false, Collections.emptyList()).get();
assertNotNull(sB1);
// should not be able to schedule more vertices
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -529,7 +530,7 @@ public void scheduleMixedSharingAndNonSharing() {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -540,7 +541,7 @@ public void scheduleMixedSharingAndNonSharing() {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -551,7 +552,7 @@ public void scheduleMixedSharingAndNonSharing() {
}
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 1)), false, Collections.emptyList()).get();
fail("Scheduler accepted too many tasks at the same time");
}
catch (ExecutionException e) {
@@ -564,8 +565,8 @@ public void scheduleMixedSharingAndNonSharing() {
// release some isolated task and check that the sharing group may grow
sA1.releaseSlot();
- SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+ SimpleSlot s1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_2);
assertNotNull(s2_2);
@@ -577,19 +578,19 @@ public void scheduleMixedSharingAndNonSharing() {
assertEquals(1, scheduler.getNumberOfAvailableSlots());
// schedule one more no-shared task
- SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false).get();
+ SimpleSlot sB0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 0, 3)), false, Collections.emptyList()).get();
assertNotNull(sB0);
// release the last of the original shared slots and allocate one more non-shared slot
s2_1.releaseSlot();
- SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false).get();
+ SimpleSlot sB2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidB, 2, 3)), false, Collections.emptyList()).get();
assertNotNull(sB2);
// release on non-shared and add some shared slots
sA2.releaseSlot();
- SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
- SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
+ SimpleSlot s1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
assertNotNull(s1_3);
assertNotNull(s2_3);
@@ -599,8 +600,8 @@ public void scheduleMixedSharingAndNonSharing() {
s1_3.releaseSlot();
s2_3.releaseSlot();
- SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false).get();
- SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false).get();
+ SimpleSlot sC0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 1, 2)), false, Collections.emptyList()).get();
+ SimpleSlot sC1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jidC, 0, 2)), false, Collections.emptyList()).get();
assertNotNull(sC0);
assertNotNull(sC1);
@@ -648,8 +649,8 @@ public void testLocalizedAssignment1() {
// schedule one to each instance
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -658,8 +659,8 @@ public void testLocalizedAssignment1() {
assertEquals(1, i2.getNumberOfAvailableSlots());
// schedule one from the other group to each instance
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
@@ -701,8 +702,8 @@ public void testLocalizedAssignment2() {
// schedule one to each instance
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -711,8 +712,8 @@ public void testLocalizedAssignment2() {
assertEquals(2, i2.getNumberOfAvailableSlots());
// schedule one from the other group to each instance
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, loc2), sharingGroup), false, Collections.singleton(loc2)).get();
assertNotNull(s3);
assertNotNull(s4);
@@ -752,14 +753,14 @@ public void testLocalizedAssignment3() {
scheduler.newInstanceAvailable(i2);
// schedule until the one instance is full
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false).get();
- SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false).get();
- SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
// schedule two more with preference of same instance --> need to go to other instance
- SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false).get();
- SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false).get();
+ SimpleSlot s5 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 3, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
+ SimpleSlot s6 = scheduler.allocateSlot(new ScheduledUnit(getTestVertexWithLocation(jid2, 4, 4, loc1), sharingGroup), false, Collections.singleton(loc1)).get();
assertNotNull(s1);
assertNotNull(s2);
@@ -805,19 +806,19 @@ public void testSequentialAllocateAndRelease() {
scheduler.newInstanceAvailable(getRandomInstance(4));
// allocate something from group 1 and 2 interleaved with schedule for group 3
- SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false).get();
+ SimpleSlot slot_1_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_1_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false).get();
+ SimpleSlot slot_2_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_2_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+ SimpleSlot slot_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_1_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_1_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4), sharingGroup), false, Collections.emptyList()).get();
- SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_2_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_2_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 4), sharingGroup), false, Collections.emptyList()).get();
// release groups 1 and 2
@@ -833,10 +834,10 @@ public void testSequentialAllocateAndRelease() {
// allocate group 4
- SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
- SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
- SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
- SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+ SimpleSlot slot_4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot slot_4_4 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
// release groups 3 and 4
@@ -887,7 +888,7 @@ public void testConcurrentAllocateAndRelease() {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, enumerator4.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
sleepUninterruptibly(rnd.nextInt(5));
slot.releaseSlot();
@@ -910,7 +911,7 @@ public void run() {
public void run() {
try {
if (flag3.compareAndSet(false, true)) {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1), sharingGroup), false, Collections.emptyList()).get();
sleepUninterruptibly(5);
@@ -939,7 +940,7 @@ public void run() {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, enumerator2.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
@@ -966,7 +967,7 @@ public void run() {
@Override
public void run() {
try {
- SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false).get();
+ SimpleSlot slot = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, enumerator1.getAndIncrement(), 4), sharingGroup), false, Collections.emptyList()).get();
// wait a bit till scheduling the successor
sleepUninterruptibly(rnd.nextInt(5));
@@ -1041,27 +1042,27 @@ public void testDopIncreases() {
scheduler.newInstanceAvailable(getRandomInstance(4));
// schedule one task for the first and second vertex
- SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false).get();
- SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false).get();
+ SimpleSlot s1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 1), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1), sharingGroup), false, Collections.emptyList()).get();
assertTrue( s1.getParent() == s2.getParent() );
assertEquals(3, scheduler.getNumberOfAvailableSlots());
- SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false).get();
- SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false).get();
- SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false).get();
- SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false).get();
+ SimpleSlot s3_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 1, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_0 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_1 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4), sharingGroup), false, Collections.emptyList()).get();
s1.releaseSlot();
s2.releaseSlot();
- SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false).get();
- SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false).get();
- SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false).get();
- SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false).get();
+ SimpleSlot s3_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 2, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s3_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 3, 5), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_2 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4), sharingGroup), false, Collections.emptyList()).get();
+ SimpleSlot s4_3 = scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4), sharingGroup), false, Collections.emptyList()).get();
try {
- scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false).get();
+ scheduler.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 4, 5), sharingGroup), false, Collections.emptyList()).get();
fail("should throw an exception");
}
catch (ExecutionException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index c7d0f09d3d149..98dca034704d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.jobmanager.scheduler;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -106,20 +107,26 @@ public static Execution getTestVertex(TaskManagerLocation... preferredLocations)
public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
- ExecutionVertex vertex = mock(ExecutionVertex.class);
-
Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
for (TaskManagerLocation preferredLocation : preferredLocations) {
preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
}
+
+ return getTestVertex(preferredLocationFutures);
+ }
+
+ public static Execution getTestVertex(Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures) {
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+
when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
when(vertex.getJobId()).thenReturn(new JobID());
when(vertex.toString()).thenReturn("TEST-VERTEX");
-
+
Execution execution = mock(Execution.class);
when(execution.getVertex()).thenReturn(vertex);
-
+ when(execution.calculatePreferredLocations(any(LocationPreferenceConstraint.class))).thenCallRealMethod();
+
return execution;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
new file mode 100644
index 0000000000000..60dddbb8b120c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+
+import java.net.InetAddress;
+
+/**
+ * Dummy local task manager location for testing purposes.
+ */
+public class LocalTaskManagerLocation extends TaskManagerLocation {
+
+ private static final long serialVersionUID = 2396142513336559461L;
+
+ public LocalTaskManagerLocation() {
+ super(ResourceID.generate(), InetAddress.getLoopbackAddress(), -1);
+ }
+}
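
Usage sketch (not part of the patch): the fragment below only ties together APIs that the diff itself introduces — the three-argument allocateSlot(...), the new getTestVertex(Collection<CompletableFuture<TaskManagerLocation>>) overload, and LocalTaskManagerLocation. It is a hedged illustration, not test code from this change: the method name is made up, imports are assumed to mirror those already used by SchedulerSlotSharingTest, and `scheduler` / `sharingGroup` are assumed to be built exactly as in the tests above.

// Illustrative only; assumes the same package/imports as the scheduler tests in this patch.
private static void allocateWithExplicitPreference(Scheduler scheduler, SlotSharingGroup sharingGroup) throws Exception {
	// a loopback TaskManagerLocation, provided by the new LocalTaskManagerLocation test class
	TaskManagerLocation preferred = new LocalTaskManagerLocation();

	// mock an Execution whose input-based preferred locations are already completed futures,
	// using the new getTestVertex(Collection<CompletableFuture<TaskManagerLocation>>) overload
	Execution vertex = SchedulerTestUtils.getTestVertex(
		Collections.singletonList(CompletableFuture.completedFuture(preferred)));

	// the preferred locations are now handed to the scheduler explicitly as the third argument
	SimpleSlot slot = scheduler.allocateSlot(
		new ScheduledUnit(vertex, sharingGroup),
		false,                                   // do not queue if no slot is free
		Collections.singleton(preferred)).get(); // preferred TaskManagerLocations

	slot.releaseSlot();
}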