Skip to content

Commit

Permalink
[FLINK-8350][config] replace "taskmanager.tmp.dirs" with "io.tmp.dirs"
Browse files Browse the repository at this point in the history
This replaces "taskmanager.tmp.dirs" with the new "io.tmp.dirs"
configuration parameter to define temporary directories in (cluster)
environments for all components, i.e. JobManager, JobMaster, Dispatcher,...

Please note that this (kind of internal and thus undocumented) configuration
parameter is set by our YARN and Mesos integrations.

[FLINK-8350][cluster] initialise "io.tmp.dirs" for JobManager as well

In a YARN and Mesos environment, this initialises Flink's temporary directory
configuration with YARN/Mesos application-specific paths for JobManager,
JobMaster, Dispatcher, etc. components as well (Mesos integration actually still
lacks a proper integration of this, but once done, the new hooks fall in place
just fine).
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Jan 18, 2018
1 parent 46ed5e3 commit 76abcaa
Show file tree
Hide file tree
Showing 21 changed files with 168 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ public final class ConfigConstants {
/**
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*
* @deprecated Use {@link CoreOptions#TMP_DIRS} instead
*/
@Deprecated
public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs";

/**
Expand Down Expand Up @@ -1338,7 +1341,10 @@ public final class ConfigConstants {

/**
* The default directory for temporary files of the task manager.
*
* @deprecated {@link CoreOptions#TMP_DIRS} provides the default value now
*/
@Deprecated
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import java.io.File;

/**
* Utility class for {@link Configuration} related helper functions.
*/
public class ConfigurationUtils {

/**
* Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
*
* @param configuration configuration object
* @return array of configured directories (in order)
*/
public static String[] parseTempDirectories(Configuration configuration) {
return configuration.getString(CoreOptions.TMP_DIRS).split(",|" + File.pathSeparator);
}

// Make sure that we cannot instantiate this class
private ConfigurationUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.PublicEvolving;

import static org.apache.flink.configuration.ConfigOptions.key;

/**
* The set of configuration options for core parameters.
*/
Expand Down Expand Up @@ -96,6 +98,19 @@ public class CoreOptions {
.key("env.java.opts.taskmanager")
.defaultValue("");

// ------------------------------------------------------------------------
// generic io
// ------------------------------------------------------------------------

/**
* The config parameter defining the directories for temporary files, separated by
* ",", "|", or the system's {@link java.io.File#pathSeparator}.
*/
public static final ConfigOption<String> TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");

// ------------------------------------------------------------------------
// program
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.mesos.configuration.MesosOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
import org.apache.flink.mesos.util.MesosConfiguration;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
Expand All @@ -36,6 +39,7 @@
import org.slf4j.Logger;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;
Expand Down Expand Up @@ -152,4 +156,32 @@ public static void applyOverlays(
overlay.configure(containerSpec);
}

/**
* Loads the global configuration, adds the given dynamic properties configuration, and sets
* the temp directory paths.
*
* @param dynamicProperties dynamic properties to integrate
* @param log logger instance
* @return the loaded and adapted global configuration
*/
public static Configuration loadConfiguration(Configuration dynamicProperties, Logger log) {
Configuration configuration =
GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);

// read the environment variables
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);

// configure local directory
if (configuration.contains(CoreOptions.TMP_DIRS)) {
log.info("Overriding Mesos' temporary file directories with those " +
"specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS));
}
else if (tmpDirs != null) {
log.info("Setting directories for temporary files to: {}", tmpDirs);
configuration.setString(CoreOptions.TMP_DIRS, tmpDirs);
}

return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
Expand Down Expand Up @@ -218,7 +217,7 @@ public static void main(String[] args) {
}

Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
Configuration configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);

MesosJobClusterEntrypoint clusterEntrypoint = new MesosJobClusterEntrypoint(configuration, dynamicProperties);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager;
import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters;
Expand Down Expand Up @@ -192,10 +191,11 @@ public static void main(String[] args) {
}

Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
Configuration configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
Configuration configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);

MesosSessionClusterEntrypoint clusterEntrypoint = new MesosSessionClusterEntrypoint(configuration, dynamicProperties);

clusterEntrypoint.startCluster();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
package org.apache.flink.mesos.entrypoint;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
Expand Down Expand Up @@ -76,28 +74,15 @@ public static void main(String[] args) throws Exception {
Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
LOG.debug("Mesos dynamic properties: {}", dynamicProperties);

configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
}
catch (Throwable t) {
LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
System.exit(INIT_ERROR_EXIT_CODE);
return;
}

// read the environment variables
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);

