Skip to content

Commit

Permalink
[FLINK-10397] Remove CoreOptions#MODE
Browse files Browse the repository at this point in the history
Removes the MODE option used to switch between the new and legacy mode.

This closes apache#6752.
  • Loading branch information
tillrohrmann committed Sep 28, 2018
1 parent b0d5e99 commit a8434d6
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 224 deletions.
5 changes: 0 additions & 5 deletions docs/_includes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
<td style="word-wrap: break-word;">'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.</td>
<td></td>
</tr>
<tr>
<td><h5>mode</h5></td>
<td style="word-wrap: break-word;">"new"</td>
<td>Switch to select the execution mode. Possible values are 'new' and 'legacy'.</td>
</tr>
<tr>
<td><h5>parallelism.default</h5></td>
<td style="word-wrap: break-word;">1</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.JobExecutorService;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
Expand Down Expand Up @@ -125,39 +124,28 @@ public void start() throws Exception {
}

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
final JobExecutorService newJobExecutorService;
if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
if (!configuration.contains(RestOptions.PORT)) {
configuration.setInteger(RestOptions.PORT, 0);
}

if (!configuration.contains(RestOptions.PORT)) {
configuration.setInteger(RestOptions.PORT, 0);
}
final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, 1))
.build();

final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumTaskManagers(
configuration.getInteger(
ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
ConfigConstants.DEFAULT_LOCAL_NUMBER_TASK_MANAGER))
.setRpcServiceSharing(RpcServiceSharing.SHARED)
.setNumSlotsPerTaskManager(
configuration.getInteger(
TaskManagerOptions.NUM_TASK_SLOTS, 1))
.build();

final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();

configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

newJobExecutorService = miniCluster;
} else {
final LocalFlinkMiniCluster localFlinkMiniCluster = new LocalFlinkMiniCluster(configuration, true);
localFlinkMiniCluster.start();

newJobExecutorService = localFlinkMiniCluster;
}
final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);
miniCluster.start();

configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

