[FLINK-15561] Unify Kerberos credentials checking
Before, we had duplicate code in HadoopModule and YarnClusterDescriptor; now we use the same code for both. That code has been refactored into a util.
walterddr authored and aljoscha committed Feb 11, 2020
1 parent 3766daa commit 57c3396
Showing 3 changed files with 25 additions and 17 deletions.
HadoopUtils.java
@@ -112,6 +112,23 @@ public static Configuration getHadoopConfiguration(org.apache.flink.configuratio
         return result;
     }
 
+    public static boolean isCredentialsConfigured(UserGroupInformation ugi, boolean useTicketCache) throws Exception {
+        if (UserGroupInformation.isSecurityEnabled()) {
+            if (useTicketCache && !ugi.hasKerberosCredentials()) {
+                // a delegation token is an adequate substitute in most cases
+                if (!HadoopUtils.hasHDFSDelegationToken()) {
+                    LOG.error("Hadoop security is enabled, but current login user has neither Kerberos credentials " +
+                        "nor delegation tokens!");
+                    return false;
+                } else {
+                    LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials, " +
+                        "use delegation token instead. Flink application will terminate after token expires.");
+                }
+            }
+        }
+        return true;
+    }
+
     /**
      * Indicates whether the current user has an HDFS delegation token.
      */
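For illustration, a minimal pre-flight sketch of how application code could call the new utility before talking to a Kerberos-secured cluster. The class name is hypothetical; the import path follows the HadoopUtils import added to YarnClusterDescriptor below, and the hard-coded true stands in for the security.kerberos.login.use-ticket-cache setting.

    import org.apache.flink.runtime.util.HadoopUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosPreflightCheck {
        public static void main(String[] args) throws Exception {
            // Fail fast when Hadoop security is enabled but the current login user
            // has neither Kerberos credentials nor an HDFS delegation token.
            boolean credentialsConfigured = HadoopUtils.isCredentialsConfigured(
                UserGroupInformation.getCurrentUser(), true /* useTicketCache */);
            if (!credentialsConfigured) {
                throw new IllegalStateException(
                    "Kerberos is enabled but no credentials or delegation tokens are available");
            }
            System.out.println("Kerberos credential check passed");
        }
    }

Note that the two call sites unified by this commit react differently to a failed check: HadoopModule logs the result and continues, while YarnClusterDescriptor turns a false result into a RuntimeException.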
HadoopModule.java
@@ -137,19 +137,10 @@ public void install() throws SecurityInstallException {
                 loginUser = UserGroupInformation.getLoginUser();
             }
 
-            if (UserGroupInformation.isSecurityEnabled()) {
-                // note: UGI::hasKerberosCredentials inaccurately reports false
-                // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
-                // so we check only in ticket cache scenario.
-                if (securityConfig.useTicketCache() && !loginUser.hasKerberosCredentials()) {
-                    // a delegation token is an adequate substitute in most cases
-                    if (!HadoopUtils.hasHDFSDelegationToken()) {
-                        LOG.warn("Hadoop security is enabled but current login user does not have Kerberos credentials");
-                    }
-                }
-            }
+            boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
+                loginUser, securityConfig.useTicketCache());
 
-            LOG.info("Hadoop user set to {}", loginUser);
+            LOG.info("Hadoop user set to {}, credentials check status: {}", loginUser, isCredentialsConfigured);
 
         } catch (Throwable ex) {
             throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
YarnClusterDescriptor.java
@@ -43,6 +43,7 @@
 import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.ShutdownHookUtil;
@@ -432,12 +433,11 @@ private ClusterClientProvider<ApplicationId> deployInternal(
             // so we check only in ticket cache scenario.
             boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
 
-            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
-            if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
-                && useTicketCache && !loginUser.hasKerberosCredentials()) {
-                LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
+            boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
+                UserGroupInformation.getCurrentUser(), useTicketCache);
+            if (!isCredentialsConfigured) {
                 throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
-                    "does not have Kerberos credentials");
+                    "does not have Kerberos credentials or delegation tokens!");
             }
         }
 
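For reference, the useTicketCache flag consulted above comes from SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE (security.kerberos.login.use-ticket-cache in flink-conf.yaml). A minimal sketch of setting and reading it programmatically; the class name and the value shown are illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.SecurityOptions;

    public class TicketCacheOptionExample {
        public static void main(String[] args) {
            Configuration flinkConfiguration = new Configuration();
            // Tell the Kerberos login to use an existing ticket cache (e.g. obtained via kinit).
            flinkConfiguration.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, true);
            boolean useTicketCache =
                flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
            System.out.println("useTicketCache = " + useTicketCache);
        }
    }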
