Skip to content

Commit

Permalink
[FLINK-20927][yarn] Update configuration option in YarnConfigOptions …
Browse files Browse the repository at this point in the history
…class
  • Loading branch information
yuruguo authored and rmetzger committed Jan 15, 2021
1 parent bfb8d68 commit 49f8af0
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 26 deletions.
6 changes: 3 additions & 3 deletions docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@
</tr>
<tr>
<td><h5>yarn.per-job-cluster.include-user-jar</h5></td>
<td style="word-wrap: break-word;">"ORDER"</td>
<td>String</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). "DISABLED" means the user-jars are excluded from the system class path.</td>
<td style="word-wrap: break-word;">ORDER</td>
<td><p>Enum</p>Possible values: [DISABLED, FIRST, LAST, ORDER]</td>
<td>Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning (FIRST), at the end (LAST), or be positioned based on their name (ORDER). DISABLED means the user-jars are excluded from the system class path.</td>
</tr>
<tr>
<td><h5>yarn.properties-file.location</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private Configuration createDefaultConfiguration(
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.set(DeploymentOptions.TARGET, YarnDeploymentTarget.APPLICATION.getName());
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());
configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);
configuration.set(PipelineOptions.JARS, Collections.singletonList(userJar.toString()));

return configuration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ private Configuration getDefaultConfiguration() {
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.setString(
CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED.toString());
configuration.set(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private Configuration createDefaultConfiguration(
configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(768));
configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1g"));
configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
configuration.setString(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion.toString());
configuration.set(CLASSPATH_INCLUDE_USER_JAR, userJarInclusion);

return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,9 +1727,7 @@ ContainerLaunchContext setupApplicationMasterContainer(

private static YarnConfigOptions.UserJarInclusion getUserJarInclusionMode(
org.apache.flink.configuration.Configuration config) {
return config.getEnum(
YarnConfigOptions.UserJarInclusion.class,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
return config.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
}

private static boolean isUsrLibDirIncludedInShipFiles(List<File> shipFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion.DISABLED;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion.FIRST;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion.LAST;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.UserJarInclusion.ORDER;

/**
* This class holds configuration constants used by Flink's YARN runners.
Expand All @@ -40,28 +44,33 @@ public class YarnConfigOptions {
/** The vcores used by YARN application master. */
public static final ConfigOption<Integer> APP_MASTER_VCORES =
key("yarn.appmaster.vcores")
.intType()
.defaultValue(1)
.withDescription(
"The number of virtual cores (vcores) used by YARN application master.");

/**
* Defines whether user-jars are included in the system class path for per-job-clusters as well
* as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the
* end ("LAST"), or be positioned based on their name ("ORDER"). "DISABLED" means the user-jars
* are excluded from the system class path.
* as their positioning in the path. They can be positioned at the beginning (FIRST), at the end
* (LAST), or be positioned based on their name (ORDER). DISABLED means the user-jars are
* excluded from the system class path.
*/
public static final ConfigOption<String> CLASSPATH_INCLUDE_USER_JAR =
public static final ConfigOption<UserJarInclusion> CLASSPATH_INCLUDE_USER_JAR =
key("yarn.per-job-cluster.include-user-jar")
.defaultValue("ORDER")
.enumType(UserJarInclusion.class)
.defaultValue(ORDER)
.withDescription(
"Defines whether user-jars are included in the system class path for per-job-clusters as"
+ " well as their positioning in the path. They can be positioned at the beginning (\"FIRST\"), at the"
+ " end (\"LAST\"), or be positioned based on their name (\"ORDER\"). \"DISABLED\" means the user-jars"
+ " are excluded from the system class path.");
String.format(
"Defines whether user-jars are included in the system class path for per-job-clusters as"
+ " well as their positioning in the path. They can be positioned at the beginning (%s), at the"
+ " end (%s), or be positioned based on their name (%s). %s means the user-jars"
+ " are excluded from the system class path.",
FIRST.name(), LAST.name(), ORDER.name(), DISABLED.name()));

/** The vcores exposed by YARN. */
public static final ConfigOption<Integer> VCORES =
key("yarn.containers.vcores")
.intType()
.defaultValue(-1)
.withDescription(
Description.builder()
Expand Down Expand Up @@ -101,6 +110,7 @@ public class YarnConfigOptions {
/** The config parameter defining the attemptFailuresValidityInterval of Yarn application. */
public static final ConfigOption<Long> APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL =
key("yarn.application-attempt-failures-validity-interval")
.longType()
.defaultValue(10000L)
.withDescription(
Description.builder()
Expand All @@ -117,6 +127,7 @@ public class YarnConfigOptions {
/** The heartbeat interval between the Application Master and the YARN Resource Manager. */
public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
key("yarn.heartbeat.interval")
.intType()
.defaultValue(5)
.withDeprecatedKeys("yarn.heartbeat-delay")
.withDescription(
Expand All @@ -128,6 +139,7 @@ public class YarnConfigOptions {
*/
public static final ConfigOption<Integer> CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS =
key("yarn.heartbeat.container-request-interval")
.intType()
.defaultValue(500)
.withDescription(
new Description.DescriptionBuilder()
Expand All @@ -153,6 +165,7 @@ public class YarnConfigOptions {
*/
public static final ConfigOption<String> PROPERTIES_FILE_LOCATION =
key("yarn.properties-file.location")
.stringType()
.noDefaultValue()
.withDescription(
"When a Flink job is submitted to YARN, the JobManager’s host and the number of available"
Expand All @@ -168,6 +181,7 @@ public class YarnConfigOptions {
*/
public static final ConfigOption<String> APPLICATION_MASTER_PORT =
key("yarn.application-master.port")
.stringType()
.defaultValue("0")
.withDescription(
"With this configuration option, users can specify a port, a range of ports or a list of ports"
Expand All @@ -189,6 +203,7 @@ public class YarnConfigOptions {
*/
public static final ConfigOption<Integer> APPLICATION_PRIORITY =
key("yarn.application.priority")
.intType()
.defaultValue(-1)
.withDescription(
"A non-negative integer indicating the priority for submitting a Flink YARN application. It"
Expand All @@ -215,6 +230,7 @@ public class YarnConfigOptions {
/** A comma-separated list of strings to use as YARN application tags. */
public static final ConfigOption<String> APPLICATION_TAGS =
key("yarn.tags")
.stringType()
.defaultValue("")
.withDescription(
"A comma-separated list of tags to apply to the Flink YARN application.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,7 @@ public static void logYarnEnvironmentInformation(Map<String, String> env, Logger

public static Optional<File> getUsrLibDir(final Configuration configuration) {
final YarnConfigOptions.UserJarInclusion userJarInclusion =
configuration.getEnum(
YarnConfigOptions.UserJarInclusion.class,
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
configuration.get(YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR);
final Optional<File> userLibDir = tryFindUserLibDirectory();

checkState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,7 @@ public void testEnvironmentEmptyPluginsShipping() {
public void testDisableSystemClassPathIncludeUserJarAndWithIllegalShipDirectoryName()
throws IOException {
final Configuration configuration = new Configuration();
configuration.setString(
CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED.toString());
configuration.set(CLASSPATH_INCLUDE_USER_JAR, YarnConfigOptions.UserJarInclusion.DISABLED);

final YarnClusterDescriptor yarnClusterDescriptor =
createYarnClusterDescriptor(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public class YarnJobClusterEntrypointTest {
public void testCreateDispatcherResourceManagerComponentFactoryFailIfUsrLibDirDoesNotExist()
throws IOException {
final Configuration configuration = new Configuration();
configuration.setString(
configuration.set(
YarnConfigOptions.CLASSPATH_INCLUDE_USER_JAR,
YarnConfigOptions.UserJarInclusion.DISABLED.toString());
YarnConfigOptions.UserJarInclusion.DISABLED);
final YarnJobClusterEntrypoint yarnJobClusterEntrypoint =
new YarnJobClusterEntrypoint(configuration);
try {
Expand Down

0 comments on commit 49f8af0

Please sign in to comment.