Skip to content

Commit

Permalink
[FLINK-2977] Using reflection to load HBase Kerberos tokens
Browse files Browse the repository at this point in the history
This closes apache#1342
  • Loading branch information
nielsbasjes authored and rmetzger committed Nov 16, 2015
1 parent feb6994 commit aba3779
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
10 changes: 9 additions & 1 deletion flink-dist/src/main/flink-bin/bin/config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,15 @@ if [ -n "$HADOOP_HOME" ]; then
fi
fi

INTERNAL_HADOOP_CLASSPATHS="$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR"
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

if [ -n "${HBASE_CONF_DIR}" ]; then
# Setup the HBase classpath.
INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:`hbase classpath`"

# We add the HBASE_CONF_DIR last to ensure the right config directory is used.
INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
fi

# Auxilliary function which extracts the name of host from a line which
# also potentialy includes topology information and the taskManager type
Expand Down
55 changes: 53 additions & 2 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;

import org.apache.hadoop.mapreduce.security.TokenCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
Expand All @@ -34,6 +34,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
Expand Down Expand Up @@ -116,6 +117,8 @@ public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths
Credentials credentials = new Credentials();
// for HDFS
TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
// for HBase
obtainTokenForHBase(credentials, conf);
// for user
UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();

Expand All @@ -135,7 +138,55 @@ public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(securityTokens);
}


/**
* Obtain Kerberos security token for HBase.
*/
private static void obtainTokenForHBase(Credentials credentials, Configuration conf) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Attempting to obtain Kerberos security token for HBase");
try {
// ----
// Intended call: HBaseConfiguration.addHbaseResources(conf);
Class
.forName("org.apache.hadoop.hbase.HBaseConfiguration")
.getMethod("addHbaseResources", Configuration.class )
.invoke(null, conf);
// ----

LOG.info("HBase security setting: {}", conf.get("hbase.security.authentication"));

if (!"kerberos".equals(conf.get("hbase.security.authentication"))) {
LOG.info("HBase has not been configured to use Kerberos.");
return;
}

LOG.info("Obtaining Kerberos security token for HBase");
// ----
// Intended call: Token<AuthenticationTokenIdentifier> token = TokenUtil.obtainToken(conf);
Token<?> token = (Token<?>) Class
.forName("org.apache.hadoop.hbase.security.token.TokenUtil")
.getMethod("obtainToken", Configuration.class)
.invoke(null, conf);
// ----

if (token == null) {
LOG.error("No Kerberos security token for HBase available");
return;
}

credentials.addToken(token.getService(), token);
LOG.info("Added HBase Kerberos security token to credentials.");
} catch ( ClassNotFoundException
| NoSuchMethodException
| IllegalAccessException
| InvocationTargetException e) {
LOG.info("HBase is not available (not packaged with this application): {} : \"{}\".",
e.getClass().getSimpleName(), e.getMessage());
}
}
}

public static void logFilesInCurrentDirectory(final Logger logger) {
new File(".").list(new FilenameFilter() {

Expand Down

0 comments on commit aba3779

Please sign in to comment.