Skip to content

Commit

Permalink
[FLINK-4770] [core] Introduce 'CoreOptions'
Browse files Browse the repository at this point in the history
The CoreOptions should hold all essential configuration values that are not specific to
JobManager, TaskManager or any feature area, like HighAvailability or Security.

Examples for that are
  - default java options
  - default parallelism
  - default state backend
  • Loading branch information
StephanEwen committed Feb 20, 2017
1 parent 544f534 commit a404796
Show file tree
Hide file tree
Showing 19 changed files with 86 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -216,7 +217,7 @@ private static void startSecureFlinkClusterWithRecoveryModeEnabled() {
config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH, hdfsURI + "/flink/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI + "/flink/recovery");
config.setString("state.backend.fs.checkpointdir", hdfsURI + "/flink/checkpoints");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,11 @@ public final class ConfigConstants {
// ----------------------------- Streaming --------------------------------

/**
* State backend for checkpoints;
* State backend for checkpoints
*
* @deprecated Use {@link CoreOptions#STATE_BACKEND} instead.
*/
@Deprecated
public static final String STATE_BACKEND = "state.backend";

// ----------------------------- Miscellaneous ----------------------------
Expand All @@ -756,7 +759,11 @@ public final class ConfigConstants {
*/
@Deprecated
public static final String FLINK_BASE_DIR_PATH_KEY = "flink.base.dir.path";


/**
* @deprecated Use {@link CoreOptions#FLINK_JVM_OPTIONS} instead.
*/
@Deprecated
public static final String FLINK_JVM_OPTIONS = "env.java.opts";

// --------------------------- High Availability --------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;

@PublicEvolving
public class CoreOptions {

/**
*
*/
public static final ConfigOption<String> FLINK_JVM_OPTIONS = ConfigOptions
.key("env.java.opts")
.defaultValue("");

public static final ConfigOption<Integer> DEFAULT_PARALLELISM_KEY = ConfigOptions
.key("parallelism.default")
.defaultValue(-1);

public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
.key("state.backend")
.noDefaultValue();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -230,7 +230,7 @@ public void testBlobServerRecovery() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobRecoveryITCase.testBlobServerRecovery(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.cli.Option;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
Expand Down Expand Up @@ -358,7 +359,7 @@ public static String getTaskManagerShellCommand(
.put("jvmmem", "-Xms" + tmParams.taskManagerHeapSizeMB() + "m " +
"-Xmx" + tmParams.taskManagerHeapSizeMB() + "m " +
"-XX:MaxDirectMemorySize=" + tmParams.taskManagerDirectMemoryLimitMB() + "m");
String javaOpts = flinkConfig.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
String javaOpts = flinkConfig.getString(CoreOptions.FLINK_JVM_OPTIONS);
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if(hasKrb5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.runtime.blob;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class BlobRecoveryITCase {
public void testBlobServerRecovery() throws Exception {
Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getPath());

testBlobServerRecovery(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.junit.Test;

import java.util.HashMap;
Expand Down Expand Up @@ -209,7 +210,7 @@ public void testGetTaskManagerShellCommand() {
true, true, true, this.getClass()));

// logback + log4j, with/out krb5, different JVM opts
cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
package org.apache.flink.runtime.execution.librarycache;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.blob.BlobClient;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void testRecoveryRegisterAndDownload() throws Exception {

Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.getRoot().getAbsolutePath());

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
Expand Down Expand Up @@ -80,7 +81,7 @@ public static Configuration configureZooKeeperHA(
config.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, connTimeout);

// File system state backend
config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
Expand Down Expand Up @@ -694,7 +694,7 @@ private AbstractStateBackend createStateBackend() throws Exception {
} else {
// see if we have a backend specified in the configuration
Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
String backendName = flinkConfig.getString(ConfigConstants.STATE_BACKEND, null);
String backendName = flinkConfig.getString(CoreOptions.STATE_BACKEND, null);

if (backendName == null) {
LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
Expand Down Expand Up @@ -731,7 +731,7 @@ private AbstractStateBackend createStateBackend() throws Exception {
throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
} catch (ClassCastException e) {
throw new IllegalConfigurationException("The class configured under '" +
ConfigConstants.STATE_BACKEND + "' is not a valid state backend factory (" +
CoreOptions.STATE_BACKEND.key() + "' is not a valid state backend factory (" +
backendName + ')');
} catch (Throwable t) {
throw new IllegalConfigurationException("Cannot create configured state backend", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.StreamStateHandle;
Expand Down Expand Up @@ -1074,7 +1074,7 @@ public void process(Integer key,

private static StreamTask<?, ?> createMockTask() {
Configuration configuration = new Configuration();
configuration.setString(ConfigConstants.STATE_BACKEND, "jobmanager");
configuration.setString(CoreOptions.STATE_BACKEND, "jobmanager");

StreamTask<?, ?> task = mock(StreamTask.class);
when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
Expand Down Expand Up @@ -192,7 +192,7 @@ public void testEarlyCanceling() throws Exception {
@Test
public void testStateBackendLoadingAndClosing() throws Exception {
Configuration taskManagerConfig = new Configuration();
taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());

StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
Expand All @@ -216,7 +216,7 @@ public void testStateBackendLoadingAndClosing() throws Exception {
@Test
public void testStateBackendClosingOnFailure() throws Exception {
Configuration taskManagerConfig = new Configuration();
taskManagerConfig.setString(ConfigConstants.STATE_BACKEND, MockStateBackend.class.getName());
taskManagerConfig.setString(CoreOptions.STATE_BACKEND, MockStateBackend.class.getName());

StreamConfig cfg = new StreamConfig(new Configuration());
cfg.setStreamOperator(new StreamSource<>(new MockSourceFunction()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
Expand Down Expand Up @@ -105,7 +106,7 @@ public static void setup() throws Exception {
final File checkpointDir = temporaryFolder.newFolder();
final File savepointDir = temporaryFolder.newFolder();

config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointDir.toURI().toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.SubtaskState;
Expand Down Expand Up @@ -168,7 +169,7 @@ public void testTriggerSavepointAndResume() throws Exception {
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
LOG.info("Created temporary savepoint directory: " + savepointDir + ".");

config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
Expand Down Expand Up @@ -701,7 +702,7 @@ public Integer map(Integer value) throws Exception {
fail("Test setup failed: failed to create temporary directories.");
}

config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -90,7 +91,7 @@ public void setup() throws Exception {
LOG.info("Created temporary checkpoint directory: " + checkpointDir + ".");
LOG.info("Created savepoint directory: " + savepointDir + ".");

config.setString(ConfigConstants.STATE_BACKEND, "memory");
config.setString(CoreOptions.STATE_BACKEND, "memory");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
config.setString(FsStateBackendFactory.MEMORY_THRESHOLD_CONF_KEY, "0");
config.setString("state.savepoints.dir", savepointDir.toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
Expand Down Expand Up @@ -97,7 +98,7 @@ public static void setUp() throws Exception {
parallelism = 4;

// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening again.
config.setString(ConfigConstants.STATE_BACKEND, "filesystem");
config.setString(CoreOptions.STATE_BACKEND, "filesystem");
config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -118,7 +119,7 @@ public void testMultipleAMKill() throws Exception {
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.loadConfiguration());
flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
"@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM" +
"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.SecurityOptions;
Expand Down Expand Up @@ -1232,8 +1233,7 @@ protected ContainerLaunchContext setupApplicationMasterContainer(boolean hasLogb
// ------------------ Prepare Application Master Container ------------------------------

// respect custom JVM options in the YAML file
String javaOpts =
flinkConfiguration.getString(ConfigConstants.FLINK_JVM_OPTIONS, "");
String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
//applicable only for YarnMiniCluster secure test run
//krb5.conf file will be available as local resource in JM/TM container
if (hasKrb5) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -203,7 +204,7 @@ public void testSetupApplicationMasterContainer() {
.getCommands().get(0));

// logback + log4j, with/out krb5, different JVM opts
cfg.setString(ConfigConstants.FLINK_JVM_OPTIONS, jvmOpts);
cfg.setString(CoreOptions.FLINK_JVM_OPTIONS, jvmOpts);
assertEquals(
java + " " + jvmmem +
" " + jvmOpts +
Expand Down

0 comments on commit a404796

Please sign in to comment.