Skip to content

Commit

Permalink
[FLINK-11781][yarn] Remove "DISABLED" as possible value for yarn.per-…
Browse files Browse the repository at this point in the history
…job-cluster.include-user-jar

Remove this feature because it is broken since Flink 1.5

This closes apache#7883.
  • Loading branch information
GJL committed Mar 6, 2019
1 parent 9ac14a6 commit 6f84008
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<tr>
<td><h5>yarn.per-job-cluster.include-user-jar</h5></td>
<td style="word-wrap: break-word;">"ORDER"</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead.</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER").</td>
</tr>
<tr>
<td><h5>yarn.properties-file.location</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Lightweight configuration object which stores key/value pairs.
*/
Expand Down Expand Up @@ -608,6 +611,33 @@ public String getValue(ConfigOption<?> configOption) {
return o == null ? null : o.toString();
}

/**
* Returns the value associated with the given config option as an enum.
*
* @param enumClass The return enum class
* @param configOption The configuration option
* @throws IllegalArgumentException If the string associated with the given config option cannot
* be parsed as a value of the provided enum class.
*/
@PublicEvolving
public <T extends Enum<T>> T getEnum(
final Class<T> enumClass,
final ConfigOption<String> configOption) {
checkNotNull(enumClass, "enumClass must not be null");
checkNotNull(configOption, "configOption must not be null");

final String configValue = getString(configOption);
try {
return Enum.valueOf(enumClass, configValue.toUpperCase(Locale.ROOT));
} catch (final IllegalArgumentException | NullPointerException e) {
final String errorMessage = String.format("Value for config option %s must be one of %s (was %s)",
configOption.key(),
Arrays.toString(enumClass.getEnumConstants()),
configValue);
throw new IllegalArgumentException(errorMessage, e);
}
}

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ public String getValue(ConfigOption<?> configOption) {
return this.backingConfig.getValue(prefixOption(configOption, prefix));
}

@Override
public <T extends Enum<T>> T getEnum(final Class<T> enumClass, final ConfigOption<String> configOption) {
return this.backingConfig.getEnum(enumClass, prefixOption(configOption, prefix));
}

@Override
public void addAllToProperties(Properties props) {
// only add keys with our prefix
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@

import org.junit.Test;

import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -403,4 +405,56 @@ public void testRemove(){
assertEquals("Wrong expectation about size", cfg.keySet().size(), 0);
assertFalse("Expected 'unexistedOption' is not removed", cfg.removeConfig(unexistedOption));
}

@Test
public void testShouldParseValidStringToEnum() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
configuration.setString(configOption.key(), TestEnum.VALUE1.toString());

final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption);
assertEquals(TestEnum.VALUE1, parsedEnumValue);
}

@Test
public void testShouldParseValidStringToEnumIgnoringCase() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
configuration.setString(configOption.key(), TestEnum.VALUE1.toString().toLowerCase());

final TestEnum parsedEnumValue = configuration.getEnum(TestEnum.class, configOption);
assertEquals(TestEnum.VALUE1, parsedEnumValue);
}

@Test
public void testThrowsExceptionIfTryingToParseInvalidStringForEnum() {
final ConfigOption<String> configOption = createStringConfigOption();

final Configuration configuration = new Configuration();
final String invalidValueForTestEnum = "InvalidValueForTestEnum";
configuration.setString(configOption.key(), invalidValueForTestEnum);

try {
configuration.getEnum(TestEnum.class, configOption);
fail("Expected exception not thrown");
} catch (IllegalArgumentException e) {
final String expectedMessage = "Value for config option " +
configOption.key() + " must be one of [VALUE1, VALUE2] (was " +
invalidValueForTestEnum + ")";
assertThat(e.getMessage(), containsString(expectedMessage));
}
}

enum TestEnum {
VALUE1,
VALUE2
}

private static ConfigOption<String> createStringConfigOption() {
return ConfigOptions
.key("test-string-key")
.noDefaultValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -763,19 +763,14 @@ public ApplicationReport startAppMaster(
localResources,
envShipFileList);

List<String> userClassPaths;
if (userJarInclusion != YarnConfigOptions.UserJarInclusion.DISABLED) {
userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
} else {
userClassPaths = Collections.emptyList();
}
final List<String> userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);

if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
Expand Down Expand Up @@ -1602,15 +1597,16 @@ protected ContainerLaunchContext setupApplicationMasterContainer(
}

private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(org.apache.flink.configuration.Configuration config) {
String configuredUserJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
try {
return YarnConfigOptions.UserJarInclusion.valueOf(configuredUserJarInclusion.toUpperCase());
} catch (IllegalArgumentException e) {
LOG.warn("Configuration parameter {} was configured with an invalid value {}. Falling back to default ({}).",
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key(),
configuredUserJarInclusion,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
return YarnConfigOptions.UserJarInclusion.valueOf(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.defaultValue());
throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(config);

return config.getEnum(YarnConfigOptions.UserJarInclusion.class, YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
}

private static void throwIfUserTriesToDisableUserJarInclusionInSystemClassPath(final Configuration config) {
final String userJarInclusion = config.getString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
if ("DISABLED".equalsIgnoreCase(userJarInclusion)) {
throw new IllegalArgumentException(String.format("Config option %s cannot be set to DISABLED anymore (see FLINK-11781)",
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR.key()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public class YarnConfigOptions {
.defaultValue("ORDER")
.withDescription("Defines whether user-jars are included in the system class path for per-job-clusters as" +
" well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the" +
" end (\"LAST\"), or be positioned based on their name (\"ORDER\"). Setting this parameter to" +
" \"DISABLED\" causes the jar to be included in the user class path instead.");
" end (\"LAST\"), or be positioned based on their name (\"ORDER\").");

/**
* The vcores exposed by YARN.
Expand Down Expand Up @@ -156,7 +155,6 @@ private YarnConfigOptions() {}

/** @see YarnConfigOptions#CLASSPATH_INCLUDE_USER_JAR */
public enum UserJarInclusion {
DISABLED,
FIRST,
LAST,
ORDER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import java.util.Set;

import static junit.framework.TestCase.assertTrue;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

/**
Expand Down Expand Up @@ -89,6 +91,27 @@ public static void tearDownClass() {
yarnClient.stop();
}

/**
* @see <a href="https://issues.apache.org/jira/browse/FLINK-11781">FLINK-11781</a>
*/
@Test
public void testThrowsExceptionIfUserTriesToDisableUserJarInclusionInSystemClassPath() {
final Configuration configuration = new Configuration();
configuration.setString(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR, "DISABLED");

try {
new YarnClusterDescriptor(
configuration,
yarnConfiguration,
temporaryFolder.getRoot().getAbsolutePath(),
yarnClient,
true);
fail("Expected exception not thrown");
} catch (final IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("cannot be set to DISABLED anymore"));
}
}

@Test
public void testFailIfTaskSlotsHigherThanMaxVcores() throws ClusterDeploymentException {
final Configuration flinkConfiguration = new Configuration();
Expand Down

0 comments on commit 6f84008

Please sign in to comment.