Skip to content

Commit

Permalink
[FLINK-14462][coordination] Remove JobGraph#allowQueuedScheduling fla…
Browse files Browse the repository at this point in the history
…g because it is always true

Rewrite testRestartWithSlotSharingAndNotEnoughResources
  • Loading branch information
tisonkun authored and tillrohrmann committed Nov 4, 2019
1 parent 845fb95 commit 76225ed
Show file tree
Hide file tree
Showing 46 changed files with 347 additions and 489 deletions.
4 changes: 1 addition & 3 deletions docs/ops/deployment/mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ The job graph file may be generated like this way:

{% highlight java %}
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
final String jobGraphFilename = "job.graph";
File jobGraphFile = new File(jobGraphFilename);
try (FileOutputStream output = new FileOutputStream(jobGraphFile);
Expand All @@ -203,8 +202,7 @@ try (FileOutputStream output = new FileOutputStream(jobGraphFile);
{% endhighlight %}

Note:
1. Before serializing the job graph, please make sure to enable queued scheduling because slots need to be allocated lazily
2. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory)
1. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory)

#### General configuration

Expand Down
4 changes: 1 addition & 3 deletions docs/ops/deployment/mesos.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ The job graph file may be generated like this way:

{% highlight java %}
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
final String jobGraphFilename = "job.graph";
File jobGraphFile = new File(jobGraphFilename);
try (FileOutputStream output = new FileOutputStream(jobGraphFile);
Expand All @@ -203,8 +202,7 @@ try (FileOutputStream output = new FileOutputStream(jobGraphFile);
{% endhighlight %}

Note:
1. Before serializing the job graph, please make sure to enable queued scheduling because slots need to be allocated lazily
2. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory)
1. Make sure that all Mesos processes have the user code jar on the classpath (e.g. putting them in the lib directory)

#### General configuration

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ public JobExecutionResult executePlan(
baseConfiguration,
1);

jobGraph.setAllowQueuedScheduling(true);

try (final JobExecutorService executorService = createJobExecutorService(jobGraph,
baseConfiguration)) {
return executorService.executeJobBlocking(jobGraph);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,6 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
*/
@Override
public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph) {
// we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);

CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
try {
final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
configuration,
defaultParallelism,
jobId);
jobGraph.setAllowQueuedScheduling(true);
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);

return jobGraph;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,6 @@ public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
"This indicates that non-serializable types (like custom serializers) were registered");
}

graph.setAllowQueuedScheduling(false);

// add vertices to the graph
for (JobVertex vertex : this.vertices.values()) {
vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,7 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(
return jobGraph;
});

CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
// we have to enable queued scheduling because slots will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
return gateway.submitJob(jobGraph, timeout);
});
CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

return jobSubmissionFuture
.thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,7 @@ public CompletableFuture<Void> scheduleForExecution(
locationPreferenceConstraint,
allPreviousExecutionGraphAllocationIds);

final CompletableFuture<Void> deploymentFuture;

if (allocationFuture.isDone() || slotProviderStrategy.isQueuedSchedulingAllowed()) {
deploymentFuture = allocationFuture.thenRun(ThrowingRunnable.unchecked(this::deploy));
} else {
deploymentFuture = FutureUtils.completedExceptionally(
new IllegalArgumentException("The slot allocation future has not been completed yet."));
}
final CompletableFuture<Void> deploymentFuture = allocationFuture.thenRun(ThrowingRunnable.unchecked(this::deploy));

deploymentFuture.whenComplete(
(Void ignored, Throwable failure) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ public class ExecutionGraph implements AccessExecutionGraph {

// ------ Configuration of the Execution -------

/** Flag to indicate whether the scheduler may queue tasks for execution, or needs to be able
* to deploy them immediately. */
private final boolean allowQueuedScheduling;

/** The mode of scheduling. Decides how to select the initial set of tasks to be deployed.
* May indicate to deploy all sources, or to deploy everything, or to deploy via backtracking
* from results than need to be materialized. */
Expand Down Expand Up @@ -440,8 +436,7 @@ public ExecutionGraph(
jobInformation.getJobId(),
NettyShuffleMaster.INSTANCE,
ignored -> Optional.empty()),
ScheduleMode.LAZY_FROM_SOURCES,
false);
ScheduleMode.LAZY_FROM_SOURCES);
}

