diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md index 476e16ede6fb1..3906a949b497d 100644 --- a/docs/ops/deployment/mesos.md +++ b/docs/ops/deployment/mesos.md @@ -193,7 +193,6 @@ The job graph file may be generated like this way: {% highlight java %} final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); -jobGraph.setAllowQueuedScheduling(true); final String jobGraphFilename = "job.graph"; File jobGraphFile = new File(jobGraphFilename); try (FileOutputStream output = new FileOutputStream(jobGraphFile); @@ -203,8 +202,7 @@ try (FileOutputStream output = new FileOutputStream(jobGraphFile); {% endhighlight %} Note: -1. Before serializing the job graph, please make sure to enable queued scheduling because slots need to be allocated lazily -2. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory) +1. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory) #### General configuration diff --git a/docs/ops/deployment/mesos.zh.md b/docs/ops/deployment/mesos.zh.md index 88874de943318..fec999acb177e 100644 --- a/docs/ops/deployment/mesos.zh.md +++ b/docs/ops/deployment/mesos.zh.md @@ -193,7 +193,6 @@ The job graph file may be generated like this way: {% highlight java %} final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); -jobGraph.setAllowQueuedScheduling(true); final String jobGraphFilename = "job.graph"; File jobGraphFile = new File(jobGraphFilename); try (FileOutputStream output = new FileOutputStream(jobGraphFile); @@ -203,8 +202,7 @@ try (FileOutputStream output = new FileOutputStream(jobGraphFile); {% endhighlight %} Note: -1. Before serializing the job graph, please make sure to enable queued scheduling because slots need to be allocated lazily -2. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory) +1. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory) #### General configuration diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index d866a4f101094..b1d33305a44a4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -115,8 +115,6 @@ public JobExecutionResult executePlan( baseConfiguration, 1); - jobGraph.setAllowQueuedScheduling(true); - try (final JobExecutorService executorService = createJobExecutorService(jobGraph, baseConfiguration)) { return executorService.executeJobBlocking(jobGraph); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 7352dd4c40711..27676556b991a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -278,9 +278,6 @@ public CompletableFuture requestJobResult(@Nonnull JobID jobId) { */ @Override public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) { - // we have to enable queued scheduling because slot will be allocated lazily - jobGraph.setAllowQueuedScheduling(true); - CompletableFuture jobGraphFileFuture = CompletableFuture.supplyAsync(() -> { try { final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin"); diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 6cb2d84a3a06f..57808e3328506 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -101,7 +101,6 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept configuration, defaultParallelism, jobId); - jobGraph.setAllowQueuedScheduling(true); jobGraph.setSavepointRestoreSettings(savepointRestoreSettings); return jobGraph; diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 99860b2b91f03..fae214d01c1df 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -248,8 +248,6 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) { "This indicates that non-serializable types (like custom serializers) were registered"); } - graph.setAllowQueuedScheduling(false); - // add vertices to the graph for (JobVertex vertex : this.vertices.values()) { vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint()); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index baae701c89c10..dcce5a9e94169 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -98,11 +98,7 @@ protected CompletableFuture handleRequest( return jobGraph; }); - CompletableFuture jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> { - // we have to enable queued scheduling because slots will be allocated lazily - jobGraph.setAllowQueuedScheduling(true); - return gateway.submitJob(jobGraph, timeout); - }); + CompletableFuture jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout)); return jobSubmissionFuture .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID())); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index d99d6a3c1d1b4..2db95ecbf7510 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -440,14 +440,7 @@ public CompletableFuture scheduleForExecution( locationPreferenceConstraint, allPreviousExecutionGraphAllocationIds); - final CompletableFuture deploymentFuture; - - if (allocationFuture.isDone() || slotProviderStrategy.isQueuedSchedulingAllowed()) { - deploymentFuture = allocationFuture.thenRun(ThrowingRunnable.unchecked(this::deploy)); - } else { - deploymentFuture = FutureUtils.completedExceptionally( - new IllegalArgumentException("The slot allocation future has not been completed yet.")); - } + final CompletableFuture deploymentFuture = allocationFuture.thenRun(ThrowingRunnable.unchecked(this::deploy)); deploymentFuture.whenComplete( (Void ignored, Throwable failure) -> { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 25a4a801c3a14..e2132423a5d12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -268,10 +268,6 @@ public class ExecutionGraph implements AccessExecutionGraph { // ------ Configuration of the Execution ------- - /** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able - * to deploy them immediately. */ - private final boolean allowQueuedScheduling; - /** The mode of scheduling. Decides how to select the initial set of tasks to be deployed. * May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking * from results than need to be materialized. */ @@ -440,8 +436,7 @@ public ExecutionGraph( jobInformation.getJobId(), NettyShuffleMaster.INSTANCE, ignored -> Optional.empty()), - ScheduleMode.LAZY_FROM_SOURCES, - false); + ScheduleMode.LAZY_FROM_SOURCES); } public ExecutionGraph( @@ -459,8 +454,7 @@ public ExecutionGraph( PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory, ShuffleMaster shuffleMaster, JobMasterPartitionTracker partitionTracker, - ScheduleMode scheduleMode, - boolean allowQueuedScheduling) throws IOException { + ScheduleMode scheduleMode) throws IOException { this.jobInformation = Preconditions.checkNotNull(jobInformation); @@ -468,8 +462,6 @@ public ExecutionGraph( this.scheduleMode = checkNotNull(scheduleMode); - this.allowQueuedScheduling = allowQueuedScheduling; - this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter); this.futureExecutor = Preconditions.checkNotNull(futureExecutor); @@ -478,8 +470,7 @@ public ExecutionGraph( this.slotProviderStrategy = SlotProviderStrategy.from( scheduleMode, slotProvider, - allocationTimeout, - allowQueuedScheduling); + allocationTimeout); this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader"); this.tasks = new ConcurrentHashMap<>(16); @@ -541,10 +532,6 @@ public int getNumberOfExecutionJobVertices() { return this.verticesInCreationOrder.size(); } - public boolean isQueuedSchedulingAllowed() { - return this.allowQueuedScheduling; - } - public SchedulingTopology getSchedulingTopology() { return executionTopology; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 090b5b412be30..6b0e389db5793 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -180,8 +180,7 @@ public static ExecutionGraph buildGraph( partitionReleaseStrategyFactory, shuffleMaster, partitionTracker, - jobGraph.getScheduleMode(), - jobGraph.getAllowQueuedScheduling()); + jobGraph.getScheduleMode()); } catch (IOException e) { throw new JobException("Could not create the ExecutionGraph.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SlotProviderStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SlotProviderStrategy.java index 830b0522201b5..cd1eac533f227 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SlotProviderStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SlotProviderStrategy.java @@ -39,15 +39,8 @@ public abstract class SlotProviderStrategy { protected final SlotProvider slotProvider; - protected final boolean allowQueuedScheduling; - - SlotProviderStrategy(SlotProvider slotProvider, boolean allowQueuedScheduling) { + SlotProviderStrategy(SlotProvider slotProvider) { this.slotProvider = Preconditions.checkNotNull(slotProvider); - this.allowQueuedScheduling = allowQueuedScheduling; - } - - boolean isQueuedSchedulingAllowed() { - return allowQueuedScheduling; } /** @@ -80,15 +73,14 @@ public void cancelSlotRequest( public static SlotProviderStrategy from( ScheduleMode scheduleMode, SlotProvider slotProvider, - Time allocationTimeout, - boolean allowQueuedScheduling) { + Time allocationTimeout) { switch (scheduleMode) { case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST: - return new BatchSlotProviderStrategy(slotProvider, allowQueuedScheduling); + return new BatchSlotProviderStrategy(slotProvider); case LAZY_FROM_SOURCES: case EAGER: - return new NormalSlotProviderStrategy(slotProvider, allocationTimeout, allowQueuedScheduling); + return new NormalSlotProviderStrategy(slotProvider, allocationTimeout); default: throw new IllegalArgumentException(String.format("Unknown scheduling mode: %s", scheduleMode)); } @@ -100,27 +92,27 @@ SlotProvider asSlotProvider() { static class BatchSlotProviderStrategy extends SlotProviderStrategy { - BatchSlotProviderStrategy(SlotProvider slotProvider, boolean allowQueuedScheduling) { - super(slotProvider, allowQueuedScheduling); + BatchSlotProviderStrategy(SlotProvider slotProvider) { + super(slotProvider); } @Override public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile) { - return slotProvider.allocateBatchSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling); + return slotProvider.allocateBatchSlot(slotRequestId, scheduledUnit, slotProfile); } } static class NormalSlotProviderStrategy extends SlotProviderStrategy { private final Time allocationTimeout; - NormalSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout, boolean allowQueuedScheduling) { - super(slotProvider, allowQueuedScheduling); + NormalSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout) { + super(slotProvider); this.allocationTimeout = Preconditions.checkNotNull(allocationTimeout); } @Override public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile) { - return slotProvider.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout); + return slotProvider.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 31564d4ff599c..f63a5184a210d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -73,9 +73,6 @@ public class JobGraph implements Serializable { /** Name of this job. */ private final String jobName; - /** flag to enable queued scheduling */ - private boolean allowQueuedScheduling; - /** The mode in which the job is scheduled */ private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES; @@ -220,14 +217,6 @@ public SerializedValue getSerializedExecutionConfig() { return serializedExecutionConfig; } - public void setAllowQueuedScheduling(boolean allowQueuedScheduling) { - this.allowQueuedScheduling = allowQueuedScheduling; - } - - public boolean getAllowQueuedScheduling() { - return allowQueuedScheduling; - } - public void setScheduleMode(ScheduleMode scheduleMode) { this.scheduleMode = scheduleMode; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java index 4bf8eab254106..ad318858d0196 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java @@ -110,13 +110,11 @@ public CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - boolean allowQueuedScheduling, Time allocationTimeout) { return allocateSlotInternal( slotRequestId, scheduledUnit, slotProfile, - allowQueuedScheduling, allocationTimeout); } @@ -124,13 +122,11 @@ public CompletableFuture allocateSlot( public CompletableFuture allocateBatchSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, - SlotProfile slotProfile, - boolean allowQueuedScheduling) { + SlotProfile slotProfile) { return allocateSlotInternal( slotRequestId, scheduledUnit, slotProfile, - allowQueuedScheduling, null); } @@ -139,7 +135,6 @@ private CompletableFuture allocateSlotInternal( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - boolean allowQueuedScheduling, @Nullable Time allocationTimeout) { log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute()); @@ -151,7 +146,6 @@ private CompletableFuture allocateSlotInternal( slotRequestId, scheduledUnit, slotProfile, - allowQueuedScheduling, allocationTimeout); return allocationResultFuture; } @@ -161,11 +155,10 @@ private void internalAllocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - boolean allowQueuedScheduling, Time allocationTimeout) { CompletableFuture allocationFuture = scheduledUnit.getSlotSharingGroupId() == null ? - allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout) : - allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout); + allocateSingleSlot(slotRequestId, slotProfile, allocationTimeout) : + allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout); allocationFuture.whenComplete((LogicalSlot slot, Throwable failure) -> { if (failure != null) { @@ -180,7 +173,6 @@ private void internalAllocateSlot( slotRequestId, scheduledUnit, slotProfile, - allowQueuedScheduling, allocationTimeout); } else { cancelSlotRequest( @@ -223,7 +215,6 @@ public void returnLogicalSlot(LogicalSlot logicalSlot) { private CompletableFuture allocateSingleSlot( SlotRequestId slotRequestId, SlotProfile slotProfile, - boolean allowQueuedScheduling, @Nullable Time allocationTimeout) { Optional slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile); @@ -236,7 +227,7 @@ private CompletableFuture allocateSingleSlot( } catch (FlinkException e) { return FutureUtils.completedExceptionally(e); } - } else if (allowQueuedScheduling) { + } else { // we allocate by requesting a new slot return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout) .thenApply((PhysicalSlot allocatedSlot) -> { @@ -246,10 +237,6 @@ private CompletableFuture allocateSingleSlot( throw new CompletionException(e); } }); - } else { - // failed to allocate - return FutureUtils.completedExceptionally( - new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.')); } } @@ -318,7 +305,6 @@ private CompletableFuture allocateSharedSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - boolean allowQueuedScheduling, @Nullable Time allocationTimeout) { // allocate slot with slot sharing final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent( @@ -335,14 +321,12 @@ private CompletableFuture allocateSharedSlot( scheduledUnit.getCoLocationConstraint(), multiTaskSlotManager, slotProfile, - allowQueuedScheduling, allocationTimeout); } else { multiTaskSlotLocality = allocateMultiTaskSlot( scheduledUnit.getJobVertexId(), multiTaskSlotManager, slotProfile, - allowQueuedScheduling, allocationTimeout); } } catch (NoResourceAvailableException noResourceException) { @@ -363,13 +347,11 @@ private CompletableFuture allocateSharedSlot( /** * Allocates a co-located {@link SlotSharingManager.MultiTaskSlot} for the given {@link CoLocationConstraint}. * - *

