[FLINK-14651][runtime] Enable DefaultScheduler by default
Run tests against legacy scheduler in separate Travis stages

This closes apache#14651.
GJL committed Dec 2, 2019
1 parent b04337f commit e62a04a
Showing 68 changed files with 158 additions and 159 deletions.
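With this commit the NG scheduler (DefaultScheduler) becomes the default, and the dedicated Travis stages below re-run the core and tests groups against the legacy scheduler instead. Deployments or tests that still need the old code path have to opt back in via the jobmanager.scheduler option changed in JobManagerOptions further down. A minimal sketch of pinning the legacy scheduler for a MiniCluster-based test follows; the surrounding wiring is illustrative and not part of this commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;

class LegacySchedulerSetup {

	// Builds a MiniCluster resource that still uses the legacy scheduler.
	static MiniClusterResource legacySchedulerMiniCluster() {
		final Configuration configuration = new Configuration();
		// "ng" is now the default of jobmanager.scheduler; opt back in explicitly.
		configuration.setString(JobManagerOptions.SCHEDULER, "legacy");
		return new MiniClusterResource(
			new MiniClusterResourceConfiguration.Builder()
				.setConfiguration(configuration)
				.build());
	}
}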
10 changes: 5 additions & 5 deletions .travis.yml
@@ -116,13 +116,13 @@ jobs:
env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
name: tests
- if: type in (pull_request, push)
script: ./tools/travis_controller.sh scheduler_ng_core
script: ./tools/travis_controller.sh legacy_scheduler_core
env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
name: core - scheduler_ng
name: core - legacy_scheduler
- if: type in (pull_request, push)
script: ./tools/travis_controller.sh scheduler_ng_tests
script: ./tools/travis_controller.sh legacy_scheduler_tests
env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
name: tests - scheduler_ng
name: tests - legacy_scheduler
- if: type in (pull_request, push)
script: ./tools/travis_controller.sh misc
env: PROFILE="-Dhadoop.version=2.8.3 -Dinclude_hadoop_aws -Dscala-2.11"
@@ -387,4 +387,4 @@ jobs:
- if: type = cron
env: PROFILE="-Djdk11 -Dinclude-hadoop -Dhadoop.version=2.8.3 -DincludeE2E=org.apache.flink.tests.util.categories.TravisGroup6"
script: ./tools/travis/nightly.sh split_tpcds.sh
name: e2e - tpcds - jdk 11
name: e2e - tpcds - jdk 11
@@ -52,7 +52,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;

@@ -80,7 +80,7 @@
/**
* Simple and maybe stupid test to check the {@link ClusterClient} class.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class ClientTest extends TestLogger {

@ClassRule
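The test files in this commit all follow the same pattern as ClientTest above: the JUnit category marker flips from AlsoRunWithSchedulerNG to AlsoRunWithLegacyScheduler, so that the new Travis stages can select exactly these tests and run them once more against the legacy scheduler. For orientation, a category is just a marker type referenced from @Category and matched by Surefire's groups parameter; the sketch below is illustrative (the real marker lives in flink-test-utils-junit) and not taken from this commit:

import org.junit.Test;
import org.junit.experimental.categories.Category;

// Marker interface standing in for org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler.
interface AlsoRunWithLegacyScheduler {}

@Category(AlsoRunWithLegacyScheduler.class)
public class ExampleITCase {

	@Test
	public void runsInTheLegacySchedulerStageToo() {
		// The category does not change the test body; it only affects test selection,
		// e.g. via Surefire's -Dgroups=<fully qualified category class>.
	}
}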
@@ -182,7 +182,8 @@ public class JobManagerOptions {
@Documentation.ExcludeFromDocumentation("SchedulerNG is still in development.")
public static final ConfigOption<String> SCHEDULER =
key("jobmanager.scheduler")
.defaultValue("legacy")
.stringType()
.defaultValue("ng")
.withDescription(Description.builder()
.text("Determines which scheduler implementation is used to schedule tasks. Accepted values are:")
.list(
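The functional change in JobManagerOptions is the default value: jobmanager.scheduler now resolves to "ng" unless explicitly configured (the option is also migrated to the typed stringType() builder). A short illustrative snippet of how the option behaves when read, not part of this commit:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

class SchedulerOptionExample {

	public static void main(String[] args) {
		final Configuration conf = new Configuration();

		// Unset -> the new default "ng" is returned, i.e. the DefaultScheduler is used.
		System.out.println(conf.getString(JobManagerOptions.SCHEDULER)); // prints "ng"

		// Explicitly set -> "legacy" still selects the LegacyScheduler.
		conf.setString(JobManagerOptions.SCHEDULER, "legacy");
		System.out.println(conf.getString(JobManagerOptions.SCHEDULER)); // prints "legacy"
	}
}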
1 change: 1 addition & 0 deletions flink-end-to-end-tests/test-scripts/common.sh
@@ -364,6 +364,7 @@ function check_logs_for_exceptions {
| grep -v "org.apache.flink.runtime.checkpoint.CheckpointException" \
| grep -v "org.elasticsearch.ElasticsearchException" \
| grep -v "Elasticsearch exception" \
| grep -v "org.apache.flink.runtime.JobException: Recovery is suppressed" \
| grep -ic "exception" || true)
if [[ ${exception_count} -gt 0 ]]; then
echo "Found exception in log files:"
@@ -34,7 +34,7 @@
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -71,7 +71,7 @@
/**
* Tests for the WebFrontend.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class WebFrontendITCase extends TestLogger {

private static final int NUM_TASK_MANAGERS = 2;
@@ -28,7 +28,7 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

@@ -48,7 +48,7 @@
/**
* Tests for the {@link JarRunHandler}.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class JarRunHandlerTest extends TestLogger {

@ClassRule
@@ -31,7 +31,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
@@ -67,7 +67,7 @@
* Tests for the HistoryServer.
*/
@RunWith(Parameterized.class)
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class HistoryServerTest extends TestLogger {

private static final JsonFactory JACKSON_FACTORY = new JsonFactory()
@@ -37,7 +37,7 @@ public final class SchedulerNGFactoryFactory {

private SchedulerNGFactoryFactory() {}

static SchedulerNGFactory createSchedulerNGFactory(
public static SchedulerNGFactory createSchedulerNGFactory(
final Configuration configuration,
final RestartStrategyFactory restartStrategyFactory) {

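createSchedulerNGFactory is widened to public so that tests (see the JobMasterTest changes further down) can derive the scheduler factory from a Configuration instead of instantiating LegacySchedulerFactory directly. Roughly, the factory dispatches on jobmanager.scheduler; the sketch below is reconstructed from the config option and SchedulerNGFactoryFactoryTest, not copied from the source:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;

final class SchedulerFactorySelectionSketch {

	// Simplified stand-in for SchedulerNGFactoryFactory.createSchedulerNGFactory.
	static SchedulerNGFactory createSchedulerNGFactory(
			final Configuration configuration,
			final RestartStrategyFactory restartStrategyFactory) {

		final String schedulerName = configuration.getString(JobManagerOptions.SCHEDULER);
		switch (schedulerName) {
			case "legacy":
				return new LegacySchedulerFactory(restartStrategyFactory);
			case "ng":
				return new DefaultSchedulerFactory();
			default:
				throw new IllegalArgumentException("Unknown scheduler: " + schedulerName);
		}
	}
}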
@@ -41,20 +41,13 @@ public class FailoverStrategyLoader {
/** Config name for the {@link AdaptedRestartPipelinedRegionStrategyNG}. */
public static final String PIPELINED_REGION_RESTART_STRATEGY_NAME = "region";

/** Config name for the {@link NoOpFailoverStrategy}. */
public static final String NO_OP_FAILOVER_STRATEGY = "noop";

// ------------------------------------------------------------------------

/**
* Loads a FailoverStrategy Factory from the given configuration.
*/
public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
// The new generation scheduler does not depend on the FailoverStrategy loaded here.
// Therefore, we load a noop failover strategy if the new generation scheduler is configured.
final String strategyParam = config.getString(JobManagerOptions.SCHEDULER).equals("ng") ?
NO_OP_FAILOVER_STRATEGY :
config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);

if (StringUtils.isNullOrWhitespaceOnly(strategyParam)) {
if (logger != null) {
@@ -75,9 +68,6 @@ public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config
case INDIVIDUAL_RESTART_STRATEGY_NAME:
return new RestartIndividualStrategy.Factory();

case NO_OP_FAILOVER_STRATEGY:
return new NoOpFailoverStrategy.Factory();

default:
// we could interpret the parameter as a factory class name and instantiate that
// for now we simply do not support this
@@ -132,7 +132,8 @@ public DefaultScheduler(
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker);
partitionTracker,
false);

this.log = log;

@@ -78,7 +78,8 @@ public LegacyScheduler(
jobManagerJobMetricGroup,
slotRequestTimeout,
shuffleMaster,
partitionTracker);
partitionTracker,
true);
}

@Override
@@ -51,6 +51,9 @@
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.failover.NoOpFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -145,6 +148,8 @@ public abstract class SchedulerBase implements SchedulerNG {

private final Time slotRequestTimeout;

private final boolean legacyScheduling;

private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor(
"SchedulerBase is not initialized with proper main thread executor. " +
"Call to SchedulerBase.setMainThreadExecutor(...) required.");
@@ -165,7 +170,8 @@ public SchedulerBase(
final JobManagerJobMetricGroup jobManagerJobMetricGroup,
final Time slotRequestTimeout,
final ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker) throws Exception {
final JobMasterPartitionTracker partitionTracker,
final boolean legacyScheduling) throws Exception {

this.log = checkNotNull(log);
this.jobGraph = checkNotNull(jobGraph);
@@ -192,6 +198,7 @@
this.blobWriter = checkNotNull(blobWriter);
this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
this.legacyScheduling = legacyScheduling;

this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
this.schedulingTopology = executionGraph.getSchedulingTopology();
@@ -228,6 +235,11 @@ private ExecutionGraph createExecutionGraph(
JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
ShuffleMaster<?> shuffleMaster,
final JobMasterPartitionTracker partitionTracker) throws JobExecutionException, JobException {

final FailoverStrategy.Factory failoverStrategy = legacyScheduling ?
FailoverStrategyLoader.loadFailoverStrategy(jobMasterConfiguration, log) :
new NoOpFailoverStrategy.Factory();

return ExecutionGraphBuilder.buildGraph(
null,
jobGraph,
@@ -244,7 +256,8 @@ private ExecutionGraph createExecutionGraph(
slotRequestTimeout,
log,
shuffleMaster,
partitionTracker);
partitionTracker,
failoverStrategy);
}

/**
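With the new legacyScheduling flag, SchedulerBase decides on the failover strategy itself: the legacy path still loads it via FailoverStrategyLoader, while the DefaultScheduler path installs NoOpFailoverStrategy because recovery is driven by the new flip1-based failover logic rather than by the ExecutionGraph. For orientation only, a no-op strategy amounts to the following sketch (method names assumed from the legacy FailoverStrategy base class, not copied from this commit):

import java.util.List;

import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;

// Illustrative no-op strategy: the ExecutionGraph-level failover hooks do nothing,
// leaving recovery entirely to the scheduler.
class NoOpFailoverStrategySketch extends FailoverStrategy {

	@Override
	public void onTaskFailure(Execution taskExecution, Throwable cause) {
		// intentionally empty
	}

	@Override
	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
		// intentionally empty
	}

	@Override
	public String getStrategyName() {
		return "NoOp failover strategy";
	}

	static class Factory implements FailoverStrategy.Factory {
		@Override
		public FailoverStrategy create(ExecutionGraph executionGraph) {
			return new NoOpFailoverStrategySketch();
		}
	}
}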
@@ -23,7 +23,6 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.util.TestLogger;

@@ -44,7 +43,7 @@ public class SchedulerNGFactoryFactoryTest extends TestLogger {
@Test
public void createLegacySchedulerFactoryByDefault() {
final SchedulerNGFactory schedulerNGFactory = createSchedulerNGFactory(new Configuration());
assertThat(schedulerNGFactory, is(instanceOf(LegacySchedulerFactory.class)));
assertThat(schedulerNGFactory, is(instanceOf(DefaultSchedulerFactory.class)));
}

@Test
@@ -34,7 +34,7 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
@@ -44,7 +44,7 @@
/**
* Test for consuming a pipelined result only partially.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class PartialConsumePipelinedResultTest extends TestLogger {

// Test configuration
@@ -38,7 +38,7 @@
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;

@@ -71,7 +71,7 @@
* Small test to check that the {@link org.apache.flink.runtime.blob.BlobServer} cleanup is executed
* after job termination.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class BlobsCleanupITCase extends TestLogger {

private static final long RETRY_INTERVAL = 100L;
@@ -33,7 +33,7 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

@@ -48,7 +48,7 @@
* of slots. This effectively tests that Flink can execute jobs with blocking results
* in a staged fashion.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class SlotCountExceedingParallelismTest extends TestLogger {

// Test configuration
@@ -32,7 +32,7 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;

@@ -50,7 +50,7 @@
* Tests for the lazy scheduling/updating of consumers depending on the
* producers result.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class ScheduleOrUpdateConsumersTest extends TestLogger {

private static final int NUMBER_OF_TMS = 2;
@@ -25,7 +25,7 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.testutils.junit.category.AlsoRunWithSchedulerNG;
import org.apache.flink.testutils.junit.category.AlsoRunWithLegacyScheduler;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
@@ -39,7 +39,7 @@
/**
* Integration tests for job scheduling.
*/
@Category(AlsoRunWithSchedulerNG.class)
@Category(AlsoRunWithLegacyScheduler.class)
public class JobExecutionITCase extends TestLogger {

/**
@@ -55,6 +55,7 @@
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -108,7 +109,6 @@
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.scheduler.LegacySchedulerFactory;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -294,7 +294,8 @@ public void testDeclineCheckpointInvocationWithUserException() throws Exception
final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(
configuration,
jobManagerSharedServices.getRestartStrategyFactory());

final JobMaster jobMaster = new JobMaster(
@@ -1054,6 +1055,8 @@ public void testRequestNextInputSplitWithLocalFailover() throws Exception {
public void testRequestNextInputSplitWithGlobalFailover() throws Exception {
configuration.setInteger(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofSeconds(0));
configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");

final Function<List<List<InputSplit>>, Collection<InputSplit>> expectAllRemainingInputSplits = this::flattenCollection;

@@ -1618,7 +1621,8 @@ public void testTriggerSavepointTimeout() throws Exception {
final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build();
final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory(
final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(
configuration,
jobManagerSharedServices.getRestartStrategyFactory());

final JobMaster jobMaster = new JobMaster(