Skip to content

Commit

Permalink
[FLINK-15629][runtime] Remove LegacyScheduler class, factory and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuzhurk authored and tillrohrmann committed May 16, 2020
1 parent 7796f6e commit 38c92ff
Show file tree
Hide file tree
Showing 8 changed files with 1 addition and 298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ public class JobManagerOptions {
.withDescription(Description.builder()
.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
.list(
text("'legacy': legacy scheduler"),
text("'ng': new generation scheduler"))
.build());
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;

/**
* Factory for {@link SchedulerNGFactory}.
*/
public final class SchedulerNGFactoryFactory {

public static final String SCHEDULER_TYPE_LEGACY = "legacy";

public static final String SCHEDULER_TYPE_NG = "ng";

private SchedulerNGFactoryFactory() {}
Expand All @@ -43,9 +40,6 @@ public static SchedulerNGFactory createSchedulerNGFactory(

final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER);
switch (schedulerName) {
case SCHEDULER_TYPE_LEGACY:
return new LegacySchedulerFactory(restartStrategyFactory);

case SCHEDULER_TYPE_NG:
return new DefaultSchedulerFactory();

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ private ExecutionGraph createExecutionGraph(
* <li>{@link #getExecutionVertexId(ExecutionAttemptID)}
* <li>{@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
* </ul>
* Currently, only {@link LegacyScheduler} requires direct access to the execution graph.
*/
@Deprecated
protected ExecutionGraph getExecutionGraph() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
private static final NoRestartStrategy.NoRestartStrategyFactory TEST_RESTART_STRATEGY_FACTORY = new NoRestartStrategy.NoRestartStrategyFactory();

@Test
public void createLegacySchedulerFactoryByDefault() {
public void createDefaultSchedulerFactoryByDefault() {
final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration());
assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class)));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
import org.apache.flink.runtime.executiongraph.restart.FailingRestartStrategy;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
Expand All @@ -65,8 +63,6 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -81,15 +77,11 @@
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory.SCHEDULER_TYPE_LEGACY;
import static org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory.SCHEDULER_TYPE_NG;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.junit.Assert.assertEquals;

/**
* Tests for region failover with multi regions.
*/
@RunWith(Parameterized.class)
public class RegionFailoverITCase extends TestLogger {

private static final int FAIL_BASE = 1000;
Expand All @@ -116,34 +108,12 @@ public class RegionFailoverITCase extends TestLogger {
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final String schedulerType;

public RegionFailoverITCase(final String schedulerType) {
this.schedulerType = checkNotNull(schedulerType);
}

@Parameterized.Parameters(name = "scheduler = {0}")
public static Object[] testParameters() {
return new Object[]{SCHEDULER_TYPE_NG, SCHEDULER_TYPE_LEGACY};
}

@Before
public void setup() throws Exception {
Configuration configuration = new Configuration();
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
configuration.setString(HighAvailabilityOptions.HA_MODE, TestingHAFactory.class.getName());

configuration.setString(JobManagerOptions.SCHEDULER, schedulerType);

// If the LegacyScheduler is configured, we will use a custom RestartStrategy
// (FailingRestartStrategy). This is done to test FLINK-13452. DefaultScheduler takes a
// different code path, and also cannot be configured with custom RestartStrategies.
if (SCHEDULER_TYPE_LEGACY.equals(schedulerType)) {
// global failover times: 3, region failover times: NUM_OF_RESTARTS
configuration.setInteger(FailingRestartStrategy.NUM_FAILURES_CONFIG_OPTION, 3);
configuration.setString(RestartStrategyOptions.RESTART_STRATEGY, FailingRestartStrategy.class.getName());
}

cluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
Expand Down

0 comments on commit 38c92ff

Please sign in to comment.