If allowQueuedScheduling is true, then the returned {@link SlotSharingManager.MultiTaskSlot} can be - * uncompleted. + *

The returned {@link SlotSharingManager.MultiTaskSlot} can be uncompleted. * * @param coLocationConstraint for which to allocate a {@link SlotSharingManager.MultiTaskSlot} * @param multiTaskSlotManager responsible for the slot sharing group for which to allocate the slot * @param slotProfile specifying the requirements for the requested slot - * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false * @param allocationTimeout timeout before the slot allocation times out * @return A {@link SlotAndLocality} which contains the allocated{@link SlotSharingManager.MultiTaskSlot} * and its locality wrt the given location preferences @@ -378,7 +360,6 @@ private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot( CoLocationConstraint coLocationConstraint, SlotSharingManager multiTaskSlotManager, SlotProfile slotProfile, - boolean allowQueuedScheduling, @Nullable Time allocationTimeout) throws NoResourceAvailableException { final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId(); @@ -415,7 +396,6 @@ private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot( coLocationConstraint.getGroupId(), multiTaskSlotManager, slotProfile, - allowQueuedScheduling, allocationTimeout); // check whether we fulfill the co-location constraint @@ -466,12 +446,11 @@ private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot( * Allocates a {@link SlotSharingManager.MultiTaskSlot} for the given groupId which is in the * slot sharing group for which the given {@link SlotSharingManager} is responsible. * - *

If allowQueuedScheduling is true, then the method can return an uncompleted {@link SlotSharingManager.MultiTaskSlot}. + *

The method can return an uncompleted {@link SlotSharingManager.MultiTaskSlot}. * * @param groupId for which to allocate a new {@link SlotSharingManager.MultiTaskSlot} * @param slotSharingManager responsible for the slot sharing group for which to allocate the slot * @param slotProfile slot profile that specifies the requirements for the slot - * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false * @param allocationTimeout timeout before the slot allocation times out; null if requesting a batch slot * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot} * and its locality wrt the given location preferences @@ -480,8 +459,7 @@ private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot( AbstractID groupId, SlotSharingManager slotSharingManager, SlotProfile slotProfile, - boolean allowQueuedScheduling, - @Nullable Time allocationTimeout) throws NoResourceAvailableException { + @Nullable Time allocationTimeout) { Collection resolvedRootSlotsInfo = slotSharingManager.listResolvedRootSlotInfo(groupId); @@ -533,48 +511,44 @@ private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot( return multiTaskSlotLocality; } - if (allowQueuedScheduling) { - // there is no slot immediately available --> check first for uncompleted slots at the slot sharing group - SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId); + // there is no slot immediately available --> check first for uncompleted slots at the slot sharing group + SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId); - if (multiTaskSlot == null) { - // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!! - final CompletableFuture slotAllocationFuture = requestNewAllocatedSlot( - allocatedSlotRequestId, - slotProfile, - allocationTimeout); + if (multiTaskSlot == null) { + // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!! + final CompletableFuture slotAllocationFuture = requestNewAllocatedSlot( + allocatedSlotRequestId, + slotProfile, + allocationTimeout); - multiTaskSlot = slotSharingManager.createRootSlot( - multiTaskSlotRequestId, - slotAllocationFuture, - allocatedSlotRequestId); + multiTaskSlot = slotSharingManager.createRootSlot( + multiTaskSlotRequestId, + slotAllocationFuture, + allocatedSlotRequestId); - slotAllocationFuture.whenComplete( - (PhysicalSlot allocatedSlot, Throwable throwable) -> { - final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId); - - if (taskSlot != null) { - // still valid - if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) { - taskSlot.release(throwable); - } else { - if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) { - taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + - allocatedSlot.getAllocationId() + '.')); - } - } + slotAllocationFuture.whenComplete( + (PhysicalSlot allocatedSlot, Throwable throwable) -> { + final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId); + + if (taskSlot != null) { + // still valid + if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) { + taskSlot.release(throwable); } else { - slotPool.releaseSlot( - allocatedSlotRequestId, - new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.')); + if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) { + taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + + allocatedSlot.getAllocationId() + '.')); + } } - }); - } - - return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN); + } else { + slotPool.releaseSlot( + allocatedSlotRequestId, + new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.')); + } + }); } - throw new NoResourceAvailableException("Could not allocate a shared slot for " + groupId + '.'); + return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN); } private void releaseSharedSlot( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java index 6f3071bc0e148..36da2c7c73ce7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotProvider.java @@ -48,7 +48,6 @@ public interface SlotProvider { * @param slotRequestId identifying the slot request * @param scheduledUnit The task to allocate the slot for * @param slotProfile profile of the requested slot - * @param allowQueuedScheduling Whether allow the task be queued if we do not have enough resource * @param allocationTimeout after which the allocation fails with a timeout exception * @return The future of the allocation */ @@ -56,7 +55,6 @@ CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, - boolean allowQueuedScheduling, Time allocationTimeout); /** @@ -65,14 +63,12 @@ CompletableFuture allocateSlot( * @param slotRequestId identifying the slot request * @param scheduledUnit The task to allocate the slot for * @param slotProfile profile of the requested slot - * @param allowQueuedScheduling Whether allow the task be queued if we do not have enough resource * @return The future of the allocation */ default CompletableFuture allocateBatchSlot( SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, - SlotProfile slotProfile, - boolean allowQueuedScheduling) { + SlotProfile slotProfile) { throw new UnsupportedOperationException("Not properly implemented."); } @@ -80,21 +76,18 @@ default CompletableFuture allocateBatchSlot( * Allocating slot with specific requirement. * * @param scheduledUnit The task to allocate the slot for - * @param allowQueued Whether allow the task be queued if we do not have enough resource * @param slotProfile profile of the requested slot * @param allocationTimeout after which the allocation fails with a timeout exception * @return The future of the allocation */ default CompletableFuture allocateSlot( ScheduledUnit scheduledUnit, - boolean allowQueued, SlotProfile slotProfile, Time allocationTimeout) { return allocateSlot( new SlotRequestId(), scheduledUnit, slotProfile, - allowQueued, allocationTimeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 7899b540a80fa..c082c6d3c2a8f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -653,21 +653,13 @@ public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionEx public CompletableFuture submitJob(JobGraph jobGraph) { final CompletableFuture dispatcherGatewayFuture = getDispatcherGatewayFuture(); - - // we have to allow queued scheduling in Flip-6 mode because we need to request slots - // from the ResourceManager - jobGraph.setAllowQueuedScheduling(true); - final CompletableFuture blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture); - final CompletableFuture jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph); - final CompletableFuture acknowledgeCompletableFuture = jarUploadFuture .thenCombine( dispatcherGatewayFuture, (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout)) .thenCompose(Function.identity()); - return acknowledgeCompletableFuture.thenApply( (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index 5de8286cb3ae0..82b949502f3ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -81,8 +81,7 @@ public SchedulerNG createInstance( final SlotProviderStrategy slotProviderStrategy = SlotProviderStrategy.from( jobGraph.getScheduleMode(), slotProvider, - slotRequestTimeout, - true); + slotRequestTimeout); return new DefaultScheduler( log, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index ea288f08863bd..a99f1c8579291 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -141,7 +141,6 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing( final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobVertex) .setRpcTimeout(timeout) .setAllocationTimeout(timeout) - .allowQueuedScheduling() .build(); executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java index 6b137be6dcf58..7e614e6e9f62d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java @@ -141,7 +141,6 @@ public void setup() throws Exception { testVertex.setInvokableClass(NoOpInvokable.class); jobId = new JobID(); jobGraph = new JobGraph(jobId, "testJob", testVertex); - jobGraph.setAllowQueuedScheduling(true); configuration = new Configuration(); configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index f136a8cdfb6e8..ba952ba96cecd 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -164,7 +164,6 @@ public void setUp() throws Exception { final JobVertex testVertex = new JobVertex("testVertex"); testVertex.setInvokableClass(NoOpInvokable.class); jobGraph = new JobGraph(TEST_JOB_ID, "testJob", testVertex); - jobGraph.setAllowQueuedScheduling(true); fatalErrorHandler = new TestingFatalErrorHandler(); heartbeatServices = new HeartbeatServices(1000L, 10000L); @@ -322,7 +321,6 @@ public void testJobSubmissionWithPartialResourceConfigured() throws Exception { secondVertex.setInvokableClass(NoOpInvokable.class); JobGraph jobGraphWithTwoVertices = new JobGraph(TEST_JOB_ID, "twoVerticesJob", firstVertex, secondVertex); - jobGraphWithTwoVertices.setAllowQueuedScheduling(true); CompletableFuture acknowledgeFuture = dispatcherGateway.submitJob(jobGraphWithTwoVertices, TIMEOUT); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java index 85b88241544c5..410835fe29964 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.java @@ -219,10 +219,7 @@ public Dispatcher createDispatcher( private static JobGraph createJobGraph() { final JobVertex testVertex = new JobVertex("testVertex"); testVertex.setInvokableClass(NoOpInvokable.class); - final JobGraph testJob = new JobGraph(TEST_JOB_ID, "testJob", testVertex); - testJob.setAllowQueuedScheduling(true); - - return testJob; + return new JobGraph(TEST_JOB_ID, "testJob", testVertex); } private DispatcherRunner createDispatcherRunner() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java index 5fcc8866b2a2c..5081373a3d16f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/ZooKeeperDefaultDispatcherRunnerTest.java @@ -240,8 +240,6 @@ private JobGraph createJobGraphWithBlobs() throws IOException { vertex.setParallelism(1); final JobGraph jobGraph = new JobGraph("Test job graph", vertex); - jobGraph.setAllowQueuedScheduling(true); - final PermanentBlobKey permanentBlobKey = blobServer.putPermanent(jobGraph.getJobID(), new byte[256]); jobGraph.addUserJarBlobKey(permanentBlobKey); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java index 132647a6df155..92548be44cd15 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java @@ -69,7 +69,6 @@ public void testConstraintsAfterRestart() throws Exception { new TestRestartStrategy( 1, false)) - .allowQueuedScheduling() .build(); // enable the queued scheduling for the slot pool diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 78fa2aba777ef..439093718cc8f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -26,14 +26,12 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; @@ -78,8 +76,6 @@ import org.junit.Test; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; - import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; @@ -91,7 +87,6 @@ import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.function.Function; @@ -171,11 +166,12 @@ public void testBuildDeploymentDescriptor() { v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED); DirectScheduledExecutorService executor = new DirectScheduledExecutorService(); - ExecutionGraph eg = createExecutionGraphWithoutQueuedScheduling( - jobId, - new TestingSlotProvider(ignore -> new CompletableFuture<>()), - executor, - executor); + ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobId) + .setFutureExecutor(executor) + .setIoExecutor(executor) + .setSlotProvider(new TestingSlotProvider(ignore -> new CompletableFuture<>())) + .setBlobWriter(blobWriter) + .build(); eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); @@ -361,7 +357,7 @@ public void testAccumulatorsAndMetricsForwarding() throws Exception { } /** - * Verifies that {@link Execution#completeCancelling(Map, IOMetrics)} and {@link Execution#markFailed(Throwable, Map, IOMetrics)} + * Verifies that {@link Execution#completeCancelling(Map, IOMetrics, boolean)} and {@link Execution#markFailed(Throwable, Map, IOMetrics)} * store the given accumulators and metrics correctly. */ @Test @@ -448,7 +444,12 @@ public void testNoResourceAvailableFailure() throws Exception { DirectScheduledExecutorService directExecutor = new DirectScheduledExecutorService(); // execution graph that executes actions synchronously - ExecutionGraph eg = createExecutionGraphWithoutQueuedScheduling(jobId, slotProvider, directExecutor, TestingUtils.defaultExecutor()); + ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobId) + .setFutureExecutor(directExecutor) + .setIoExecutor(TestingUtils.defaultExecutor()) + .setSlotProvider(slotProvider) + .setBlobWriter(blobWriter) + .build(); eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); @@ -512,7 +513,13 @@ private Tuple2> setupExecutio DirectScheduledExecutorService executorService = new DirectScheduledExecutorService(); // execution graph that executes actions synchronously - ExecutionGraph eg = createExecutionGraphWithoutQueuedScheduling(new JobID(), slotProvider, executorService, TestingUtils.defaultExecutor()); + ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(new JobID()) + .setFutureExecutor(executorService) + .setIoExecutor(TestingUtils.defaultExecutor()) + .setSlotProvider(slotProvider) + .setBlobWriter(blobWriter) + .build(); + checkJobOffloaded(eg); eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); @@ -529,21 +536,6 @@ private Tuple2> setupExecutio return new Tuple2<>(eg, executions); } - @Nonnull - private ExecutionGraph createExecutionGraphWithoutQueuedScheduling( - JobID jobId, - SlotProvider slotProvider, - ScheduledExecutorService futureExecutor, - Executor ioExecutor) throws JobException, JobExecutionException { - return new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(jobId) - .setFutureExecutor(futureExecutor) - .setIoExecutor(ioExecutor) - .setSlotProvider(slotProvider) - .setBlobWriter(blobWriter) - .setAllowQueuedScheduling(false) - .build(); - } - @Test public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception { @@ -689,7 +681,6 @@ public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception { .setSlotProvider(slotProvider) .setFutureExecutor(new DirectScheduledExecutorService()) .setScheduleMode(ScheduleMode.EAGER) - .allowQueuedScheduling() .build(); executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphNotEnoughResourceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphNotEnoughResourceTest.java new file mode 100644 index 0000000000000..e57c0b25b3c1a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphNotEnoughResourceTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy; +import org.apache.flink.runtime.jobmaster.slotpool.Scheduler; +import org.apache.flink.runtime.jobmaster.slotpool.SchedulerImpl; +import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.ThrowingRunnable; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import static org.junit.Assert.assertEquals; + +/** + * Tests ExecutionGraph schedule behavior with not enough resource. + */ +public class ExecutionGraphNotEnoughResourceTest extends TestLogger { + + private static TestingComponentMainThreadExecutor.Resource mainThreadExecutorResource; + private static ComponentMainThreadExecutor mainThreadExecutor; + + + private static final JobID TEST_JOB_ID = new JobID(); + private static final int NUM_TASKS = 31; + + @BeforeClass + public static void setupClass() { + mainThreadExecutorResource = new TestingComponentMainThreadExecutor.Resource(); + mainThreadExecutorResource.before(); + mainThreadExecutor = mainThreadExecutorResource.getComponentMainThreadTestExecutor().getMainThreadExecutor(); + } + + @AfterClass + public static void teardownClass() { + mainThreadExecutorResource.after(); + } + + @Test + public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception { + final int numRestarts = 10; + final int parallelism = 20; + + SlotPool slotPool = null; + try { + slotPool = new TestingSlotPoolImpl(TEST_JOB_ID); + final Scheduler scheduler = createSchedulerWithSlots( + parallelism - 1, slotPool, new LocalTaskManagerLocation()); + + final SlotSharingGroup sharingGroup = new SlotSharingGroup(); + + final JobVertex source = new JobVertex("source"); + source.setInvokableClass(NoOpInvokable.class); + source.setParallelism(parallelism); + source.setSlotSharingGroup(sharingGroup); + + final JobVertex sink = new JobVertex("sink"); + sink.setInvokableClass(NoOpInvokable.class); + sink.setParallelism(parallelism); + sink.setSlotSharingGroup(sharingGroup); + sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); + + TestRestartStrategy restartStrategy = + new TestRestartStrategy(numRestarts, false); + + final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(TEST_JOB_ID, source, sink) + .setSlotProvider(scheduler) + .setRestartStrategy(restartStrategy) + .setScheduleMode(ScheduleMode.EAGER) + .setAllocationTimeout(Time.milliseconds(1L)) + .build(); + + eg.start(mainThreadExecutor); + + mainThreadExecutor.execute(ThrowingRunnable.unchecked(eg::scheduleForExecution)); + + CommonTestUtils.waitUntilCondition( + () -> CompletableFuture.supplyAsync(eg::getState, mainThreadExecutor).join() == JobStatus.FAILED, + Deadline.fromNow(Duration.ofMillis(2000))); + + // the last suppressed restart is also counted + assertEquals(numRestarts + 1, CompletableFuture.supplyAsync(eg::getNumberOfRestarts, mainThreadExecutor).join().longValue()); + + final Throwable t = CompletableFuture.supplyAsync(eg::getFailureCause, mainThreadExecutor).join(); + if (!(t instanceof NoResourceAvailableException)) { + ExceptionUtils.rethrowException(t, t.getMessage()); + } + } finally { + if (slotPool != null) { + CompletableFuture.runAsync(slotPool::close, mainThreadExecutor).join(); + } + } + } + + private static Scheduler createSchedulerWithSlots( + int numSlots, + SlotPool slotPool, + TaskManagerLocation taskManagerLocation) throws Exception { + final TaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + final String jobManagerAddress = "foobar"; + final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + slotPool.start(JobMasterId.generate(), jobManagerAddress, mainThreadExecutor); + slotPool.connectToResourceManager(resourceManagerGateway); + Scheduler scheduler = new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.INSTANCE, slotPool); + scheduler.start(mainThreadExecutor); + + CompletableFuture.runAsync(() -> slotPool.registerTaskManager(taskManagerLocation.getResourceID()), mainThreadExecutor).join(); + + final List slotOffers = new ArrayList<>(NUM_TASKS); + for (int i = 0; i < numSlots; i++) { + final AllocationID allocationId = new AllocationID(); + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.ANY); + slotOffers.add(slotOffer); + } + + CompletableFuture.runAsync(() -> slotPool.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers), mainThreadExecutor).join(); + + return scheduler; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index f102346bd599b..44a7758f633af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -46,7 +46,6 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.ScheduleMode; -import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; @@ -64,7 +63,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; -import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -565,7 +563,6 @@ public void slotPoolExecutionGraph_ConcurrentSchedulingAndAllocationFailure_Shou .setSlotProvider(slots) .setAllocationTimeout(Time.minutes(60)) .setScheduleMode(ScheduleMode.EAGER) - .setAllowQueuedScheduling(true) .build(); eg.start(mainThreadExecutor); @@ -632,52 +629,6 @@ public void testRestartWithEagerSchedulingAndSlotSharing() throws Exception { } } - @Test - public void testRestartWithSlotSharingAndNotEnoughResources() throws Exception { - final int numRestarts = 10; - final int parallelism = 20; - - try (SlotPool slotPool = createSlotPoolImpl()) { - final Scheduler scheduler = createSchedulerWithSlots( - parallelism - 1, slotPool, new LocalTaskManagerLocation()); - - final SlotSharingGroup sharingGroup = new SlotSharingGroup(); - - final JobVertex source = new JobVertex("source"); - source.setInvokableClass(NoOpInvokable.class); - source.setParallelism(parallelism); - source.setSlotSharingGroup(sharingGroup); - - final JobVertex sink = new JobVertex("sink"); - sink.setInvokableClass(NoOpInvokable.class); - sink.setParallelism(parallelism); - sink.setSlotSharingGroup(sharingGroup); - sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED_BOUNDED); - - TestRestartStrategy restartStrategy = - new TestRestartStrategy(numRestarts, false); - - final ExecutionGraph eg = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(TEST_JOB_ID, source, sink) - .setSlotProvider(scheduler) - .setRestartStrategy(restartStrategy) - .setScheduleMode(ScheduleMode.EAGER) - .build(); - - eg.start(mainThreadExecutor); - eg.scheduleForExecution(); - - // the last suppressed restart is also counted - assertEquals(numRestarts + 1, eg.getNumberOfRestarts()); - - assertEquals(JobStatus.FAILED, eg.getState()); - - final Throwable t = eg.getFailureCause(); - if (!(t instanceof NoResourceAvailableException)) { - ExceptionUtils.rethrowException(t, t.getMessage()); - } - } - } - /** * Tests that the {@link ExecutionGraph} can handle failures while * being in the RESTARTING state. @@ -689,7 +640,6 @@ public void testFailureWhileRestarting() throws Exception { final ExecutionGraph executionGraph = new ExecutionGraphTestUtils.TestingExecutionGraphBuilder(createJobGraph()) .setRestartStrategy(restartStrategy) .setSlotProvider(new TestingSlotProvider(ignored -> new CompletableFuture<>())) - .allowQueuedScheduling() .build(); executionGraph.start(mainThreadExecutor); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 0aaf857990926..5323edb503a32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -121,7 +121,6 @@ public void testScheduleSourceBeforeTarget() throws Exception { final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - jobGraph.setAllowQueuedScheduling(true); final CompletableFuture sourceFuture = new CompletableFuture<>(); final CompletableFuture targetFuture = new CompletableFuture<>(); @@ -195,7 +194,6 @@ public void testDeployPipelinedConnectedComponentsTogether() throws Exception { final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - jobGraph.setAllowQueuedScheduling(true); @SuppressWarnings({"unchecked", "rawtypes"}) final CompletableFuture[] sourceFutures = new CompletableFuture[parallelism]; @@ -292,7 +290,6 @@ public void testOneSlotFailureAbortsDeploy() throws Exception { final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - jobGraph.setAllowQueuedScheduling(true); // // Create the slots, futures, and the slot provider @@ -376,7 +373,6 @@ public void testEagerSchedulingWithSlotTimeout() throws Exception { final JobID jobId = new JobID(); final JobGraph jobGraph = new JobGraph(jobId, "test", vertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - jobGraph.setAllowQueuedScheduling(true); final BlockingQueue returnedSlots = new ArrayBlockingQueue<>(2); final TestingSlotOwner slotOwner = new TestingSlotOwner(); @@ -436,7 +432,6 @@ public void testSchedulingOperationCancellationWhenCancel() throws Exception { jobVertex.setParallelism(2); final JobGraph jobGraph = new JobGraph(jobVertex); jobGraph.setScheduleMode(ScheduleMode.EAGER); - jobGraph.setAllowQueuedScheduling(true); final CompletableFuture slotFuture1 = new CompletableFuture<>(); final CompletableFuture slotFuture2 = new CompletableFuture<>(); @@ -478,7 +473,6 @@ public void testSlotReleasingFailsSchedulingOperation() throws Exception { jobVertex.setInvokableClass(NoOpInvokable.class); jobVertex.setParallelism(parallelism); final JobGraph jobGraph = new JobGraph(jobVertex); - jobGraph.setAllowQueuedScheduling(true); jobGraph.setScheduleMode(ScheduleMode.EAGER); final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism); @@ -520,7 +514,6 @@ public void testCancellationOfIncompleteScheduling() throws Exception { jobVertex.setParallelism(parallelism); final JobGraph jobGraph = new JobGraph(jobVertex); - jobGraph.setAllowQueuedScheduling(true); jobGraph.setScheduleMode(ScheduleMode.EAGER); final TestingSlotOwner slotOwner = new TestingSlotOwner(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 288f5f12835fa..fe3621091690c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -627,16 +627,6 @@ public TestingExecutionGraphBuilder setPartitionTracker(final JobMasterPartition return this; } - public TestingExecutionGraphBuilder allowQueuedScheduling() { - jobGraph.setAllowQueuedScheduling(true); - return this; - } - - public TestingExecutionGraphBuilder setAllowQueuedScheduling(boolean allowQueuedScheduling) { - jobGraph.setAllowQueuedScheduling(allowQueuedScheduling); - return this; - } - public TestingExecutionGraphBuilder setScheduleMode(ScheduleMode scheduleMode) { jobGraph.setScheduleMode(scheduleMode); return this; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index 0e1d3ddc22e16..9c0cbbd14a76b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -233,7 +233,7 @@ private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPar final SlotProvider slotProvider = new SlotProvider() { @Override - public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { + public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, Time allocationTimeout) { return CompletableFuture.completedFuture( new TestingLogicalSlotBuilder() .setTaskManagerLocation(taskManagerLocation) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java index 9172348615f82..4294452944b94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java @@ -293,7 +293,7 @@ public void testScheduleOrDeployAfterCancel() { // it can occur as the result of races { vertex.scheduleForExecution( - TestingSlotProviderStrategy.from(new ProgrammedSlotProvider(1), false), + TestingSlotProviderStrategy.from(new ProgrammedSlotProvider(1)), LocationPreferenceConstraint.ALL, Collections.emptySet()); @@ -333,7 +333,7 @@ public void testActionsWhileCancelling() { AkkaUtils.getDefaultTimeout()); setVertexState(vertex, ExecutionState.CANCELING); vertex.scheduleForExecution( - TestingSlotProviderStrategy.from(new ProgrammedSlotProvider(1), false), + TestingSlotProviderStrategy.from(new ProgrammedSlotProvider(1)), LocationPreferenceConstraint.ALL, Collections.emptySet()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java index cf55f7082d1c5..babb755bd26ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java @@ -57,7 +57,7 @@ public void testSlotReleasedWhenScheduledImmediately() { assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot vertex.scheduleForExecution( - TestingSlotProviderStrategy.from(new TestingSlotProvider((i) -> future), false), + TestingSlotProviderStrategy.from(new TestingSlotProvider((i) -> future)), LocationPreferenceConstraint.ALL, Collections.emptySet()); @@ -88,7 +88,7 @@ public void testSlotReleasedWhenScheduledQueued() { assertEquals(ExecutionState.CREATED, vertex.getExecutionState()); // try to deploy to the slot vertex.scheduleForExecution( - TestingSlotProviderStrategy.from(new TestingSlotProvider(ignore -> future), true), + TestingSlotProviderStrategy.from(new TestingSlotProvider(ignore -> future)), LocationPreferenceConstraint.ALL, Collections.emptySet()); @@ -121,7 +121,7 @@ public void testScheduleToDeploying() { // try to deploy to the slot vertex.scheduleForExecution( - TestingSlotProviderStrategy.from(new TestingSlotProvider(ignore -> future), false), + TestingSlotProviderStrategy.from(new TestingSlotProvider(ignore -> future)), LocationPreferenceConstraint.ALL, Collections.emptySet()); assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java index 7c5a848ea87f8..9bcdab21bef07 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java @@ -117,7 +117,6 @@ public CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, - boolean allowQueued, Time allocationTimeout) { JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId(); int subtask = task.getTaskToExecute().getParallelSubtaskIndex(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java index a79527881f39f..49aa555889ac1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java @@ -56,7 +56,7 @@ public void setSlotCanceller(Consumer slotCanceller) { } @Override - public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, boolean allowQueued, Time timeout) { + public CompletableFuture allocateSlot(SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, Time timeout) { Preconditions.checkState(!slotFutures.containsKey(slotRequestId)); final CompletableFuture slotFuture = slotFutureCreator.apply(slotRequestId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProviderStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProviderStrategy.java index 4d4c86c38c2d0..b9504e999e854 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProviderStrategy.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProviderStrategy.java @@ -28,11 +28,11 @@ */ public class TestingSlotProviderStrategy extends SlotProviderStrategy.NormalSlotProviderStrategy{ - private TestingSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout, boolean allowQueuedScheduling) { - super(slotProvider, allocationTimeout, allowQueuedScheduling); + private TestingSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout) { + super(slotProvider, allocationTimeout); } - public static TestingSlotProviderStrategy from(SlotProvider slotProvider, boolean allowQueuedScheduling) { - return new TestingSlotProviderStrategy(slotProvider, Time.seconds(10L), allowQueuedScheduling); + public static TestingSlotProviderStrategy from(SlotProvider slotProvider) { + return new TestingSlotProviderStrategy(slotProvider, Time.seconds(10L)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java index fe55bf3511776..e6257016dc9e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java @@ -46,7 +46,6 @@ import java.util.ArrayDeque; import java.util.HashMap; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeoutException; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -90,7 +89,6 @@ public CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, - boolean allowQueued, Time allocationTimeout) { final SlotContext slot; @@ -112,8 +110,6 @@ public CompletableFuture allocateSlot( .createTestingLogicalSlot(); allocatedSlots.put(slotRequestId, slot); return CompletableFuture.completedFuture(result); - } else if (allowQueued) { - return FutureUtils.completedExceptionally(new TimeoutException()); } else { return FutureUtils.completedExceptionally(new NoResourceAvailableException()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 8e6a22f0f9b23..43bf9bab1256f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -121,14 +121,7 @@ private JobGraph createTestJobGraph( DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - final JobGraph jobGraph = new JobGraph(jobName, sender, receiver); - - // We need to allow queued scheduling, because there are not enough slots available - // to run all tasks at once. We queue tasks and then let them finish/consume the blocking - // result one after the other. - jobGraph.setAllowQueuedScheduling(true); - - return jobGraph; + return new JobGraph(jobName, sender, receiver); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java index cad0af547fe7d..df7b0222a7f30 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java @@ -62,18 +62,18 @@ public void scheduleAllSharedAndCoLocated() throws Exception { CoLocationConstraint c6 = new CoLocationConstraint(ccg); // schedule 4 tasks from the first vertex group - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s9 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s10 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s11 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s12 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c2), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 2, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c3), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s8 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s9 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s10 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 3, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c4), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s11 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 4, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c5), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s12 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 5, 6, sharingGroup), sharingGroup.getSlotSharingGroupId(), c6), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(s1); assertNotNull(s2); @@ -123,7 +123,7 @@ public void scheduleAllSharedAndCoLocated() throws Exception { assertTrue(testingSlotProvider.getNumberOfAvailableSlots() >= 1); LogicalSlot single = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(new JobVertexID(), 0, 1, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertNotNull(single); s1.releaseSlot(); @@ -160,11 +160,11 @@ public void scheduleWithIntermediateRelease() throws Exception { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot sSolo = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot sSolo = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 1, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); ResourceID taskManager = s1.getTaskManagerLocation().getResourceID(); @@ -173,7 +173,7 @@ public void scheduleWithIntermediateRelease() throws Exception { sSolo.releaseSlot(); LogicalSlot sNew = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(taskManager, sNew.getTaskManagerLocation().getResourceID()); assertEquals(2, testingSlotProvider.getNumberOfLocalizedAssignments()); @@ -196,14 +196,14 @@ public void scheduleWithReleaseNoResource() throws Exception { CoLocationConstraint c1 = new CoLocationConstraint(new CoLocationGroup()); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid1, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 0, 1, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid2, 1, 2, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid3, 0, 1, sharingGroup), sharingGroup.getSlotSharingGroupId(), c1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduled even though no resource was available."); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NoResourceAvailableException); @@ -237,35 +237,35 @@ public void scheduleMixedCoLocationSlotSharing() throws Exception { SlotSharingGroup shareGroup = new SlotSharingGroup(); // first wave - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid1, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // second wave LogicalSlot s21 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s22 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s23 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s24 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid2, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid2, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // third wave LogicalSlot s31 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc2), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s32 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc3), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s33 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc4), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot s34 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertex(jid3, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertex(jid3, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId(), clc1), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 0, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 1, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 2, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(jid4, 3, 4, shareGroup), shareGroup.getSlotSharingGroupId()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); assertEquals(s21.getTaskManagerLocation(), s34.getTaskManagerLocation()); assertEquals(s22.getTaskManagerLocation(), s31.getTaskManagerLocation()); @@ -297,25 +297,25 @@ public void testGetsNonLocalFromSharingGroupFirst() throws Exception { // schedule something into the shared group so that both instances are in the sharing group LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId()), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); // schedule one locally to instance 1 LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // schedule with co location constraint (yet unassigned) and a preference for // instance 1, but it can only get instance 2 LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // schedule something into the assigned co-location constraints and check that they override the // other preferences LogicalSlot s5 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s6 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid3, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got three assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); @@ -357,9 +357,9 @@ public void testSlotReleasedInBetween() throws Exception { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -367,9 +367,9 @@ public void testSlotReleasedInBetween() throws Exception { assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // still preserves the previous instance mapping) assertEquals(loc1, s3.getTaskManagerLocation()); @@ -403,9 +403,9 @@ public void testSlotReleasedInBetweenAndNoNewLocal() throws Exception { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); s1.releaseSlot(); s2.releaseSlot(); @@ -413,13 +413,13 @@ public void testSlotReleasedInBetweenAndNoNewLocal() throws Exception { assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); LogicalSlot sa = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 0, 2, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); LogicalSlot sb = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2, null)), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jidx, 1, 2, null)), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); try { testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); fail("should not be able to find a resource"); } catch (ExecutionException e) { @@ -459,14 +459,14 @@ public void testScheduleOutOfOrder() throws Exception { // and give locality preferences that hint at using the same shared slot for both // co location constraints (which we seek to prevent) LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got three assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); @@ -504,14 +504,14 @@ public void nonColocationFollowsCoLocation() throws Exception { CoLocationConstraint cc2 = new CoLocationConstraint(ccg); LogicalSlot s1 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId(), cc1), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s2 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), false, slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid1, 1, 2, sharingGroup, loc2), sharingGroup.getSlotSharingGroupId(), cc2), slotProfileForLocation(loc2), TestingUtils.infiniteTime()).get(); LogicalSlot s3 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 0, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); LogicalSlot s4 = testingSlotProvider.allocateSlot( - new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), false, slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); + new ScheduledUnit(getTestVertexWithLocation(jid2, 1, 2, sharingGroup, loc1), sharingGroup.getSlotSharingGroupId()), slotProfileForLocation(loc1), TestingUtils.infiniteTime()).get(); // check that each slot got two assertEquals(s1.getTaskManagerLocation(), s3.getTaskManagerLocation()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java index 75f9a70372f07..e3214b1bb7742 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.Ignore; import org.junit.Test; import java.util.ArrayList; @@ -52,65 +53,6 @@ * Tests for scheduling individual tasks. */ public class SchedulerIsolatedTasksTest extends SchedulerTestBase { - - @Test - public void testScheduleImmediately() throws Exception { - assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots()); - - testingSlotProvider.addTaskManager(2); - testingSlotProvider.addTaskManager(1); - testingSlotProvider.addTaskManager(2); - assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots()); - - // schedule something into all slots - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - - // the slots should all be different - assertTrue(areAllDistinct(s1, s2, s3, s4, s5)); - - try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - fail("Scheduler accepted scheduling request without available resource."); - } - catch (ExecutionException e) { - assertTrue(e.getCause() instanceof NoResourceAvailableException); - } - - // release some slots again - s3.releaseSlot(); - s4.releaseSlot(); - assertEquals(2, testingSlotProvider.getNumberOfAvailableSlots()); - - // now we can schedule some more slots - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); - - assertTrue(areAllDistinct(s1, s2, s3, s4, s5, s6, s7)); - - // release all - - s1.releaseSlot(); - s2.releaseSlot(); - s5.releaseSlot(); - s6.releaseSlot(); - s7.releaseSlot(); - - assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots()); - - // check that slots that are released twice (accidentally) do not mess things up - - s1.releaseSlot(); - s2.releaseSlot(); - s5.releaseSlot(); - s6.releaseSlot(); - s7.releaseSlot(); - - assertEquals(5, testingSlotProvider.getNumberOfAvailableSlots()); - } @Test public void testScheduleQueueing() throws Exception { @@ -134,7 +76,7 @@ public void testScheduleQueueing() throws Exception { final AtomicBoolean errored = new AtomicBoolean(false); for (int i = 0; i < NUM_TASKS_TO_SCHEDULE; i++) { - CompletableFuture future = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); + CompletableFuture future = testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()); future.thenAcceptAsync( (LogicalSlot slot) -> { synchronized (toRelease) { @@ -179,19 +121,20 @@ public void testScheduleQueueing() throws Exception { assertEquals("All slots should be available.", totalSlots, testingSlotProvider.getNumberOfAvailableSlots()); } - + @Test + @Ignore public void testScheduleWithDyingInstances() throws Exception { final TaskManagerLocation taskManagerLocation1 = testingSlotProvider.addTaskManager(2); final TaskManagerLocation taskManagerLocation2 = testingSlotProvider.addTaskManager(2); final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(1); List slots = new ArrayList<>(); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); - slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); + slots.add(testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get()); testingSlotProvider.releaseTaskManager(taskManagerLocation2.getResourceID()); @@ -212,7 +155,7 @@ public void testScheduleWithDyingInstances() throws Exception { // cannot get another slot, since all instances are dead try { - testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + testingSlotProvider.allocateSlot(new ScheduledUnit(getDummyTask()), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); fail("Scheduler served a slot from a dead instance"); } catch (ExecutionException e) { @@ -226,7 +169,7 @@ public void testScheduleWithDyingInstances() throws Exception { // that all instances have vanished assertEquals(0, testingSlotProvider.getNumberOfAvailableSlots()); } - + @Test public void testSchedulingLocation() throws Exception { final TaskManagerLocation taskManagerLocation1 = testingSlotProvider.addTaskManager(2); @@ -234,7 +177,7 @@ public void testSchedulingLocation() throws Exception { final TaskManagerLocation taskManagerLocation3 = testingSlotProvider.addTaskManager(2); // schedule something on an arbitrary instance - LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new LocalTaskManagerLocation())), false, SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); + LogicalSlot s1 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(new LocalTaskManagerLocation())), SlotProfile.noRequirements(), TestingUtils.infiniteTime()).get(); // figure out how we use the location hints ResourceID firstResourceId = s1.getTaskManagerLocation().getResourceID(); @@ -256,28 +199,28 @@ public void testSchedulingLocation() throws Exception { TaskManagerLocation third = taskManagerLocations.get((index + 2) % taskManagerLocations.size()); // something that needs to go to the first instance again - LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), false, slotProfileForLocation(s1.getTaskManagerLocation()), TestingUtils.infiniteTime()).get(); + LogicalSlot s2 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(s1.getTaskManagerLocation())), slotProfileForLocation(s1.getTaskManagerLocation()), TestingUtils.infiniteTime()).get(); assertEquals(first.getResourceID(), s2.getTaskManagerLocation().getResourceID()); // first or second --> second, because first is full - LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), false, slotProfileForLocation(first, second), TestingUtils.infiniteTime()).get(); + LogicalSlot s3 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, second)), slotProfileForLocation(first, second), TestingUtils.infiniteTime()).get(); assertEquals(second.getResourceID(), s3.getTaskManagerLocation().getResourceID()); // first or third --> third (because first is full) - LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); - LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s4 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s5 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(third.getResourceID(), s4.getTaskManagerLocation().getResourceID()); assertEquals(third.getResourceID(), s5.getTaskManagerLocation().getResourceID()); // first or third --> second, because all others are full - LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s6 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(second.getResourceID(), s6.getTaskManagerLocation().getResourceID()); // release something on the first and second instance s2.releaseSlot(); s6.releaseSlot(); - LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), false, slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); + LogicalSlot s7 = testingSlotProvider.allocateSlot(new ScheduledUnit(getTestVertex(first, third)), slotProfileForLocation(first, third), TestingUtils.infiniteTime()).get(); assertEquals(first.getResourceID(), s7.getTaskManagerLocation().getResourceID()); assertEquals(1, testingSlotProvider.getNumberOfUnconstrainedAssignments()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index fba62593289df..bab5aead7de98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -235,9 +235,8 @@ public CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, - boolean allowQueued, Time allocationTimeout) { - return scheduler.allocateSlot(task, allowQueued, slotProfile, allocationTimeout).thenApply( + return scheduler.allocateSlot(task, slotProfile, allocationTimeout).thenApply( (LogicalSlot logicalSlot) -> { switch (logicalSlot.getLocality()) { case LOCAL: diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e7f061ee7c693..7b7a6247a3617 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -1079,16 +1079,15 @@ private void runRequestNextInputSplitTest(Function>, Colle source.setInputSplitSource(inputSplitSource); source.setInvokableClass(AbstractInvokable.class); - final JobGraph inputSplitjobGraph = new JobGraph(source); - inputSplitjobGraph.setAllowQueuedScheduling(true); + final JobGraph inputSplitJobGraph = new JobGraph(source); final ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(100, 0)); - inputSplitjobGraph.setExecutionConfig(executionConfig); + inputSplitJobGraph.setExecutionConfig(executionConfig); final JobMaster jobMaster = createJobMaster( configuration, - inputSplitjobGraph, + inputSplitJobGraph, haServices, new TestingJobManagerSharedServicesBuilder().build(), heartbeatServices); @@ -1118,7 +1117,7 @@ private void runRequestNextInputSplitTest(Function>, Colle waitUntilAllExecutionsAreScheduled(jobMasterGateway); // fail the first execution to trigger a failover - jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(inputSplitjobGraph.getJobID(), initialAttemptId, ExecutionState.FAILED)).get(); + jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(inputSplitJobGraph.getJobID(), initialAttemptId, ExecutionState.FAILED)).get(); // wait until the job has been recovered waitUntilAllExecutionsAreScheduled(jobMasterGateway); @@ -2119,10 +2118,7 @@ private JobGraph producerConsumerJobGraph() { consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); - final JobGraph jobGraph = new JobGraph(producer, consumer); - jobGraph.setAllowQueuedScheduling(true); - - return jobGraph; + return new JobGraph(producer, consumer); } private File createSavepoint(long savepointId) throws IOException { @@ -2346,9 +2342,7 @@ private JobGraph createSingleVertexJobGraph() { final JobVertex jobVertex = new JobVertex("Test vertex"); jobVertex.setInvokableClass(NoOpInvokable.class); - final JobGraph jobGraph = new JobGraph(jobVertex); - jobGraph.setAllowQueuedScheduling(true); - return jobGraph; + return new JobGraph(jobVertex); } private static final class DummyCheckpointStorageLocation implements CompletedCheckpointStorageLocation { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java index 52cc3afa3e2f4..b9601ccb1d2d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolCoLocationTest.java @@ -98,7 +98,6 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter jobVertexId1, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -107,7 +106,6 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter jobVertexId2, slotSharingGroupId, coLocationConstraint2), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -116,7 +114,6 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter jobVertexId2, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -125,7 +122,6 @@ public void testSimpleCoLocatedSlotScheduling() throws ExecutionException, Inter jobVertexId1, slotSharingGroupId, coLocationConstraint2), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -196,7 +192,6 @@ public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionExcept jobVertexId1, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp1), TestingUtils.infiniteTime()); @@ -205,7 +200,6 @@ public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionExcept jobVertexId2, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp2), TestingUtils.infiniteTime()); @@ -214,7 +208,6 @@ public void testCoLocatedSlotRequestsFailBeforeResolved() throws ExecutionExcept jobVertexId3, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp3), TestingUtils.infiniteTime()); @@ -274,7 +267,6 @@ public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionExcepti jobVertexId1, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp1), TestingUtils.infiniteTime()); @@ -283,7 +275,6 @@ public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionExcepti jobVertexId2, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp2), TestingUtils.infiniteTime()); @@ -304,7 +295,6 @@ public void testCoLocatedSlotRequestsFailAfterResolved() throws ExecutionExcepti jobVertexId3, slotSharingGroupId, coLocationConstraint1), - true, SlotProfile.noLocality(rp3), TestingUtils.infiniteTime()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java index e2658ec3e09f7..d36be1ad52632 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java @@ -127,7 +127,6 @@ public void testAllocateSimpleSlot() throws Exception { requestId, new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); assertFalse(future.isDone()); @@ -171,13 +170,11 @@ public void testAllocationFulfilledByReturnedSlot() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); CompletableFuture future2 = scheduler.allocateSlot( new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); assertFalse(future1.isDone()); @@ -230,7 +227,6 @@ public void testAllocateWithFreeSlot() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); assertFalse(future1.isDone()); @@ -253,7 +249,6 @@ public void testAllocateWithFreeSlot() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); // second allocation fulfilled by previous slot returning @@ -283,7 +278,6 @@ public void testOfferSlot() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); assertFalse(future.isDone()); @@ -348,7 +342,6 @@ public void testReleaseResource() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); final SlotRequest slotRequest = slotRequestFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -357,7 +350,6 @@ public void testReleaseResource() throws Exception { new SlotRequestId(), new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, timeout); final SlotOffer slotOffer = new SlotOffer( @@ -418,7 +410,6 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception slotRequestId1, scheduledUnit, SlotProfile.noRequirements(), - true, timeout); // wait for the first slot request @@ -428,7 +419,6 @@ public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception slotRequestId2, scheduledUnit, SlotProfile.noRequirements(), - true, timeout); // wait for the second slot request @@ -757,7 +747,6 @@ private CompletableFuture allocateSlot(Scheduler scheduler, SlotReq slotRequestId, new DummyScheduledUnit(), SlotProfile.noRequirements(), - true, timeout); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java index ce096acadd1b8..cda9370620bb7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java @@ -98,7 +98,6 @@ public void testSlotAllocationNoResourceManager() throws Exception { new SlotRequestId(), new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, fastTimeout)); try { @@ -127,7 +126,6 @@ public void testCancelSlotAllocationWithoutResourceManager() throws Exception { requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, fastTimeout)); try { @@ -179,7 +177,6 @@ public void testSlotAllocationTimeout() throws Exception { requestId, new DummyScheduledUnit(), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, fastTimeout)); try { @@ -226,7 +223,6 @@ public void testExtraSlotsAreKept() throws Exception { requestId, new ScheduledUnit(SchedulerTestUtils.getDummyTask()), SlotProfile.noLocality(DEFAULT_TESTING_PROFILE), - true, fastTimeout)); try { @@ -281,7 +277,6 @@ public void testProviderAndOwnerSlotAllocationTimeout() throws Exception { // test the pending request is clear when timed out CompletableFuture future = testMainThreadExecutor.execute(() -> scheduler.allocateSlot( new DummyScheduledUnit(), - true, SlotProfile.noRequirements(), fastTimeout)); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java index caa3fc38c6018..08aafd02411b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSharingTest.java @@ -76,7 +76,6 @@ public void testSingleQueuedSharedSlotScheduling() throws Exception { new JobVertexID(), slotSharingGroupId, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -115,7 +114,6 @@ public void testFailingQueuedSharedSlotScheduling() throws Exception { new JobVertexID(), new SlotSharingGroupId(), null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -158,7 +156,6 @@ public void testQueuedSharedSlotScheduling() throws Exception { jobVertexId1, slotSharingGroupId, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -167,7 +164,6 @@ public void testQueuedSharedSlotScheduling() throws Exception { jobVertexId2, slotSharingGroupId, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -181,7 +177,6 @@ public void testQueuedSharedSlotScheduling() throws Exception { jobVertexId1, slotSharingGroupId, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -190,7 +185,6 @@ public void testQueuedSharedSlotScheduling() throws Exception { jobVertexId2, slotSharingGroupId, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -260,7 +254,6 @@ public void testQueuedMultipleSlotSharingGroups() throws Exception { jobVertexId1, slotSharingGroupId1, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -269,7 +262,6 @@ public void testQueuedMultipleSlotSharingGroups() throws Exception { jobVertexId2, slotSharingGroupId1, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -278,7 +270,6 @@ public void testQueuedMultipleSlotSharingGroups() throws Exception { jobVertexId3, slotSharingGroupId2, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -287,7 +278,6 @@ public void testQueuedMultipleSlotSharingGroups() throws Exception { jobVertexId4, slotSharingGroupId2, null), - true, SlotProfile.noRequirements(), TestingUtils.infiniteTime()); @@ -362,7 +352,6 @@ public void testSlotSharingRespectsRemainingResource() throws Exception { jobVertexId1, slotSharingGroupId, null), - true, SlotProfile.noLocality(largeRequestResource), TestingUtils.infiniteTime()); @@ -387,7 +376,6 @@ public void testSlotSharingRespectsRemainingResource() throws Exception { jobVertexId2, slotSharingGroupId, null), - true, SlotProfile.noLocality(largeRequestResource), TestingUtils.infiniteTime()); assertFalse(logicalSlotFuture2.isDone()); @@ -398,7 +386,6 @@ public void testSlotSharingRespectsRemainingResource() throws Exception { jobVertexId3, slotSharingGroupId, null), - true, SlotProfile.noLocality(smallRequestResource), TestingUtils.infiniteTime()); assertTrue(logicalSlotFuture3.isDone()); @@ -450,7 +437,6 @@ public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, Ex jobVertexId1, slotSharingGroupId, null), - true, SlotProfile.noLocality(rp1), TestingUtils.infiniteTime()); @@ -459,7 +445,6 @@ public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, Ex jobVertexId2, slotSharingGroupId, null), - true, SlotProfile.noLocality(rp2), TestingUtils.infiniteTime()); @@ -468,7 +453,6 @@ public void testRetryOnSharedSlotOverAllocated() throws InterruptedException, Ex jobVertexId3, slotSharingGroupId, null), - true, SlotProfile.noLocality(rp3), TestingUtils.infiniteTime()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java index ce3f83711adca..d6eb59eddafb3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.java @@ -284,8 +284,7 @@ private DefaultExecutionSlotAllocator createExecutionSlotAllocator(InputsLocatio SlotProviderStrategy.from( ScheduleMode.EAGER, slotProvider, - Time.seconds(10), - true), + Time.seconds(10)), inputsLocationsRetriever); } @@ -323,7 +322,6 @@ public CompletableFuture allocateSlot( SlotRequestId slotRequestId, ScheduledUnit task, SlotProfile slotProfile, - boolean allowQueued, Time timeout) { slotAllocationRequests.add(Tuple3.of(slotRequestId, task, slotProfile)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java index 03df7130daab3..e2155d99e0573 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java @@ -218,10 +218,7 @@ private JobGraph createJobGraph(int parallelism) { final JobVertex jobVertex = new JobVertex("testing task"); jobVertex.setParallelism(parallelism); jobVertex.setInvokableClass(NoOpInvokable.class); - final JobGraph jobGraph = new JobGraph(jobId, "test job", jobVertex); - jobGraph.setAllowQueuedScheduling(true); - - return jobGraph; + return new JobGraph(jobId, "test job", jobVertex); } private static class GloballyTerminalJobStatusListener implements JobStatusListener { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 9b1618ee1c569..f958b70b8f605 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -425,10 +425,6 @@ public ClusterClient deployJobCluster( ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) throws ClusterDeploymentException { - - // this is required because the slots are allocated lazily - jobGraph.setAllowQueuedScheduling(true); - try { return deployInternal( clusterSpecification,