// configure local directory
String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
if (flinkTempDirs != null) {
LOG.info("Overriding Mesos temporary file directories with those " +
"specified in the Flink config: {}", flinkTempDirs);
}
else if (tmpDirs != null) {
LOG.info("Setting directories for temporary files to: {}", tmpDirs);
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
}

// configure the filesystems
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.fs.FileSystem;
Expand Down Expand Up @@ -157,7 +156,7 @@ protected int run(final String[] args) {
CommandLine cmd = parser.parse(ALL_OPTIONS, args);

final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
final Configuration config = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
final Configuration config = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);

// configure the filesystems
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
package org.apache.flink.mesos.runtime.clusterframework;

import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.mesos.entrypoint.MesosEntrypointUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.security.SecurityConfiguration;
Expand Down Expand Up @@ -76,28 +75,15 @@ public static void runTaskManager(String[] args, final Class<? extends TaskManag
final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd);
LOG.debug("Mesos dynamic properties: {}", dynamicProperties);

configuration = GlobalConfiguration.loadConfigurationWithDynamicProperties(dynamicProperties);
configuration = MesosEntrypointUtils.loadConfiguration(dynamicProperties, LOG);
}
catch (Throwable t) {
LOG.error("Failed to load the TaskManager configuration and dynamic properties.", t);
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
return;
}

// read the environment variables
final Map<String, String> envs = System.getenv();
final String tmpDirs = envs.get(MesosConfigKeys.ENV_FLINK_TMP_DIR);

// configure local directory
String flinkTempDirs = configuration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, null);
if (flinkTempDirs != null) {
LOG.info("Overriding Mesos temporary file directories with those " +
"specified in the Flink config: {}", flinkTempDirs);
}
else if (tmpDirs != null) {
LOG.info("Setting directories for temporary files to: {}", tmpDirs);
configuration.setString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY, tmpDirs);
}

// configure the filesystems
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.util.StringUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -135,10 +134,9 @@ private static BlobStoreService createFileSystemBlobStore(Configuration configur
/**
* Creates a local storage directory for a blob service under the configuration parameter given
* by {@link BlobServerOptions#STORAGE_DIRECTORY}. If this is <tt>null</tt> or empty, we will
* fall back to the TaskManager temp directories (given by
* {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}; which in turn falls back to
* {@link ConfigConstants#DEFAULT_TASK_MANAGER_TMP_PATH} currently set to
* <tt>java.io.tmpdir</tt>) and choose one among them at random.
* fall back to Flink's temp directories (given by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}) and choose one among them at
* random.
*
* @param config
* Flink configuration
Expand All @@ -154,7 +152,7 @@ static File initLocalStorageDirectory(Configuration config) throws IOException {

File baseDir;
if (StringUtils.isNullOrWhitespaceOnly(basePath)) {
final String[] tmpDirPaths = TaskManagerServicesConfiguration.parseTempDirectories(config);
final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(config);
baseDir = new File(tmpDirPaths[RANDOM.nextInt(tmpDirPaths.length)]);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
Expand Down Expand Up @@ -147,7 +148,7 @@ public static TaskManagerConfiguration fromConfiguration(Configuration configura
numberSlots = 1;
}

final String[] tmpDirPaths = TaskManagerServicesConfiguration.parseTempDirectories(configuration);
final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);

final Time timeout;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.configuration.TaskManagerOptions;
Expand All @@ -35,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
Expand Down Expand Up @@ -184,7 +184,7 @@ public static TaskManagerServicesConfiguration fromConfiguration(
slots = 1;
}

final String[] tmpDirs = parseTempDirectories(configuration);
final String[] tmpDirs = ConfigurationUtils.parseTempDirectories(configuration);

final NetworkEnvironmentConfiguration networkConfig = parseNetworkEnvironmentConfiguration(
configuration,
Expand Down Expand Up @@ -434,19 +434,6 @@ private static QueryableStateConfiguration parseQueryableStateConfiguration(Conf
numStateServerQueryThreads);
}

/**
* Extracts the task manager directories for temporary files as defined by
* {@link ConfigConstants#TASK_MANAGER_TMP_DIR_KEY}.
*
* @param configuration configuration object
* @return array of configured directories (in order)
*/
public static String[] parseTempDirectories(Configuration configuration) {
return configuration.getString(
ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
}

/**
* Validates a condition for a config parameter and displays a standard exception, if the
* the condition does not hold.
Expand Down
Loading

0 comments on commit 76abcaa

Please sign in to comment.