Skip to content

Commit

Permalink
[FLINK-9121] [flip6] Remove Flip6 prefixes and other references
Browse files Browse the repository at this point in the history
Remove Flip6 prefixes and references to make Flip-6 the proper default:

Rename categories Flip6 -> New and OldAndFlip6 -> LegacyAndNew

Remove Flip-6 from documentation

Remove Flip-6 from start up scripts

This closes apache#5801.
  • Loading branch information
tillrohrmann committed Apr 2, 2018
1 parent 3f1d43d commit 143f37d
Show file tree
Hide file tree
Showing 127 changed files with 409 additions and 410 deletions.
10 changes: 5 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,27 @@ matrix:
- jdk: "oraclejdk8"
env:
- TEST="core"
- PROFILE="-Dhadoop.version=2.8.3 -Dflip6"
- PROFILE="-Dhadoop.version=2.8.3 -Dnew"
- CACHE_NAME=JDK8_H280_CO
- jdk: "oraclejdk8"
env:
- TEST="libraries"
- PROFILE="-Dhadoop.version=2.8.3 -Dflip6"
- PROFILE="-Dhadoop.version=2.8.3 -Dnew"
- CACHE_NAME=JDK8_H280_L
- jdk: "oraclejdk8"
env:
- TEST="connectors"
- PROFILE="-Dhadoop.version=2.8.3 -Dflip6 -Pinclude-kinesis"
- PROFILE="-Dhadoop.version=2.8.3 -Dnew -Pinclude-kinesis"
- CACHE_NAME=JDK8_H280_CN
- jdk: "oraclejdk8"
env:
- TEST="tests"
- PROFILE="-Dhadoop.version=2.8.3 -Dflip6"
- PROFILE="-Dhadoop.version=2.8.3 -Dnew"
- CACHE_NAME=JDK8_H280_T
- jdk: "oraclejdk8"
env:
- TEST="misc"
- PROFILE="-Dhadoop.version=2.8.3 -Dflip6 -Dinclude_hadoop_aws"
- PROFILE="-Dhadoop.version=2.8.3 -Dnew -Dinclude_hadoop_aws"
- CACHE_NAME=JDK8_H280_M
- jdk: "openjdk8"
env:
Expand Down
4 changes: 0 additions & 4 deletions docs/monitoring/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -674,10 +674,6 @@ Response:
{"jobid": "869a9868d49c679e7355700e0857af85"}
~~~

## FLIP-6

The following is the REST API documentation for FLIP-6.

### Dispatcher

{% include generated/rest_dispatcher.html %}
Expand Down
10 changes: 5 additions & 5 deletions docs/ops/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,15 +378,15 @@ You have to configure `jobmanager.archive.fs.dir` in order to archive terminated

{% include generated/history_server_configuration.html %}

## Flip-6
### Slot Manager

- `mode`: Execution mode of Flink. Possible values are `old` and `flip6`. In order to start the Flip-6 components, you have to specify `flip6` (DEFAULT: `old`).
The configuration keys in this section are relevant for the SlotManager running in the ResourceManager

### Slot Manager (Flip-6)
{% include generated/slot_manager_configuration.html %}

The configuration keys in this section are relevant for the SlotManager running in the Flip-6 ResourceManager
## Legacy

{% include generated/slot_manager_configuration.html %}
- `mode`: Execution mode of Flink. Possible values are `legacy` and `new`. In order to start the legacy components, you have to specify `legacy` (DEFAULT: `new`).

## Background

Expand Down
4 changes: 3 additions & 1 deletion docs/ops/state/large_state_tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,12 @@ files also no additional disk space is consumed for task-local recovery with inc

### Allocation-preserving scheduling

Task-local recovery assumes allocation-preserving task scheduling under failures, which was introduced as part of FLIP-6 and works as follows. Each task remembers its previous
Task-local recovery assumes allocation-preserving task scheduling under failures, which works as follows. Each task remembers its previous
allocation and *requests the exact same slot* to restart in recovery. If this slot is not available, the task will request a *new, fresh slot* from the resource manager. This way,
if a task manager is no longer available, a task that cannot return to its previous location *will not drive other recovering tasks out of their previous slots*. Our reasoning is
that the previous slot can only disappear when a task manager is no longer available, and in this case *some* tasks have to request a new slot anyways. With our scheduling strategy
we give the maximum number of tasks a chance to recover from their local state and avoid the cascading effect of tasks stealing their previous slots from one another.

