diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 6507774b3e684..f744d68f28bec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -80,7 +80,7 @@ public class AdaptiveBatchScheduler extends DefaultScheduler implements Schedule private final Map forwardGroupsByJobVertexId; - AdaptiveBatchScheduler( + public AdaptiveBatchScheduler( final Logger log, final JobGraph jobGraph, final Executor ioExecutor, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java index 1fe9112284961..5b746502ca758 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultSchedulerCheckpointCoordinatorTest.java @@ -32,8 +32,8 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; @@ -208,7 +208,7 @@ private DefaultScheduler createSchedulerAndEnableCheckpointing( .setJobCheckpointingSettings(checkpointingSettings) .build(); - return new SchedulerTestingUtils.DefaultSchedulerBuilder( + return new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index a2abb0b590187..05237422a719a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; @@ -477,7 +478,7 @@ public void testNoResourceAvailableFailure() throws Exception { // execution graph that executes actions synchronously final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( graph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -538,7 +539,7 @@ private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int d // execution graph that executes actions synchronously final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(v1, v2), ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -624,7 +625,7 @@ public void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception { final TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation(); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java index d7a9d89557b25..96b23b8dba564 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; @@ -80,7 +81,7 @@ public void testConstraintsAfterRestart() throws Exception { final ManuallyTriggeredScheduledExecutorService delayExecutor = new ManuallyTriggeredScheduledExecutorService(); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java index d23b13cc59f4f..f8959481fd30d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphFinishTest.java @@ -22,8 +22,8 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; @@ -55,7 +55,7 @@ public void testJobFinishes() throws Exception { ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class)); SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index 0ca4f106f95d4..29711bc995677 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -251,7 +252,7 @@ private SchedulerBase createScheduler( final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(vertices); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor.getMainThreadExecutor(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index facd4be2fbe2e..a08d5444083a8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; @@ -107,7 +108,7 @@ public void testCancelAllPendingRequestWhileCanceling() throws Exception { "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class); JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender); SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( createExecutionSlotAllocatorFactory(slotPool)) @@ -136,7 +137,7 @@ public void testCancelAllPendingRequestWhileFailing() throws Exception { "Task", NUM_TASKS + numTasksExceedSlotPool, NoOpInvokable.class); JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender); SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( createExecutionSlotAllocatorFactory(slotPool)) @@ -159,7 +160,7 @@ public void testCancelWhileRestarting() throws Exception { // We want to manually control the restart and delay try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) { SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( createJobGraph(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) @@ -202,7 +203,7 @@ private static ResourceID offerSlots(SlotPool slotPool, int numSlots) { public void testCancelWhileFailing() throws Exception { try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) { SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( createJobGraph(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) @@ -240,7 +241,7 @@ public void testCancelWhileFailing() throws Exception { public void testFailWhileCanceling() throws Exception { try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) { SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( createJobGraph(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) @@ -291,7 +292,7 @@ public void testFailingExecutionAfterRestart() throws Exception { try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) { SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( createExecutionSlotAllocatorFactory(slotPool)) @@ -351,7 +352,7 @@ public void testFailingExecutionAfterRestart() throws Exception { public void testFailExecutionAfterCancel() throws Exception { try (SlotPool slotPool = SlotPoolUtils.createDeclarativeSlotPoolBridge()) { SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( createJobGraphToCancel(), mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java index 5804f782ff9e4..3277cf3dbf555 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSuspendTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider; @@ -237,7 +238,7 @@ public void testSuspendWhileRestarting() throws Exception { final ManuallyTriggeredScheduledExecutor taskRestartExecutor = new ManuallyTriggeredScheduledExecutor(); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.emptyJobGraph(), ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -318,7 +319,7 @@ private static SchedulerBase createScheduler(TaskManagerGateway gateway, int par vertex.setParallelism(parallelism); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(vertex), ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index 10ff1baa9153c..ff2baf807e178 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -391,7 +391,7 @@ public static ExecutionJobVertex getExecutionJobVertex( JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), executor) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java index df765c69cd71e..6ad9d321b962b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; @@ -279,7 +280,7 @@ private void setupExecutionGraphAndStartRunningJob( final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(producerVertex, consumerVertex); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 9f11c7276325f..0c991f55ea3d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; @@ -79,7 +80,7 @@ public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception final TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(jobVertex), ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -123,7 +124,7 @@ public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception { final JobVertexID jobVertexId = jobVertex.getID(); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(jobVertex), ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -168,7 +169,7 @@ public void testCanceledExecutionReturnsSlot() throws Exception { .withTaskManagerGateway(taskManagerGateway) .build())); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(jobVertex), testMainThreadUtil.getMainThreadExecutor(), EXECUTOR_RESOURCE.getExecutor()) @@ -208,7 +209,7 @@ public void testSlotReleaseAtomicallyReleasesExecution() throws Exception { final TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( JobGraphTestUtils.streamingJobGraph(jobVertex), testMainThreadUtil.getMainThreadExecutor(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java index dd87d04abcbdd..d5f6f4f13e946 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; @@ -74,7 +75,7 @@ public void testResetForNewExecutionReleasesPartitions() throws Exception { final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(producerJobVertex, consumerJobVertex); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) @@ -122,7 +123,7 @@ public void testFindLatestAllocationIgnoresFailedAttempts() throws Exception { // make sure that retrieving the last (al)location is independent from the history size configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index e5470cb99cde3..456de0256680b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -26,8 +26,8 @@ import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; @@ -304,7 +304,7 @@ private static IntermediateResult createResult( JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink); SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), executorService) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java index 2ba942dcecca2..73dd9cdeb3a88 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/RemoveCachedShuffleDescriptorTest.java @@ -39,7 +39,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.scheduler.DefaultScheduler; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.testutils.TestingUtils; @@ -393,7 +393,7 @@ private static DefaultScheduler createScheduler( .addJobVertices(jobVertices) .build(); - return new SchedulerTestingUtils.DefaultSchedulerBuilder( + return new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0)) .setBlobWriter(blobWriter) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java index b1b46dee1a0d2..65e7975105d4c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/UpdatePartitionConsumersTest.java @@ -31,8 +31,8 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; @@ -132,7 +132,7 @@ public void testUpdatePartitionConsumers() throws Exception { new SimpleAckingTaskManagerGateway(); final SchedulerBase scheduler = - new SchedulerTestingUtils.DefaultSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index e8022c97ada58..0bc4aba7187ef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -37,17 +37,23 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartAllFailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.TestExecutionSlotAllocatorFactory; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -694,11 +700,11 @@ private DefaultScheduler setupTestJobAndScheduler( new ComponentMainThreadExecutorServiceAdapter( (ScheduledExecutorService) executor, Thread.currentThread()); - final SchedulerTestingUtils.DefaultSchedulerBuilder schedulerBuilder = + final DefaultSchedulerBuilder schedulerBuilder = taskExecutorOperatorEventGateway == null - ? SchedulerTestingUtils.createSchedulerBuilder( + ? createSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) - : SchedulerTestingUtils.createSchedulerBuilder( + : createSchedulerBuilder( jobGraph, mainThreadExecutor, taskExecutorOperatorEventGateway, @@ -714,6 +720,46 @@ private DefaultScheduler setupTestJobAndScheduler( return scheduler; } + private static DefaultSchedulerBuilder createSchedulerBuilder( + JobGraph jobGraph, + ComponentMainThreadExecutor mainThreadExecutor, + ScheduledExecutorService scheduledExecutorService) { + + return createSchedulerBuilder( + jobGraph, + mainThreadExecutor, + new SimpleAckingTaskManagerGateway(), + scheduledExecutorService); + } + + private static DefaultSchedulerBuilder createSchedulerBuilder( + JobGraph jobGraph, + ComponentMainThreadExecutor mainThreadExecutor, + TaskExecutorOperatorEventGateway operatorEventGateway, + ScheduledExecutorService scheduledExecutorService) { + + final TaskManagerGateway gateway = + operatorEventGateway instanceof TaskManagerGateway + ? (TaskManagerGateway) operatorEventGateway + : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway); + + return createSchedulerBuilder( + jobGraph, mainThreadExecutor, gateway, scheduledExecutorService); + } + + private static DefaultSchedulerBuilder createSchedulerBuilder( + JobGraph jobGraph, + ComponentMainThreadExecutor mainThreadExecutor, + TaskManagerGateway taskManagerGateway, + ScheduledExecutorService executorService) { + + return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService) + .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0)) + .setExecutionSlotAllocatorFactory( + new TestExecutionSlotAllocatorFactory(taskManagerGateway)); + } + private void scheduleAllTasksToRunning(DefaultScheduler scheduler) { scheduler.startScheduling(); executor.triggerAll(); @@ -1014,4 +1060,21 @@ public OperatorStateBackend createOperatorStateBackend( throw new UnsupportedOperationException(); } } + + private static final class TaskExecutorOperatorEventGatewayAdapter + extends SimpleAckingTaskManagerGateway { + + private final TaskExecutorOperatorEventGateway operatorGateway; + + private TaskExecutorOperatorEventGatewayAdapter( + TaskExecutorOperatorEventGateway operatorGateway) { + this.operatorGateway = operatorGateway; + } + + @Override + public CompletableFuture sendOperatorEventToTask( + ExecutionAttemptID task, OperatorID operator, SerializedValue evt) { + return operatorGateway.sendOperatorEventToTask(task, operator, evt); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java index 2b72068abc76e..442b0adb9b112 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBatchSchedulingTest.java @@ -205,7 +205,7 @@ private SchedulerNG createScheduler( Time slotRequestTimeout, JobStatusListener jobStatusListener) throws Exception { - return new SchedulerTestingUtils.DefaultSchedulerBuilder( + return new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java new file mode 100644 index 0000000000000..4c99e8ce5ecd4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.blob.BlobWriter; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; +import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy; +import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; +import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler; +import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismDecider; +import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; +import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; +import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleTestUtils; +import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore; + +/** A builder to create {@link DefaultScheduler} instances for testing. */ +public class DefaultSchedulerBuilder { + private static final Logger LOG = LoggerFactory.getLogger(DefaultSchedulerBuilder.class); + + private final JobGraph jobGraph; + private final ComponentMainThreadExecutor mainThreadExecutor; + + private Executor ioExecutor; + private ScheduledExecutorService futureExecutor; + private ScheduledExecutor delayExecutor; + private Logger log = LOG; + private Configuration jobMasterConfiguration = new Configuration(); + private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader(); + private CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner(); + private CheckpointRecoveryFactory checkpointRecoveryFactory = + new StandaloneCheckpointRecoveryFactory(); + private Time rpcTimeout = Time.seconds(300); + private BlobWriter blobWriter = VoidBlobWriter.getInstance(); + private JobManagerJobMetricGroup jobManagerJobMetricGroup = + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); + private ShuffleMaster shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER; + private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE; + private SchedulingStrategyFactory schedulingStrategyFactory = + new PipelinedRegionSchedulingStrategy.Factory(); + private FailoverStrategy.Factory failoverStrategyFactory = + new RestartPipelinedRegionFailoverStrategy.Factory(); + private RestartBackoffTimeStrategy restartBackoffTimeStrategy = + NoRestartBackoffTimeStrategy.INSTANCE; + private ExecutionOperations executionOperations = new DefaultExecutionOperations(); + private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner(); + private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = + new TestExecutionSlotAllocatorFactory(); + private JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {}; + private ExecutionDeployer.Factory executionDeployerFactory = + new DefaultExecutionDeployer.Factory(); + private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0; + private int defaultMaxParallelism = + JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue(); + + public DefaultSchedulerBuilder( + JobGraph jobGraph, + ComponentMainThreadExecutor mainThreadExecutor, + ScheduledExecutorService generalExecutorService) { + this( + jobGraph, + mainThreadExecutor, + generalExecutorService, + generalExecutorService, + new ScheduledExecutorServiceAdapter(generalExecutorService)); + } + + public DefaultSchedulerBuilder( + JobGraph jobGraph, + ComponentMainThreadExecutor mainThreadExecutor, + Executor ioExecutor, + ScheduledExecutorService futureExecutor, + ScheduledExecutor delayExecutor) { + this.jobGraph = jobGraph; + this.mainThreadExecutor = mainThreadExecutor; + this.ioExecutor = ioExecutor; + this.futureExecutor = futureExecutor; + this.delayExecutor = delayExecutor; + } + + public DefaultSchedulerBuilder setIoExecutor(Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + + public DefaultSchedulerBuilder setFutureExecutor(ScheduledExecutorService futureExecutor) { + this.futureExecutor = futureExecutor; + return this; + } + + public DefaultSchedulerBuilder setDelayExecutor(ScheduledExecutor delayExecutor) { + this.delayExecutor = delayExecutor; + return this; + } + + public DefaultSchedulerBuilder setLogger(Logger log) { + this.log = log; + return this; + } + + public DefaultSchedulerBuilder setJobMasterConfiguration(Configuration jobMasterConfiguration) { + this.jobMasterConfiguration = jobMasterConfiguration; + return this; + } + + public DefaultSchedulerBuilder setUserCodeLoader(ClassLoader userCodeLoader) { + this.userCodeLoader = userCodeLoader; + return this; + } + + public DefaultSchedulerBuilder setCheckpointCleaner(CheckpointsCleaner checkpointsCleaner) { + this.checkpointCleaner = checkpointsCleaner; + return this; + } + + public DefaultSchedulerBuilder setCheckpointRecoveryFactory( + CheckpointRecoveryFactory checkpointRecoveryFactory) { + this.checkpointRecoveryFactory = checkpointRecoveryFactory; + return this; + } + + public DefaultSchedulerBuilder setRpcTimeout(Time rpcTimeout) { + this.rpcTimeout = rpcTimeout; + return this; + } + + public DefaultSchedulerBuilder setBlobWriter(BlobWriter blobWriter) { + this.blobWriter = blobWriter; + return this; + } + + public DefaultSchedulerBuilder setJobManagerJobMetricGroup( + JobManagerJobMetricGroup jobManagerJobMetricGroup) { + this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; + return this; + } + + public DefaultSchedulerBuilder setShuffleMaster(ShuffleMaster shuffleMaster) { + this.shuffleMaster = shuffleMaster; + return this; + } + + public DefaultSchedulerBuilder setPartitionTracker(JobMasterPartitionTracker partitionTracker) { + this.partitionTracker = partitionTracker; + return this; + } + + public DefaultSchedulerBuilder setSchedulingStrategyFactory( + SchedulingStrategyFactory schedulingStrategyFactory) { + this.schedulingStrategyFactory = schedulingStrategyFactory; + return this; + } + + public DefaultSchedulerBuilder setFailoverStrategyFactory( + FailoverStrategy.Factory failoverStrategyFactory) { + this.failoverStrategyFactory = failoverStrategyFactory; + return this; + } + + public DefaultSchedulerBuilder setRestartBackoffTimeStrategy( + RestartBackoffTimeStrategy restartBackoffTimeStrategy) { + this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; + return this; + } + + public DefaultSchedulerBuilder setExecutionOperations(ExecutionOperations executionOperations) { + this.executionOperations = executionOperations; + return this; + } + + public DefaultSchedulerBuilder setExecutionVertexVersioner( + ExecutionVertexVersioner executionVertexVersioner) { + this.executionVertexVersioner = executionVertexVersioner; + return this; + } + + public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory( + ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) { + this.executionSlotAllocatorFactory = executionSlotAllocatorFactory; + return this; + } + + public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) { + this.jobStatusListener = jobStatusListener; + return this; + } + + public DefaultSchedulerBuilder setExecutionDeployerFactory( + ExecutionDeployer.Factory executionDeployerFactory) { + this.executionDeployerFactory = executionDeployerFactory; + return this; + } + + public DefaultSchedulerBuilder setVertexParallelismDecider( + VertexParallelismDecider vertexParallelismDecider) { + this.vertexParallelismDecider = vertexParallelismDecider; + return this; + } + + public DefaultSchedulerBuilder setDefaultMaxParallelism(int defaultMaxParallelism) { + this.defaultMaxParallelism = defaultMaxParallelism; + return this; + } + + public DefaultScheduler build() throws Exception { + return new DefaultScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + componentMainThreadExecutor -> {}, + delayExecutor, + userCodeLoader, + checkpointCleaner, + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + schedulingStrategyFactory, + failoverStrategyFactory, + restartBackoffTimeStrategy, + executionOperations, + executionVertexVersioner, + executionSlotAllocatorFactory, + System.currentTimeMillis(), + mainThreadExecutor, + jobStatusListener, + createExecutionGraphFactory(false), + shuffleMaster, + rpcTimeout, + computeVertexParallelismStore(jobGraph), + executionDeployerFactory); + } + + public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception { + return new AdaptiveBatchScheduler( + log, + jobGraph, + ioExecutor, + jobMasterConfiguration, + componentMainThreadExecutor -> {}, + delayExecutor, + userCodeLoader, + checkpointCleaner, + checkpointRecoveryFactory, + jobManagerJobMetricGroup, + new VertexwiseSchedulingStrategy.Factory(), + failoverStrategyFactory, + restartBackoffTimeStrategy, + executionOperations, + executionVertexVersioner, + executionSlotAllocatorFactory, + System.currentTimeMillis(), + mainThreadExecutor, + jobStatusListener, + createExecutionGraphFactory(true), + shuffleMaster, + rpcTimeout, + vertexParallelismDecider, + defaultMaxParallelism); + } + + private ExecutionGraphFactory createExecutionGraphFactory(boolean isDynamicGraph) { + return new DefaultExecutionGraphFactory( + jobMasterConfiguration, + userCodeLoader, + new DefaultExecutionDeploymentTracker(), + futureExecutor, + ioExecutor, + rpcTimeout, + jobManagerJobMetricGroup, + blobWriter, + shuffleMaster, + partitionTracker, + isDynamicGraph, + new ExecutionJobVertex.Factory()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java index 733dbbc3d6ef1..d78610c25aa8e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerTest.java @@ -1599,7 +1599,7 @@ public void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Except final JobGraph jobGraph = singleJobVertexJobGraph(1); enableCheckpointing(jobGraph); try { - return new SchedulerTestingUtils.DefaultSchedulerBuilder( + return new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter .forSingleThreadExecutor(executorService), @@ -1844,10 +1844,9 @@ private DefaultScheduler createScheduler( .build(); } - private SchedulerTestingUtils.DefaultSchedulerBuilder createSchedulerBuilder( - final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) - throws Exception { - return new SchedulerTestingUtils.DefaultSchedulerBuilder( + private DefaultSchedulerBuilder createSchedulerBuilder( + final JobGraph jobGraph, final ComponentMainThreadExecutor mainThreadExecutor) { + return new DefaultSchedulerBuilder( jobGraph, mainThreadExecutor, executor, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index ba6b7a0d880b3..eb09a7e139c5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -20,72 +20,38 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobWriter; -import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; -import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.PendingCheckpoint; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.JobStatusListener; -import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; -import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy; -import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; -import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy; -import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy; -import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; -import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; -import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider; -import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; -import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy; -import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; -import org.apache.flink.runtime.shuffle.ShuffleMaster; -import org.apache.flink.runtime.shuffle.ShuffleTestUtils; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TernaryBoolean; -import org.apache.flink.util.concurrent.ScheduledExecutor; -import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.apache.flink.runtime.scheduler.SchedulerBase.computeVertexParallelismStore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -94,8 +60,6 @@ /** A utility class to create {@link DefaultScheduler} instances for testing. */ public class SchedulerTestingUtils { - private static final Logger LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class); - private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000; private static final Time DEFAULT_TIMEOUT = Time.seconds(300); @@ -110,47 +74,6 @@ public static DefaultScheduler createScheduler( return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService).build(); } - public static DefaultSchedulerBuilder createSchedulerBuilder( - JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - ScheduledExecutorService scheduledExecutorService) { - - return createSchedulerBuilder( - jobGraph, - mainThreadExecutor, - new SimpleAckingTaskManagerGateway(), - scheduledExecutorService); - } - - public static DefaultSchedulerBuilder createSchedulerBuilder( - JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - TaskExecutorOperatorEventGateway operatorEventGateway, - ScheduledExecutorService scheduledExecutorService) { - - final TaskManagerGateway gateway = - operatorEventGateway instanceof TaskManagerGateway - ? (TaskManagerGateway) operatorEventGateway - : new TaskExecutorOperatorEventGatewayAdapter(operatorEventGateway); - - return createSchedulerBuilder( - jobGraph, mainThreadExecutor, gateway, scheduledExecutorService); - } - - private static DefaultSchedulerBuilder createSchedulerBuilder( - JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - TaskManagerGateway taskManagerGateway, - ScheduledExecutorService executorService) { - - return new SchedulerTestingUtils.DefaultSchedulerBuilder( - jobGraph, mainThreadExecutor, executorService) - .setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory()) - .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0)) - .setExecutionSlotAllocatorFactory( - new TestExecutionSlotAllocatorFactory(taskManagerGateway)); - } - public static void enableCheckpointing(final JobGraph jobGraph) { enableCheckpointing(jobGraph, null, null); } @@ -353,22 +276,6 @@ public static ExecutionAttemptID getAttemptId( // ------------------------------------------------------------------------ - private static final class TaskExecutorOperatorEventGatewayAdapter - extends SimpleAckingTaskManagerGateway { - - private final TaskExecutorOperatorEventGateway operatorGateway; - - TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway operatorGateway) { - this.operatorGateway = operatorGateway; - } - - @Override - public CompletableFuture sendOperatorEventToTask( - ExecutionAttemptID task, OperatorID operator, SerializedValue evt) { - return operatorGateway.sendOperatorEventToTask(task, operator, evt); - } - } - public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory() { return newSlotSharingExecutionSlotAllocatorFactory( @@ -390,227 +297,4 @@ public CompletableFuture sendOperatorEventToTask( allocationTimeout, new LocalInputPreferredSlotSharingStrategy.Factory()); } - - /** Builder for {@link DefaultScheduler}. */ - public static class DefaultSchedulerBuilder { - protected final JobGraph jobGraph; - - protected final ComponentMainThreadExecutor mainThreadExecutor; - - protected SchedulingStrategyFactory schedulingStrategyFactory = - new PipelinedRegionSchedulingStrategy.Factory(); - - protected Logger log = LOG; - protected Executor ioExecutor; - protected Configuration jobMasterConfiguration = new Configuration(); - protected ScheduledExecutorService futureExecutor; - protected ScheduledExecutor delayExecutor; - protected ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader(); - protected CheckpointsCleaner checkpointCleaner = new CheckpointsCleaner(); - protected CheckpointRecoveryFactory checkpointRecoveryFactory = - new StandaloneCheckpointRecoveryFactory(); - protected Time rpcTimeout = DEFAULT_TIMEOUT; - protected BlobWriter blobWriter = VoidBlobWriter.getInstance(); - protected JobManagerJobMetricGroup jobManagerJobMetricGroup = - UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(); - protected ShuffleMaster shuffleMaster = ShuffleTestUtils.DEFAULT_SHUFFLE_MASTER; - protected JobMasterPartitionTracker partitionTracker = - NoOpJobMasterPartitionTracker.INSTANCE; - protected FailoverStrategy.Factory failoverStrategyFactory = - new RestartPipelinedRegionFailoverStrategy.Factory(); - protected RestartBackoffTimeStrategy restartBackoffTimeStrategy = - NoRestartBackoffTimeStrategy.INSTANCE; - protected ExecutionOperations executionOperations = new DefaultExecutionOperations(); - protected ExecutionVertexVersioner executionVertexVersioner = - new ExecutionVertexVersioner(); - protected ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = - new TestExecutionSlotAllocatorFactory(); - protected JobStatusListener jobStatusListener = (ignoredA, ignoredB, ignoredC) -> {}; - protected ExecutionDeployer.Factory executionDeployerFactory = - new DefaultExecutionDeployer.Factory(); - - public DefaultSchedulerBuilder( - final JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - ScheduledExecutorService generalExecutorService) { - this( - jobGraph, - mainThreadExecutor, - generalExecutorService, - generalExecutorService, - new ScheduledExecutorServiceAdapter(generalExecutorService)); - } - - public DefaultSchedulerBuilder( - final JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - Executor ioExecutor, - ScheduledExecutorService futureExecutor, - ScheduledExecutor delayExecuto) { - this.jobGraph = jobGraph; - this.mainThreadExecutor = mainThreadExecutor; - this.ioExecutor = ioExecutor; - this.futureExecutor = futureExecutor; - this.delayExecutor = delayExecuto; - } - - public DefaultSchedulerBuilder setLogger(final Logger log) { - this.log = log; - return this; - } - - public DefaultSchedulerBuilder setIoExecutor(final Executor ioExecutor) { - this.ioExecutor = ioExecutor; - return this; - } - - public DefaultSchedulerBuilder setJobMasterConfiguration( - final Configuration jobMasterConfiguration) { - this.jobMasterConfiguration = jobMasterConfiguration; - return this; - } - - public DefaultSchedulerBuilder setFutureExecutor( - final ScheduledExecutorService futureExecutor) { - this.futureExecutor = futureExecutor; - return this; - } - - public DefaultSchedulerBuilder setDelayExecutor(final ScheduledExecutor delayExecutor) { - this.delayExecutor = delayExecutor; - return this; - } - - public DefaultSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) { - this.userCodeLoader = userCodeLoader; - return this; - } - - public DefaultSchedulerBuilder setCheckpointCleaner( - final CheckpointsCleaner checkpointsCleaner) { - this.checkpointCleaner = checkpointsCleaner; - return this; - } - - public DefaultSchedulerBuilder setCheckpointRecoveryFactory( - final CheckpointRecoveryFactory checkpointRecoveryFactory) { - this.checkpointRecoveryFactory = checkpointRecoveryFactory; - return this; - } - - public DefaultSchedulerBuilder setRpcTimeout(final Time rpcTimeout) { - this.rpcTimeout = rpcTimeout; - return this; - } - - public DefaultSchedulerBuilder setBlobWriter(final BlobWriter blobWriter) { - this.blobWriter = blobWriter; - return this; - } - - public DefaultSchedulerBuilder setJobManagerJobMetricGroup( - final JobManagerJobMetricGroup jobManagerJobMetricGroup) { - this.jobManagerJobMetricGroup = jobManagerJobMetricGroup; - return this; - } - - public DefaultSchedulerBuilder setShuffleMaster(final ShuffleMaster shuffleMaster) { - this.shuffleMaster = shuffleMaster; - return this; - } - - public DefaultSchedulerBuilder setPartitionTracker( - final JobMasterPartitionTracker partitionTracker) { - this.partitionTracker = partitionTracker; - return this; - } - - public DefaultSchedulerBuilder setSchedulingStrategyFactory( - final SchedulingStrategyFactory schedulingStrategyFactory) { - this.schedulingStrategyFactory = schedulingStrategyFactory; - return this; - } - - public DefaultSchedulerBuilder setFailoverStrategyFactory( - final FailoverStrategy.Factory failoverStrategyFactory) { - this.failoverStrategyFactory = failoverStrategyFactory; - return this; - } - - public DefaultSchedulerBuilder setRestartBackoffTimeStrategy( - final RestartBackoffTimeStrategy restartBackoffTimeStrategy) { - this.restartBackoffTimeStrategy = restartBackoffTimeStrategy; - return this; - } - - public DefaultSchedulerBuilder setExecutionOperations( - final ExecutionOperations executionOperations) { - this.executionOperations = executionOperations; - return this; - } - - public DefaultSchedulerBuilder setExecutionVertexVersioner( - final ExecutionVertexVersioner executionVertexVersioner) { - this.executionVertexVersioner = executionVertexVersioner; - return this; - } - - public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory( - final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) { - this.executionSlotAllocatorFactory = executionSlotAllocatorFactory; - return this; - } - - public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) { - this.jobStatusListener = jobStatusListener; - return this; - } - - public DefaultSchedulerBuilder setExecutionDeployerFactory( - ExecutionDeployer.Factory executionDeployerFactory) { - this.executionDeployerFactory = executionDeployerFactory; - return this; - } - - public DefaultScheduler build() throws Exception { - final ExecutionGraphFactory executionGraphFactory = - new DefaultExecutionGraphFactory( - jobMasterConfiguration, - userCodeLoader, - new DefaultExecutionDeploymentTracker(), - futureExecutor, - ioExecutor, - rpcTimeout, - jobManagerJobMetricGroup, - blobWriter, - shuffleMaster, - partitionTracker); - - return new DefaultScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - componentMainThreadExecutor -> {}, - delayExecutor, - userCodeLoader, - checkpointCleaner, - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - schedulingStrategyFactory, - failoverStrategyFactory, - restartBackoffTimeStrategy, - executionOperations, - executionVertexVersioner, - executionSlotAllocatorFactory, - System.currentTimeMillis(), - mainThreadExecutor, - jobStatusListener, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - computeVertexParallelismStore(jobGraph), - executionDeployerFactory); - } - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index e8636ea843556..d6508d8b66a5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; @@ -177,15 +178,10 @@ public SchedulerBase createScheduler(JobGraph jobGraph) throws Exception { configuration.set( JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch); - final AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder schedulerBuilder = - (AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder) - new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder( - jobGraph, - mainThreadExecutor, - EXECUTOR_RESOURCE.getExecutor()) - .setJobMasterConfiguration(configuration); - schedulerBuilder.setVertexParallelismDecider((ignored) -> 10); - - return schedulerBuilder.build(); + return new DefaultSchedulerBuilder( + jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor()) + .setJobMasterConfiguration(configuration) + .setVertexParallelismDecider((ignored) -> 10) + .buildAdaptiveBatchJobScheduler(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java deleted file mode 100644 index d462c726bc43e..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTestUtils.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.runtime.scheduler.adaptivebatch; - -import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; -import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory; -import org.apache.flink.runtime.scheduler.ExecutionGraphFactory; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; -import org.apache.flink.runtime.scheduler.strategy.VertexwiseSchedulingStrategy; - -import java.util.concurrent.ScheduledExecutorService; - -/** A utility class to create {@link AdaptiveBatchScheduler} instances for testing. */ -public class AdaptiveBatchSchedulerTestUtils { - - /** Builder for {@link AdaptiveBatchScheduler}. */ - public static class AdaptiveBatchSchedulerBuilder - extends SchedulerTestingUtils.DefaultSchedulerBuilder { - - private VertexParallelismDecider vertexParallelismDecider = (ignored) -> 0; - - private int defaultMaxParallelism = - JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM.defaultValue(); - - public AdaptiveBatchSchedulerBuilder( - JobGraph jobGraph, - ComponentMainThreadExecutor mainThreadExecutor, - ScheduledExecutorService executorService) { - super(jobGraph, mainThreadExecutor, executorService); - setSchedulingStrategyFactory(new VertexwiseSchedulingStrategy.Factory()); - } - - public void setVertexParallelismDecider(VertexParallelismDecider vertexParallelismDecider) { - this.vertexParallelismDecider = vertexParallelismDecider; - } - - public void setDefaultMaxParallelism(int defaultMaxParallelism) { - this.defaultMaxParallelism = defaultMaxParallelism; - } - - @Override - public AdaptiveBatchScheduler build() throws Exception { - final ExecutionGraphFactory executionGraphFactory = - new DefaultExecutionGraphFactory( - jobMasterConfiguration, - userCodeLoader, - new DefaultExecutionDeploymentTracker(), - futureExecutor, - ioExecutor, - rpcTimeout, - jobManagerJobMetricGroup, - blobWriter, - shuffleMaster, - partitionTracker, - true, - new ExecutionJobVertex.Factory()); - - return new AdaptiveBatchScheduler( - log, - jobGraph, - ioExecutor, - jobMasterConfiguration, - componentMainThreadExecutor -> {}, - delayExecutor, - userCodeLoader, - checkpointCleaner, - checkpointRecoveryFactory, - jobManagerJobMetricGroup, - schedulingStrategyFactory, - failoverStrategyFactory, - restartBackoffTimeStrategy, - executionOperations, - executionVertexVersioner, - executionSlotAllocatorFactory, - System.currentTimeMillis(), - mainThreadExecutor, - jobStatusListener, - executionGraphFactory, - shuffleMaster, - rpcTimeout, - vertexParallelismDecider, - defaultMaxParallelism); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java index ed5ef5f7eeaf7..887634cd9746b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/SchedulerBenchmarkUtils.java @@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.scheduler.DefaultScheduler; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter; @@ -104,8 +104,7 @@ public static ExecutionGraph createAndInitExecutionGraph( ComponentMainThreadExecutorServiceAdapter.forMainThread(); final DefaultScheduler scheduler = - SchedulerTestingUtils.createSchedulerBuilder( - jobGraph, mainThreadExecutor, scheduledExecutorService) + new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, scheduledExecutorService) .setIoExecutor(scheduledExecutorService) .setFutureExecutor(scheduledExecutorService) .setDelayExecutor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java index 42fcf4cf317b9..318375d8bc017 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/benchmark/e2e/SchedulerEndToEndBenchmarkBase.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl; import org.apache.flink.runtime.jobmaster.slotpool.SlotPool; import org.apache.flink.runtime.scheduler.DefaultScheduler; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; import org.apache.flink.runtime.scheduler.benchmark.JobConfiguration; import org.apache.flink.runtime.scheduler.benchmark.SchedulerBenchmarkBase; @@ -76,8 +77,7 @@ static DefaultScheduler createScheduler( ComponentMainThreadExecutor mainThreadExecutor, ScheduledExecutorService executorService) throws Exception { - return new SchedulerTestingUtils.DefaultSchedulerBuilder( - jobGraph, mainThreadExecutor, executorService) + return new DefaultSchedulerBuilder(jobGraph, mainThreadExecutor, executorService) .setExecutionSlotAllocatorFactory( SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory( physicalSlotProvider)) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java index af2624a55beeb..620fcbe67f7d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/slowtaskdetector/ExecutionTimeBasedSlowTaskDetectorTest.java @@ -31,9 +31,9 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; -import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerTestUtils; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; @@ -221,11 +221,11 @@ private ExecutionGraph createDynamicExecutionGraph(JobVertex... jobVertices) thr final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices); final SchedulerBase scheduler = - new AdaptiveBatchSchedulerTestUtils.AdaptiveBatchSchedulerBuilder( + new DefaultSchedulerBuilder( jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), EXECUTOR_RESOURCE.getExecutor()) - .build(); + .buildAdaptiveBatchJobScheduler(); final ExecutionGraph executionGraph = scheduler.getExecutionGraph();