return newJobExecutorService;
return miniCluster;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.optimizer.DataStatistics;
Expand Down Expand Up @@ -151,11 +149,7 @@ public int getDefaultParallelism() {
public void start() throws Exception {
synchronized (lock) {
if (client == null) {
if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
client = new StandaloneClusterClient(clientConfiguration);
} else {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
}
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
client.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ public class CliFrontend {

private final int defaultParallelism;

private final boolean isNewMode;

public CliFrontend(
Configuration configuration,
List<CustomCommandLine<?>> customCommandLines) throws Exception {
Expand All @@ -147,8 +145,6 @@ public CliFrontend(

this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);

this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -233,7 +229,7 @@ private <T> void runProgram(
final ClusterClient<T> client;

// directly deploy the job if the cluster is started in job mode and detached
if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
if (clusterId == null && runOptions.getDetachedMode()) {
int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
Expand Down Expand Up @@ -1200,11 +1196,7 @@ public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration co
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}

if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
customCommandLines.add(new DefaultCLI(configuration));
} else {
customCommandLines.add(new LegacyCLI(configuration));
}
customCommandLines.add(new DefaultCLI(configuration));

return customCommandLines;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,21 @@
package org.apache.flink.client.cli;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.TestLogger;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.List;

/**
* Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
* Base test class for {@link CliFrontend} tests.
*/
@RunWith(Parameterized.class)
public abstract class CliFrontendTestBase extends TestLogger {
@Parameterized.Parameter
public String mode;

@Parameterized.Parameters(name = "Mode = {0}")
public static List<String> parameters() {
return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
}

protected Configuration getConfiguration() {
final Configuration configuration = GlobalConfiguration
.loadConfiguration(CliFrontendTestUtils.getConfigDir());
configuration.setString(CoreOptions.MODE, mode);
return configuration;
}

static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
switch (configuration.getString(CoreOptions.MODE)) {
case CoreOptions.LEGACY_MODE:
return new LegacyCLI(configuration);
case CoreOptions.NEW_MODE:
return new DefaultCLI(configuration);
}
throw new IllegalStateException();
return new DefaultCLI(configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,26 +304,4 @@ public static ConfigOption<Long> fileSystemConnectionLimitTimeout(String scheme)
public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeout(String scheme) {
return ConfigOptions.key("fs." + scheme + ".limit.stream-timeout").defaultValue(0L);
}

// ------------------------------------------------------------------------
// Distributed architecture
// ------------------------------------------------------------------------

/**
* Constant value for the new execution mode.
*/
public static final String NEW_MODE = "new";

/**
* Constant value for the old execution mode.
*/
public static final String LEGACY_MODE = "legacy";

/**
* Switch to select the execution mode. Possible values are {@link CoreOptions#NEW_MODE}
* and {@link CoreOptions#LEGACY_MODE}.
*/
public static final ConfigOption<String> MODE = key("mode")
.defaultValue(NEW_MODE)
.withDescription("Switch to select the execution mode. Possible values are 'new' and 'legacy'.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,36 +139,24 @@ object FlinkShell {
}
}

private type LocalCluster = Either[StandaloneMiniCluster, MiniCluster]

def fetchConnectionInfo(
configuration: Configuration,
config: Config
): (String, Int, Option[Either[LocalCluster , ClusterClient[_]]]) = {
): (String, Int, Option[Either[MiniCluster , ClusterClient[_]]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => // Local mode
val config = configuration
config.setInteger(JobManagerOptions.PORT, 0)

val (miniCluster, port) = config.getString(CoreOptions.MODE) match {
case CoreOptions.LEGACY_MODE => {
val cluster = new StandaloneMiniCluster(config)

(Left(cluster), cluster.getPort)
}
case CoreOptions.NEW_MODE => {
val miniClusterConfig = new MiniClusterConfiguration.Builder()
.setConfiguration(config)
.build()
val cluster = new MiniCluster(miniClusterConfig)
cluster.start()

(Right(cluster), cluster.getRestAddress.getPort)
}
}
val miniClusterConfig = new MiniClusterConfiguration.Builder()
.setConfiguration(config)
.build()
val cluster = new MiniCluster(miniClusterConfig)
cluster.start()
val port = cluster.getRestAddress.getPort

println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
("localhost", port, Some(Left(miniCluster)))
("localhost", port, Some(Left(cluster)))

case ExecutionMode.REMOTE => // Remote mode
if (config.host.isEmpty || config.port.isEmpty) {
Expand Down Expand Up @@ -211,8 +199,7 @@ object FlinkShell {
val (repl, cluster) = try {
val (host, port, cluster) = fetchConnectionInfo(configuration, config)
val conf = cluster match {
case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
case Some(Left(Right(_))) => configuration
case Some(Left(_)) => configuration
case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
case None => configuration
}
Expand Down Expand Up @@ -242,8 +229,7 @@ object FlinkShell {
} finally {
repl.closeInterpreter()
cluster match {
case Some(Left(Left(legacyMiniCluster))) => legacyMiniCluster.close()
case Some(Left(Right(newMiniCluster))) => newMiniCluster.close()
case Some(Left(miniCluster)) => miniCluster.close()
case Some(Right(yarnCluster)) => yarnCluster.shutdown()
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ object ScalaShellITCase {

@BeforeClass
def beforeAll(): Unit = {
configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
// set to different than default so not to interfere with ScalaShellLocalStartupITCase
configuration.setInteger(RestOptions.PORT, 8082)
val miniConfig = new MiniClusterConfiguration.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.graph.StreamGraph;
Expand Down Expand Up @@ -206,11 +204,7 @@ protected JobExecutionResult executeRemotely(StreamGraph streamGraph, List<URL>

final ClusterClient<?> client;
try {
if (CoreOptions.LEGACY_MODE.equals(configuration.getString(CoreOptions.MODE))) {
client = new StandaloneClusterClient(configuration);
} else {
client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
}
client = new RestClusterClient<>(configuration, "RemoteStreamEnvironment");
}
catch (Exception e) {
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
Expand Down Expand Up @@ -1653,13 +1652,9 @@ public static LocalStreamEnvironment createLocalEnvironment(int parallelism) {
public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) {
final LocalStreamEnvironment currentEnvironment;

if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {
currentEnvironment = new LocalStreamEnvironment(configuration);
} else {
currentEnvironment = new LegacyLocalStreamEnvironment(configuration);
}

currentEnvironment = new LocalStreamEnvironment(configuration);
currentEnvironment.setParallelism(parallelism);

return currentEnvironment;
}

Expand Down
Loading

0 comments on commit a8434d6

Please sign in to comment.