Skip to content

Commit

Permalink
[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
Browse files Browse the repository at this point in the history
The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs, where we do lazy scheduling, not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to ANY, which
means that only those locations which are known at scheduling time are taken
into consideration.

[FLINK-7153] Add test cases

Replace AtomicReference with AtomicReferenceUpdater

Fix

Use static imports Preconditions.checkNotNull

Initialize ANY array with number of returned futures

Revert formatting changes in Scheduler

Set flink-runtime log level in log4j-test.properties to OFF

Revert changes to ExecutionVertex#getPreferredLocationsBasedOnInputs

Fix failing FailoverRegionTest

This closes apache#4916.
  • Loading branch information
tillrohrmann committed Nov 2, 2017
1 parent c73b2fe commit 3b0fb26
Show file tree
Hide file tree
Showing 25 changed files with 972 additions and 327 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.state.SharedStateRegistry;
Expand Down Expand Up @@ -853,11 +854,14 @@ public void scheduleForExecution() throws JobException {
}
}

private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
private void scheduleLazy(SlotProvider slotProvider) {
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : verticesInCreationOrder) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(slotProvider, allowQueuedScheduling);
ejv.scheduleAll(
slotProvider,
allowQueuedScheduling,
LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty
}
}
}
Expand All @@ -884,7 +888,10 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) {
// allocate the slots (obtain all their futures)
for (ExecutionJobVertex ejv : getVerticesTopologically()) {
// these calls are not blocking, they only return futures
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
slotProvider,
queued,
LocationPreferenceConstraint.ALL);

allAllocationFutures.addAll(allocationFutures);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.types.Either;
Expand Down Expand Up @@ -455,14 +456,24 @@ public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult>
//---------------------------------------------------------------------------------------------
// Actions
//---------------------------------------------------------------------------------------------

public void scheduleAll(SlotProvider slotProvider, boolean queued) {

/**
* Schedules all execution vertices of this ExecutionJobVertex.
*
* @param slotProvider to allocate the slots from
* @param queued if the allocations can be queued
* @param locationPreferenceConstraint constraint for the location preferences
*/
public void scheduleAll(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {

final ExecutionVertex[] vertices = this.taskVertices;

// kick off the tasks
for (ExecutionVertex ev : vertices) {
ev.scheduleForExecution(slotProvider, queued);
ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
}
}

Expand All @@ -474,8 +485,13 @@ public void scheduleAll(SlotProvider slotProvider, boolean queued) {
* <p>If this method throws an exception, it makes sure to release all so far requested slots.
*
* @param resourceProvider The resource provider from whom the slots are requested.
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences
*/
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
SlotProvider resourceProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
final ExecutionVertex[] vertices = this.taskVertices;
final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];

Expand All @@ -484,7 +500,10 @@ public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProv
for (int i = 0; i < vertices.length; i++) {
// allocate the next slot (future)
final Execution exec = vertices[i].getCurrentExecutionAttempt();
final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
resourceProvider,
queued,
locationPreferenceConstraint);
slots[i] = allocationFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.EvictingBoundedList;
Expand Down Expand Up @@ -487,7 +488,8 @@ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsB
return Collections.emptySet();
}
else {
Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());

// go over all inputs
for (int i = 0; i < inputEdges.length; i++) {
Expand All @@ -497,17 +499,26 @@ public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsB
// go over all input sources
for (int k = 0; k < sources.length; k++) {
// look-up assigned slot of input source
CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
inputLocations.add(taskManagerLocationFuture);

CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
// add input location
inputLocations.add(locationFuture);
// inputs which have too many distinct sources are not considered
if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
return Collections.emptyList();
inputLocations.clear();
break;
}
}
}
// keep the locations of the input with the fewest preferred locations
if (locations.isEmpty() || // nothing assigned yet
(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
// current input has fewer preferred locations
locations.clear();
locations.addAll(inputLocations);
}
}

return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations;
return locations.isEmpty() ? Collections.emptyList() : locations;
}
}