public ExecutionGraph(
Expand All @@ -459,17 +454,14 @@ public ExecutionGraph(
PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
ScheduleMode scheduleMode,
boolean allowQueuedScheduling) throws IOException {
ScheduleMode scheduleMode) throws IOException {

this.jobInformation = Preconditions.checkNotNull(jobInformation);

this.blobWriter = Preconditions.checkNotNull(blobWriter);

this.scheduleMode = checkNotNull(scheduleMode);

this.allowQueuedScheduling = allowQueuedScheduling;

this.jobInformationOrBlobKey = BlobWriter.serializeAndTryOffload(jobInformation, jobInformation.getJobId(), blobWriter);

this.futureExecutor = Preconditions.checkNotNull(futureExecutor);
Expand All @@ -478,8 +470,7 @@ public ExecutionGraph(
this.slotProviderStrategy = SlotProviderStrategy.from(
scheduleMode,
slotProvider,
allocationTimeout,
allowQueuedScheduling);
allocationTimeout);
this.userClassLoader = Preconditions.checkNotNull(userClassLoader, "userClassLoader");

this.tasks = new ConcurrentHashMap<>(16);
Expand Down Expand Up @@ -541,10 +532,6 @@ public int getNumberOfExecutionJobVertices() {
return this.verticesInCreationOrder.size();
}

public boolean isQueuedSchedulingAllowed() {
return this.allowQueuedScheduling;
}

public SchedulingTopology<?, ?> getSchedulingTopology() {
return executionTopology;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ public static ExecutionGraph buildGraph(
partitionReleaseStrategyFactory,
shuffleMaster,
partitionTracker,
jobGraph.getScheduleMode(),
jobGraph.getAllowQueuedScheduling());
jobGraph.getScheduleMode());
} catch (IOException e) {
throw new JobException("Could not create the ExecutionGraph.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,8 @@ public abstract class SlotProviderStrategy {

protected final SlotProvider slotProvider;

protected final boolean allowQueuedScheduling;

SlotProviderStrategy(SlotProvider slotProvider, boolean allowQueuedScheduling) {
SlotProviderStrategy(SlotProvider slotProvider) {
this.slotProvider = Preconditions.checkNotNull(slotProvider);
this.allowQueuedScheduling = allowQueuedScheduling;
}

boolean isQueuedSchedulingAllowed() {
return allowQueuedScheduling;
}

/**
Expand Down Expand Up @@ -80,15 +73,14 @@ public void cancelSlotRequest(
public static SlotProviderStrategy from(
ScheduleMode scheduleMode,
SlotProvider slotProvider,
Time allocationTimeout,
boolean allowQueuedScheduling) {
Time allocationTimeout) {

switch (scheduleMode) {
case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
return new BatchSlotProviderStrategy(slotProvider, allowQueuedScheduling);
return new BatchSlotProviderStrategy(slotProvider);
case LAZY_FROM_SOURCES:
case EAGER:
return new NormalSlotProviderStrategy(slotProvider, allocationTimeout, allowQueuedScheduling);
return new NormalSlotProviderStrategy(slotProvider, allocationTimeout);
default:
throw new IllegalArgumentException(String.format("Unknown scheduling mode: %s", scheduleMode));
}
Expand All @@ -100,27 +92,27 @@ SlotProvider asSlotProvider() {

static class BatchSlotProviderStrategy extends SlotProviderStrategy {

BatchSlotProviderStrategy(SlotProvider slotProvider, boolean allowQueuedScheduling) {
super(slotProvider, allowQueuedScheduling);
BatchSlotProviderStrategy(SlotProvider slotProvider) {
super(slotProvider);
}

@Override
public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile) {
return slotProvider.allocateBatchSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling);
return slotProvider.allocateBatchSlot(slotRequestId, scheduledUnit, slotProfile);
}
}

static class NormalSlotProviderStrategy extends SlotProviderStrategy {
private final Time allocationTimeout;

NormalSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout, boolean allowQueuedScheduling) {
super(slotProvider, allowQueuedScheduling);
NormalSlotProviderStrategy(SlotProvider slotProvider, Time allocationTimeout) {
super(slotProvider);
this.allocationTimeout = Preconditions.checkNotNull(allocationTimeout);
}

@Override
public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile) {
return slotProvider.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
return slotProvider.allocateSlot(slotRequestId, scheduledUnit, slotProfile, allocationTimeout);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ public class JobGraph implements Serializable {
/** Name of this job. */
private final String jobName;

/** flag to enable queued scheduling */
private boolean allowQueuedScheduling;

/** The mode in which the job is scheduled */
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

Expand Down Expand Up @@ -220,14 +217,6 @@ public SerializedValue<ExecutionConfig> getSerializedExecutionConfig() {
return serializedExecutionConfig;
}

public void setAllowQueuedScheduling(boolean allowQueuedScheduling) {
this.allowQueuedScheduling = allowQueuedScheduling;
}

public boolean getAllowQueuedScheduling() {
return allowQueuedScheduling;
}

public void setScheduleMode(ScheduleMode scheduleMode) {
this.scheduleMode = scheduleMode;
}
Expand Down
Loading

0 comments on commit 76225ed

Please sign in to comment.