diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java index f9244d38998c8..e91cd992bee14 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java @@ -112,6 +112,23 @@ public static Configuration getHadoopConfiguration(org.apache.flink.configuratio return result; } + public static boolean isCredentialsConfigured(UserGroupInformation ugi, boolean useTicketCache) throws Exception { + if (UserGroupInformation.isSecurityEnabled()) { + if (useTicketCache && !ugi.hasKerberosCredentials()) { + // a delegation token is an adequate substitute in most cases + if (!HadoopUtils.hasHDFSDelegationToken()) { + LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " + + "nor delegation tokens!"); + return false; + } else { + LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " + + "use delegation token instead. Flink application will terminate after token expires."); + } + } + } + return true; + } + /** * Indicates whether the current user has an HDFS delegation token. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 1e045ad0f7a84..b9250e6610ada 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -137,19 +137,10 @@ public void install() throws SecurityInstallException { loginUser = UserGroupInformation.getLoginUser(); } - if (UserGroupInformation.isSecurityEnabled()) { - // note: UGI::hasKerberosCredentials inaccurately reports false - // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786), - // so we check only in ticket cache scenario. - if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) { - // a delegation token is an adequate substitute in most cases - if (!HadoopUtils.hasHDFSDelegationToken()) { - LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials"); - } - } - } + boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured( + loginUser, securityConfig.useTicketCache()); - LOG.info("Hadoop user set to {}", loginUser); + LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured); } catch (Throwable ex) { throw new SecurityInstallException("Unable to set the Hadoop login user", ex); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index 4a905ae30725d..31bd6a1371866 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.util.HadoopUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.ShutdownHookUtil; @@ -432,12 +433,11 @@ private ClusterClientProvider deployInternal( // so we check only in ticket cache scenario. boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE); - UserGroupInformation loginUser = UserGroupInformation.getCurrentUser(); - if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS - && useTicketCache && !loginUser.hasKerberosCredentials()) { - LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials"); + boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured( + UserGroupInformation.getCurrentUser(), useTicketCache); + if (!isCredentialsConfigured) { throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " + - "does not have Kerberos credentials"); + "does not have Kerberos credentials or delegation tokens!"); } }