From 38c92ffae34302512a67d1b50983ff6023c246ed Mon Sep 17 00:00:00 2001 From: Zhu Zhu Date: Sun, 19 Jan 2020 17:51:28 +0800 Subject: [PATCH] [FLINK-15629][runtime] Remove LegacyScheduler class, factory and tests --- .../configuration/JobManagerOptions.java | 1 - .../dispatcher/SchedulerNGFactoryFactory.java | 6 - .../runtime/scheduler/LegacyScheduler.java | 106 ------------------ .../scheduler/LegacySchedulerFactory.java | 88 --------------- .../runtime/scheduler/SchedulerBase.java | 1 - .../SchedulerNGFactoryFactoryTest.java | 2 +- .../LegacySchedulerBatchSchedulingTest.java | 65 ----------- .../checkpointing/RegionFailoverITCase.java | 30 ----- 8 files changed, 1 insertion(+), 298 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index c0a5a7bb15480..cd624578a61ff 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -321,7 +321,6 @@ public class JobManagerOptions { .withDescription(Description.builder() .text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:") .list( - text("'legacy': legacy scheduler"), text("'ng': new generation scheduler")) .build()); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java index 6685f75b1d3d7..36419589d48a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactory.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory; -import org.apache.flink.runtime.scheduler.LegacySchedulerFactory; import org.apache.flink.runtime.scheduler.SchedulerNGFactory; /** @@ -31,8 +30,6 @@ */ public final class SchedulerNGFactoryFactory { - public static final String SCHEDULER_TYPE_LEGACY = "legacy"; - public static final String SCHEDULER_TYPE_NG = "ng"; private SchedulerNGFactoryFactory() {} @@ -43,9 +40,6 @@ public static SchedulerNGFactory createSchedulerNGFactory( final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER); switch (schedulerName) { - case SCHEDULER_TYPE_LEGACY: - return new LegacySchedulerFactory(restartStrategyFactory); - case SCHEDULER_TYPE_NG: return new DefaultSchedulerFactory(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java deleted file mode 100644 index 86da9c341bd6f..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobWriter; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; -import org.apache.flink.runtime.shuffle.ShuffleMaster; - -import org.slf4j.Logger; - -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; - -/** - * A scheduler that delegates to the scheduling logic in the {@link ExecutionGraph}. - * - * @see ExecutionGraph#scheduleForExecution() - */ -public class LegacyScheduler extends SchedulerBase { - - public LegacyScheduler( - final Logger log, - final JobGraph jobGraph, - final BackPressureStatsTracker backPressureStatsTracker, - final Executor ioExecutor, - final Configuration jobMasterConfiguration, - final SlotProvider slotProvider, - final ScheduledExecutorService futureExecutor, - final ClassLoader userCodeLoader, - final CheckpointRecoveryFactory checkpointRecoveryFactory, - final Time rpcTimeout, - final RestartStrategyFactory restartStrategyFactory, - final BlobWriter blobWriter, - final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout, - final ShuffleMaster shuffleMaster, - final JobMasterPartitionTracker partitionTracker) throws Exception { - - super( - log, - jobGraph, - backPressureStatsTracker, - ioExecutor, - jobMasterConfiguration, - slotProvider, - futureExecutor, - userCodeLoader, - checkpointRecoveryFactory, - rpcTimeout, - restartStrategyFactory, - blobWriter, - jobManagerJobMetricGroup, - slotRequestTimeout, - shuffleMaster, - partitionTracker, - new ExecutionVertexVersioner(), - true); - } - - @Override - protected long getNumberOfRestarts() { - return getExecutionGraph().getNumberOfRestarts(); - } - - @Override - protected void startSchedulingInternal() { - final ExecutionGraph executionGraph = getExecutionGraph(); - try { - executionGraph.scheduleForExecution(); - } - catch (Throwable t) { - executionGraph.failGlobal(t); - } - } - - @Override - public void handleGlobalFailure(Throwable cause) { - throw new IllegalStateException("Unexpected handleGlobalFailure(...) call"); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java deleted file mode 100644 index 6147c06524cc4..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.BlobWriter; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; -import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; -import org.apache.flink.runtime.shuffle.ShuffleMaster; - -import org.slf4j.Logger; - -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Factory for {@link LegacyScheduler}. - */ -public class LegacySchedulerFactory implements SchedulerNGFactory { - - private final RestartStrategyFactory restartStrategyFactory; - - public LegacySchedulerFactory(final RestartStrategyFactory restartStrategyFactory) { - this.restartStrategyFactory = checkNotNull(restartStrategyFactory); - } - - @Override - public SchedulerNG createInstance( - final Logger log, - final JobGraph jobGraph, - final BackPressureStatsTracker backPressureStatsTracker, - final Executor ioExecutor, - final Configuration jobMasterConfiguration, - final SlotProvider slotProvider, - final ScheduledExecutorService futureExecutor, - final ClassLoader userCodeLoader, - final CheckpointRecoveryFactory checkpointRecoveryFactory, - final Time rpcTimeout, - final BlobWriter blobWriter, - final JobManagerJobMetricGroup jobManagerJobMetricGroup, - final Time slotRequestTimeout, - final ShuffleMaster shuffleMaster, - final JobMasterPartitionTracker partitionTracker) throws Exception { - - return new LegacyScheduler( - log, - jobGraph, - backPressureStatsTracker, - ioExecutor, - jobMasterConfiguration, - slotProvider, - futureExecutor, - userCodeLoader, - checkpointRecoveryFactory, - rpcTimeout, - restartStrategyFactory, - blobWriter, - jobManagerJobMetricGroup, - slotRequestTimeout, - shuffleMaster, - partitionTracker); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 7cc17dcaffef9..8ea7c040c06c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -297,7 +297,6 @@ private ExecutionGraph createExecutionGraph( *
  • {@link #getExecutionVertexId(ExecutionAttemptID)} *
  • {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)} * - * Currently, only {@link LegacyScheduler} requires direct access to the execution graph. */ @Deprecated protected ExecutionGraph getExecutionGraph() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java index 79c53ff88fa70..971a69c98352b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/SchedulerNGFactoryFactoryTest.java @@ -41,7 +41,7 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger { private static final NoRestartStrategy.NoRestartStrategyFactory TEST_RESTART_STRATEGY_FACTORY = new NoRestartStrategy.NoRestartStrategyFactory(); @Test - public void createLegacySchedulerFactoryByDefault() { + public void createDefaultSchedulerFactoryByDefault() { final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration()); assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class))); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java deleted file mode 100644 index c5b6b4212268b..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LegacySchedulerBatchSchedulingTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.blob.VoidBlobWriter; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; -import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; -import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; -import org.apache.flink.runtime.rest.handler.legacy.backpressure.VoidBackPressureStatsTracker; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; -import org.apache.flink.runtime.testingUtils.TestingUtils; - -/** - * Tests for the scheduling of batch jobs with {@link LegacyScheduler}. - */ -public class LegacySchedulerBatchSchedulingTest extends BatchSchedulingTestBase { - - @Override - protected LegacyScheduler createScheduler( - final JobGraph jobGraph, - final SlotProvider slotProvider, - final Time slotRequestTimeout) throws Exception { - - final LegacyScheduler legacyScheduler = new LegacyScheduler( - log, - jobGraph, - VoidBackPressureStatsTracker.INSTANCE, - TestingUtils.defaultExecutor(), - new Configuration(), - slotProvider, - TestingUtils.defaultExecutor(), - getClass().getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - TestingUtils.TIMEOUT(), - new NoRestartStrategy.NoRestartStrategyFactory(), - VoidBlobWriter.getInstance(), - UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), - slotRequestTimeout, - NettyShuffleMaster.INSTANCE, - NoOpJobMasterPartitionTracker.INSTANCE); - - return legacyScheduler; - } -} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 43f9fe99f0df6..d18067566ce21 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -32,13 +32,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; -import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; -import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices; @@ -65,8 +63,6 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import java.io.IOException; import java.util.Collections; @@ -81,15 +77,11 @@ import java.util.stream.IntStream; import java.util.stream.StreamSupport; -import static org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory.SCHEDULER_TYPE_LEGACY; -import static org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory.SCHEDULER_TYPE_NG; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; /** * Tests for region failover with multi regions. */ -@RunWith(Parameterized.class) public class RegionFailoverITCase extends TestLogger { private static final int FAIL_BASE = 1000; @@ -116,34 +108,12 @@ public class RegionFailoverITCase extends TestLogger { @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - private final String schedulerType; - - public RegionFailoverITCase(final String schedulerType) { - this.schedulerType = checkNotNull(schedulerType); - } - - @Parameterized.Parameters(name = "scheduler = {0}") - public static Object[] testParameters() { - return new Object[]{SCHEDULER_TYPE_NG, SCHEDULER_TYPE_LEGACY}; - } - @Before public void setup() throws Exception { Configuration configuration = new Configuration(); configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region"); configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName()); - configuration.setString(JobManagerOptions.SCHEDULER, schedulerType); - - // If the LegacyScheduler is configured, we will use a custom RestartStrategy - // (FailingRestartStrategy). This is done to test FLINK-13452. DefaultScheduler takes a - // different code path, and also cannot be configured with custom RestartStrategies. - if (SCHEDULER_TYPE_LEGACY.equals(schedulerType)) { - // global failover times: 3, region failover times: NUM_OF_RESTARTS - configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3); - configuration.setString(RestartStrategyOptions.RESTART_STRATEGY, FailingRestartStrategy.class.getName()); - } - cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setConfiguration(configuration)