Expand Down Expand Up @@ -587,8 +598,22 @@ public Execution resetForNewExecution(final long timestamp, final long originati
}
}

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
return this.currentExecution.scheduleForExecution(slotProvider, queued);
/**
* Schedules the current execution of this ExecutionVertex.
*
* <p>This method is a pure delegation: all arguments are passed unchanged to the
* current execution's {@code scheduleForExecution} method.
*
* @param slotProvider to allocate the slots from
* @param queued if the allocation can be queued
* @param locationPreferenceConstraint constraint for the location preferences
* @return the value returned by the current execution's {@code scheduleForExecution} call
*/
public boolean scheduleForExecution(
SlotProvider slotProvider,
boolean queued,
LocationPreferenceConstraint locationPreferenceConstraint) {
return this.currentExecution.scheduleForExecution(
slotProvider,
queued,
locationPreferenceConstraint);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;

Expand Down Expand Up @@ -216,8 +217,9 @@ private void restart(long globalModVersionOfFailover) {
for (ExecutionVertex ev : connectedExecutionVertexes) {
try {
ev.scheduleForExecution(
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed());
executionGraph.getSlotProvider(),
executionGraph.isQueuedSchedulingAllowed(),
LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover region might have failed concurrently
}
catch (Throwable e) {
failover(globalModVersionOfFailover);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1002,14 +1002,12 @@ public boolean returnAllocatedSlot(Slot slot) {
}

@Override
public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures =
task.getTaskToExecute().getVertex().getPreferredLocations();
public CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {

CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);

return locationPreferencesFuture.thenCompose(
locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -40,7 +42,11 @@ public interface SlotProvider {
*
* @param task The task to allocate the slot for
* @param allowQueued Whether the task may be queued if not enough resources are available
* @param preferredLocations preferred locations for the slot allocation
* @return The future of the allocation
*/
CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmanager.scheduler;

/**
* Defines the location preference constraint.
*
* <p>Currently, two constraints are supported: either all input locations have to be
* taken into consideration, or only those which are already known at scheduling time.
* Note that if all input locations are considered, then the scheduling operation can
* potentially take a while until all inputs have locations assigned.
*/
public enum LocationPreferenceConstraint {
ALL, // wait for all inputs to have a location assigned
ANY // only consider those inputs which already have a location assigned
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,26 @@

package org.apache.flink.runtime.jobmanager.scheduler;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -32,31 +52,9 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;

import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
*
Expand Down Expand Up @@ -135,31 +133,29 @@ public void shutdown() {


@Override
public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
public CompletableFuture<SimpleSlot> allocateSlot(
ScheduledUnit task,
boolean allowQueued,
Collection<TaskManagerLocation> preferredLocations) {

CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
try {
final Object ret = scheduleTask(task, allowQueued, preferredLocations);

return preferredLocationsFuture.thenCompose(
preferredLocations -> {
try {
final Object ret = scheduleTask(task, allowQueued, preferredLocations);

if (ret instanceof SimpleSlot) {
return CompletableFuture.completedFuture((SimpleSlot) ret);
} else if (ret instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
return typed;
} else {
// this should never happen, simply guard this case with an exception
throw new RuntimeException();
}
} catch (NoResourceAvailableException e) {
throw new CompletionException(e);
}
if (ret instanceof SimpleSlot) {
return CompletableFuture.completedFuture((SimpleSlot) ret);
}
else if (ret instanceof CompletableFuture) {
@SuppressWarnings("unchecked")
CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
return typed;
}
);
else {
// this should never happen, simply guard this case with an exception
throw new RuntimeException();
}
} catch (NoResourceAvailableException e) {
return FutureUtils.completedExceptionally(e);
}
}

/**
Expand Down
Loading

0 comments on commit 3b0fb26

Please sign in to comment.