Skip to content

Commit

Permalink
[hotfix][tests] Merge DefaultSchedulerBuilder and AdaptiveBatchSchedu…
Browse files Browse the repository at this point in the history
…lerBuilder

The extending AdaptiveBatchSchedulerBuilder is in a good shape, which requires hack logics to create AdaptiveBatchScheduler. Merging them can make it simpler to create different schedulers.
  • Loading branch information
zhuzhurk committed Jul 13, 2022
1 parent 331264c commit 51010a1
Show file tree
Hide file tree
Showing 25 changed files with 445 additions and 483 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule

private final Map<JobVertexID, ForwardGroup> forwardGroupsByJobVertexId;

AdaptiveBatchScheduler(
public AdaptiveBatchScheduler(
final Logger log,
final JobGraph jobGraph,
final Executor ioExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
Expand Down Expand Up @@ -208,7 +208,7 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing(
.setJobCheckpointingSettings(checkpointingSettings)
.build();

return new SchedulerTestingUtils.DefaultSchedulerBuilder(
return new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
Expand Down Expand Up @@ -477,7 +478,7 @@ public void testNoResourceAvailableFailure() throws Exception {

// execution graph that executes actions synchronously
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
graph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -538,7 +539,7 @@ private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int d

// execution graph that executes actions synchronously
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(v1, v2),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -624,7 +625,7 @@ public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
final TestingPhysicalSlotProvider physicalSlotProvider =
TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
Expand Down Expand Up @@ -80,7 +81,7 @@ public void testConstraintsAfterRestart() throws Exception {
final ManuallyTriggeredScheduledExecutorService delayExecutor =
new ManuallyTriggeredScheduledExecutorService();
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void testJobFinishes() throws Exception {
ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));

SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
Expand Down Expand Up @@ -251,7 +252,7 @@ private SchedulerBase createScheduler(

final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
mainThreadExecutor.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
Expand Down Expand Up @@ -107,7 +108,7 @@ public void testCancelAllPendingRequestWhileCanceling() throws Exception {
"Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
Expand Down Expand Up @@ -136,7 +137,7 @@ public void testCancelAllPendingRequestWhileFailing() throws Exception {
"Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class);
JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
Expand All @@ -159,7 +160,7 @@ public void testCancelWhileRestarting() throws Exception {
// We want to manually control the restart and delay
try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -202,7 +203,7 @@ private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
public void testCancelWhileFailing() throws Exception {
try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -240,7 +241,7 @@ public void testCancelWhileFailing() throws Exception {
public void testFailWhileCanceling() throws Exception {
try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -291,7 +292,7 @@ public void testFailingExecutionAfterRestart() throws Exception {

try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
Expand Down Expand Up @@ -351,7 +352,7 @@ public void testFailingExecutionAfterRestart() throws Exception {
public void testFailExecutionAfterCancel() throws Exception {
try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) {
SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
createJobGraphToCancel(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
Expand Down Expand Up @@ -237,7 +238,7 @@ public void testSuspendWhileRestarting() throws Exception {
final ManuallyTriggeredScheduledExecutor taskRestartExecutor =
new ManuallyTriggeredScheduledExecutor();
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.emptyJobGraph(),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -318,7 +319,7 @@ private static SchedulerBase createScheduler(TaskManagerGateway gateway, int par
vertex.setParallelism(parallelism);

final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(vertex),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
Expand Down Expand Up @@ -391,7 +391,7 @@ public static ExecutionJobVertex getExecutionJobVertex(
JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);

SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
Expand Down Expand Up @@ -279,7 +280,7 @@ private void setupExecutionGraphAndStartRunningJob(

final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producerVertex, consumerVertex);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception
final TestingPhysicalSlotProvider physicalSlotProvider =
TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(jobVertex),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
final JobVertexID jobVertexId = jobVertex.getID();

final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(jobVertex),
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -168,7 +169,7 @@ public void testCanceledExecutionReturnsSlot() throws Exception {
.withTaskManagerGateway(taskManagerGateway)
.build()));
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(jobVertex),
testMainThreadUtil.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -208,7 +209,7 @@ public void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
final TestingPhysicalSlotProvider physicalSlotProvider =
TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
JobGraphTestUtils.streamingJobGraph(jobVertex),
testMainThreadUtil.getMainThreadExecutor(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
Expand Down Expand Up @@ -74,7 +75,7 @@ public void testResetForNewExecutionReleasesPartitions() throws Exception {
final JobGraph jobGraph =
JobGraphTestUtils.streamingJobGraph(producerJobVertex, consumerJobVertex);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down Expand Up @@ -122,7 +123,7 @@ public void testFindLatestAllocationIgnoresFailedAttempts() throws Exception {
// make sure that retrieving the last (al)location is independent from the history size
configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
final SchedulerBase scheduler =
new SchedulerTestingUtils.DefaultSchedulerBuilder(
new DefaultSchedulerBuilder(
jobGraph,
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR_RESOURCE.getExecutor())
Expand Down
Loading

0 comments on commit 51010a1

Please sign in to comment.