Skip to content

Commit

Permalink
[FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables t…
Browse files Browse the repository at this point in the history
…o ConfigurationUtils#getPrefixedKeyValuePairs

This closes apache#11458 .
  • Loading branch information
zhengcanbin committed Mar 25, 2020
1 parent f4b68c4 commit f80c384
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,21 @@ public static Map<String, String> parseTmResourceJvmParams(String jvmParamsStr)
return configs;
}

/**
* Extract and parse Flink configuration properties with a given name prefix and
* return the result as a Map.
*/
public static Map<String, String> getPrefixedKeyValuePairs(String prefix, Configuration configuration) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, String> entry: configuration.toMap().entrySet()) {
if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
String key = entry.getKey().substring(prefix.length());
result.put(key, entry.getValue());
}
}
return result;
}

// Make sure that we cannot instantiate this class
private ConfigurationUtils() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void testPropertiesToConfiguration() {
}

@Test
public void testHideSensitiveValues() {
public void testHideSensitiveValues() {
final Map<String, String> keyValuePairs = new HashMap<>();
keyValuePairs.put("foobar", "barfoo");
final String secretKey1 = "secret.key";
Expand All @@ -74,4 +74,22 @@ public void testHideSensitiveValues() {
assertThat(hiddenSensitiveValues, is(equalTo(expectedKeyValuePairs)));
}

@Test
public void testGetPrefixedKeyValuePairs() {
final String prefix = "test.prefix.";
final Map<String, String> expectedKeyValuePairs = new HashMap<String, String>() {
{
put("k1", "v1");
put("k2", "v2");
}
};

final Configuration configuration = new Configuration();
expectedKeyValuePairs.forEach((k, v) -> configuration.setString(prefix + k, v));

final Map<String, String> resultKeyValuePairs = ConfigurationUtils.getPrefixedKeyValuePairs(prefix, configuration);

assertThat(resultKeyValuePairs, is(equalTo(expectedKeyValuePairs)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.clusterframework.BootstrapTools;

import io.fabric8.kubernetes.api.model.LocalObjectReference;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -163,13 +162,4 @@ public Optional<String> getLocalHadoopConfigurationDirectory() {

return Optional.empty();
}

/**
* Extract container customized environment variable properties with a given name prefix.
* @param envPrefix the given property name prefix
* @return a Map storing with customized environment variable key/value pairs.
*/
protected Map<String, String> getPrefixedEnvironments(String envPrefix) {
return BootstrapTools.getEnvironmentVariables(envPrefix, flinkConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -63,7 +64,7 @@ public Map<String, String> getLabels() {

@Override
public Map<String, String> getEnvironments() {
return getPrefixedEnvironments(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX);
return ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, flinkConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,22 +713,4 @@ public static int calculateHeapSize(int memory, Configuration conf) {
}
return memory - heapLimit;
}

/**
* Method to extract environment variables from the flinkConfiguration based on the given prefix String.
*
* @param envPrefix Prefix for the environment variables key
* @param flinkConfiguration The Flink config to get the environment variable definition from
*/
public static Map<String, String> getEnvironmentVariables(String envPrefix, Configuration flinkConfiguration) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, String> entry: flinkConfiguration.toMap().entrySet()) {
if (entry.getKey().startsWith(envPrefix) && entry.getKey().length() > envPrefix.length()) {
// remove prefix
String key = entry.getKey().substring(envPrefix.length());
result.put(key, entry.getValue());
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.MemorySize;
Expand Down Expand Up @@ -531,7 +532,7 @@ public void testGetEnvironmentVariables() {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.LD_LIBRARY_PATH", "/usr/lib/native");

Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);

Assert.assertEquals(1, res.size());
Map.Entry<String, String> entry = res.entrySet().iterator().next();
Expand All @@ -544,7 +545,7 @@ public void testGetEnvironmentVariablesErroneous() {
Configuration testConf = new Configuration();
testConf.setString("containerized.master.env.", "/usr/lib/native");

Map<String, String> res = BootstrapTools.getEnvironmentVariables("containerized.master.env.", testConf);
Map<String, String> res = ConfigurationUtils.getPrefixedKeyValuePairs("containerized.master.env.", testConf);

Assert.assertEquals(0, res.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand Down Expand Up @@ -947,7 +948,7 @@ private ApplicationReport startAppMaster(
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
appMasterEnv.putAll(
BootstrapTools.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

Expand Down

0 comments on commit f80c384

Please sign in to comment.