Allocation-preserving scheduling does not work with Flink's legacy mode.

{% top %}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void start() throws Exception {

private JobExecutorService createJobExecutorService(Configuration configuration) throws Exception {
final JobExecutorService newJobExecutorService;
if (CoreOptions.FLIP6_MODE.equals(configuration.getString(CoreOptions.MODE))) {
if (CoreOptions.NEW_MODE.equals(configuration.getString(CoreOptions.MODE))) {

configuration.setInteger(RestOptions.REST_PORT, 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public int getDefaultParallelism() {
public void start() throws Exception {
synchronized (lock) {
if (client == null) {
if (CoreOptions.OLD_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
if (CoreOptions.LEGACY_MODE.equals(clientConfiguration.getString(CoreOptions.MODE))) {
client = new StandaloneClusterClient(clientConfiguration);
} else {
client = new RestClusterClient<>(clientConfiguration, "RemoteExecutor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class CliFrontend {

private final int defaultParallelism;

private final boolean flip6;
private final boolean isNewMode;

public CliFrontend(
Configuration configuration,
Expand All @@ -144,7 +144,7 @@ public CliFrontend(
this.clientTimeout = AkkaUtils.getClientTimeout(this.configuration);
this.defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);

this.flip6 = CoreOptions.FLIP6_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
this.isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(configuration.getString(CoreOptions.MODE));
}

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -225,7 +225,7 @@ private <T> void runProgram(
final ClusterClient<T> client;

// directly deploy the job if the cluster is started in job mode and detached
if (flip6 && clusterId == null && runOptions.getDetachedMode()) {
if (isNewMode && clusterId == null && runOptions.getDetachedMode()) {
int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();

final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
Expand Down Expand Up @@ -1162,10 +1162,10 @@ public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration co
LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}

if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.FLIP6_MODE)) {
customCommandLines.add(new Flip6DefaultCLI(configuration));
} else {
if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
customCommandLines.add(new DefaultCLI(configuration));
} else {
customCommandLines.add(new LegacyCLI(configuration));
}

return customCommandLines;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nullable;

Expand All @@ -39,13 +40,17 @@ public DefaultCLI(Configuration configuration) {

@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}

@Override
public String getId() {
return "Default CLI";
return "default";
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,41 @@
package org.apache.flink.client.cli;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
import org.apache.flink.client.deployment.LegacyStandaloneClusterDescriptor;
import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;

import javax.annotation.Nullable;

/**
* The default CLI which is used for interaction with standalone clusters.
* The CLI which is used for interaction with the legacy standalone clusters.
*/
public class Flip6DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterId> {
public class LegacyCLI extends AbstractCustomCommandLine<StandaloneClusterId> {

public Flip6DefaultCLI(Configuration configuration) {
public LegacyCLI(Configuration configuration) {
super(configuration);
}

@Override
public boolean isActive(CommandLine commandLine) {
// always active because we can try to read a JobManager address from the config
return true;
}

@Override
public String getId() {
return "flip6";
return "legacy";
}

@Override
public void addGeneralOptions(Options baseOptions) {
super.addGeneralOptions(baseOptions);
}

@Override
public Flip6StandaloneClusterDescriptor createClusterDescriptor(
public LegacyStandaloneClusterDescriptor createClusterDescriptor(
CommandLine commandLine) throws FlinkException {
final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);

return new Flip6StandaloneClusterDescriptor(effectiveConfiguration);
return new LegacyStandaloneClusterDescriptor(effectiveConfiguration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,56 +18,55 @@

package org.apache.flink.client.deployment;

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
* A deployment descriptor for an existing cluster.
*/
public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {
public class LegacyStandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterId> {

private final Configuration config;

public Flip6StandaloneClusterDescriptor(Configuration config) {
this.config = Preconditions.checkNotNull(config);
public LegacyStandaloneClusterDescriptor(Configuration config) {
this.config = config;
}

@Override
public String getClusterDescription() {
String host = config.getString(JobManagerOptions.ADDRESS, "");
int port = config.getInteger(JobManagerOptions.PORT, -1);
return "FLIP-6 Standalone cluster at " + host + ":" + port;
return "Standalone cluster at " + host + ":" + port;
}

@Override
public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
public StandaloneClusterClient retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
try {
return new RestClusterClient<>(config, standaloneClusterId);
return new StandaloneClusterClient(config);
} catch (Exception e) {
throw new ClusterRetrieveException("Couldn't retrieve FLIP-6 standalone cluster", e);
throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e);
}
}

@Override
public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) {
throw new UnsupportedOperationException("Can't deploy a FLIP-6 standalone cluster.");
public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
}

@Override
public RestClusterClient<StandaloneClusterId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) {
throw new UnsupportedOperationException("Can't deploy a standalone FLIP-6 per-job cluster.");
public StandaloneClusterClient deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) {
throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
}

@Override
public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
throw new UnsupportedOperationException("Cannot terminate a Flip-6 standalone cluster.");
throw new UnsupportedOperationException("Cannot terminate standalone clusters.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

package org.apache.flink.client.deployment;

import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
* A deployment descriptor for an existing cluster.
Expand All @@ -32,7 +33,7 @@ public class StandaloneClusterDescriptor implements ClusterDescriptor<Standalone
private final Configuration config;

public StandaloneClusterDescriptor(Configuration config) {
this.config = config;
this.config = Preconditions.checkNotNull(config);
}

@Override
Expand All @@ -43,30 +44,30 @@ public String getClusterDescription() {
}

@Override
public StandaloneClusterClient retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
public RestClusterClient<StandaloneClusterId> retrieve(StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
try {
return new StandaloneClusterClient(config);
return new RestClusterClient<>(config, standaloneClusterId);
} catch (Exception e) {
throw new ClusterRetrieveException("Couldn't retrieve standalone cluster", e);
}
}

@Override
public StandaloneClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) {
public RestClusterClient<StandaloneClusterId> deploySessionCluster(ClusterSpecification clusterSpecification) {
throw new UnsupportedOperationException("Can't deploy a standalone cluster.");
}

@Override
public StandaloneClusterClient deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) {
public RestClusterClient<StandaloneClusterId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) {
throw new UnsupportedOperationException("Can't deploy a standalone per-job cluster.");
}

@Override
public void terminateCluster(StandaloneClusterId clusterId) throws FlinkException {
throw new UnsupportedOperationException("Cannot terminate standalone clusters.");
throw new UnsupportedOperationException("Cannot terminate a standalone cluster.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void setup() throws Exception {
final Configuration configuration = new Configuration();
frontend = new CliFrontend(
configuration,
Collections.singletonList(new DefaultCLI(configuration)));
Collections.singletonList(new LegacyCLI(configuration)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.List;

/**
* Base test class for {@link CliFrontend} tests that wraps the Flip-6 vs. non-Flip-6 modes.
* Base test class for {@link CliFrontend} tests that wraps the new vs. legacy mode.
*/
@RunWith(Parameterized.class)
public abstract class CliFrontendTestBase extends TestLogger {
Expand All @@ -39,7 +39,7 @@ public abstract class CliFrontendTestBase extends TestLogger {

@Parameterized.Parameters(name = "Mode = {0}")
public static List<String> parameters() {
return Arrays.asList(CoreOptions.OLD_MODE, CoreOptions.FLIP6_MODE);
return Arrays.asList(CoreOptions.LEGACY_MODE, CoreOptions.NEW_MODE);
}

protected Configuration getConfiguration() {
Expand All @@ -51,10 +51,10 @@ protected Configuration getConfiguration() {

static AbstractCustomCommandLine<?> getCli(Configuration configuration) {
switch (configuration.getString(CoreOptions.MODE)) {
case CoreOptions.OLD_MODE:
case CoreOptions.LEGACY_MODE:
return new LegacyCLI(configuration);
case CoreOptions.NEW_MODE:
return new DefaultCLI(configuration);
case CoreOptions.FLIP6_MODE:
return new Flip6DefaultCLI(configuration);
}
throw new IllegalStateException();
}
Expand Down
Loading

0 comments on commit 143f37d

Please sign in to comment.