Skip to content

Commit

Permalink
[FLINK-21134] Introduce 'scheduler-mode' config option
Browse files Browse the repository at this point in the history
This closes apache#14790
  • Loading branch information
rmetzger committed Feb 9, 2021
1 parent 7e76fc2 commit 1711f12
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 0 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/expert_scheduling_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>scheduler-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td><p>Enum</p>Possible values: [REACTIVE]</td>
<td>Determines the mode of the scheduler. Note that <span markdown="span">`scheduler-mode`</span>=<span markdown="span">`REACTIVE`</span> is only supported by standalone application deployments, not by active resource managers (YARN, Kubernetes) or session clusters.</td>
</tr>
<tr>
<td><h5>slot.idle.timeout</h5></td>
<td style="word-wrap: break-word;">50000</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/_includes/generated/job_manager_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@
<td>Integer</td>
<td>The max number of completed jobs that can be kept in the job store.</td>
</tr>
<tr>
<td><h5>scheduler-mode</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td><p>Enum</p>Possible values: [REACTIVE]</td>
<td>Determines the mode of the scheduler. Note that <span markdown="span">`scheduler-mode`</span>=<span markdown="span">`REACTIVE`</span> is only supported by standalone application deployments, not by active resource managers (YARN, Kubernetes) or session clusters.</td>
</tr>
<tr>
<td><h5>slot.idle.timeout</h5></td>
<td style="word-wrap: break-word;">50000</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public static void main(String[] args) {
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}

@Override
protected boolean supportsReactiveMode() {
return true;
}

@VisibleForTesting
static Configuration loadConfigurationFromClusterConfig(
StandaloneApplicationClusterConfiguration clusterConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/** Configuration options for the JobManager. */
Expand Down Expand Up @@ -358,6 +359,19 @@ public class JobManagerOptions {
.list(text("'ng': new generation scheduler"))
.build());

@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
public static final ConfigOption<SchedulerExecutionMode> SCHEDULER_MODE =
key("scheduler-mode")
.enumType(SchedulerExecutionMode.class)
.defaultValue(null)
.withDescription(
Description.builder()
.text(
"Determines the mode of the scheduler. Note that %s=%s is only supported by standalone application deployments, not by active resource managers (YARN, Kubernetes) or session clusters.",
code("scheduler-mode"),
code(SchedulerExecutionMode.REACTIVE.name()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.Experimental;

/** Enum for controlling whether REACTIVE mode is enabled or not. */
@Experimental
public enum SchedulerExecutionMode {
REACTIVE
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JMXServerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginManager;
Expand Down Expand Up @@ -151,6 +153,16 @@ protected ClusterEntrypoint(Configuration configuration) {
this.configuration = generateClusterConfiguration(configuration);
this.terminationFuture = new CompletableFuture<>();

if (configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE
&& !supportsReactiveMode()) {
final String msg =
"Reactive mode is configured for an unsupported cluster type. At the moment, reactive mode is only supported by standalone application clusters (bin/standalone-job.sh).";
// log message as well, otherwise the error is only shown in the .out file of the
// cluster
LOG.error(msg);
throw new IllegalConfigurationException(msg);
}

shutDownHook =
ShutdownHookUtil.addShutdownHook(
this::cleanupDirectories, getClass().getSimpleName(), LOG);
Expand Down Expand Up @@ -203,6 +215,10 @@ public void startCluster() throws ClusterEntrypointException {
}
}

protected boolean supportsReactiveMode() {
return false;
}

private void configureFileSystems(Configuration configuration, PluginManager pluginManager) {
LOG.info("Install default filesystem.");
FileSystem.initialize(configuration, pluginManager);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;

import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.fail;

/** Tests for the {@link ClusterEntrypoint}. */
public class ClusterEntrypointTest {

@Test(expected = IllegalConfigurationException.class)
public void testStandaloneSessionClusterEntrypointDeniedInReactiveMode() {
Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
new MockEntryPoint(configuration);
fail("Entrypoint initialization is supposed to fail");
}

private static class MockEntryPoint extends ClusterEntrypoint {

protected MockEntryPoint(Configuration configuration) {
super(configuration);
}

@Override
protected DispatcherResourceManagerComponentFactory
createDispatcherResourceManagerComponentFactory(Configuration configuration)
throws IOException {
throw new UnsupportedOperationException("Not needed for this test");
}

@Override
protected ArchivedExecutionGraphStore createSerializableExecutionGraphStore(
Configuration configuration, ScheduledExecutor scheduledExecutor)
throws IOException {
throw new UnsupportedOperationException("Not needed for this test");
}

@Override
protected boolean supportsReactiveMode() {
return false;
}
}
}

0 comments on commit 1711f12

Please sign in to comment.