diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html index a6b04b6aec532..40dfc09d3612b 100644 --- a/docs/_includes/generated/yarn_config_configuration.html +++ b/docs/_includes/generated/yarn_config_configuration.html @@ -7,6 +7,11 @@ + +
yarn.application-attempt-failures-validity-interval
+ 10000 + Time window in milliseconds which defines the number of application attempt failures when restarting the AM. Failures which fall outside of this window are not being considered. Set this value to -1 in order to count globally. See here for more information. +
yarn.application-attempts
(none) diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 20b5417bc589b..0f244961dc217 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -35,7 +35,6 @@ import org.apache.flink.configuration.SecurityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.plugin.PluginUtils; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; @@ -1283,7 +1282,10 @@ private void activateHighAvailabilitySupport(ApplicationSubmissionContext appCon ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance(); reflector.setKeepContainersAcrossApplicationAttempts(appContext, true); - reflector.setAttemptFailuresValidityInterval(appContext, AkkaUtils.getTimeout(flinkConfiguration).toMillis()); + + reflector.setAttemptFailuresValidityInterval( + appContext, + flinkConfiguration.getLong(YarnConfigOptions.APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL)); } private void setApplicationTags(final ApplicationSubmissionContext appContext) throws InvocationTargetException, diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java index 40a59297c8967..0f46a572256b3 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java @@ -101,6 +101,19 @@ public class YarnConfigOptions { " and the YARN Client will 
loose the connection. Also, the JobManager address will change and you’ll need" + " to set the JM host:port manually. It is recommended to leave this option at 1."); + /** + * The config parameter defining the attemptFailuresValidityInterval of Yarn application. + */ + public static final ConfigOption<Long> APPLICATION_ATTEMPT_FAILURE_VALIDITY_INTERVAL = + key("yarn.application-attempt-failures-validity-interval") + .defaultValue(10000L) + .withDescription(Description.builder() + .text("Time window in milliseconds which defines the number of application attempt failures when restarting the AM. " + + "Failures which fall outside of this window are not being considered. " + + "Set this value to -1 in order to count globally. " + + "See %s for more information.", link("https://hortonworks.com/blog/apache-hadoop-yarn-hdp-2-2-fault-tolerance-features-long-running-services/", "here")) + .build()); + /** + * The heartbeat interval between the Application Master and the YARN Resource Manager. */