[FLINK-2555] Properly pass security credentials in the Hadoop Input/Output format wrappers

This is needed because the Hadoop IF/OFs use Hadoop's FileSystem stack, which relies on
the security credentials passed in via the JobConf / Job class in the getSplits() method.

Note that accessing secured Hadoop 1.x clusters through the Hadoop IF/OF wrappers is still not possible with this change.
This limitation is due to methods missing from the old APIs.

- Add some comments & change dependency scope to test
rmetzger authored and uce committed Aug 26, 2015
1 parent 6e1de98 commit b264b01
Showing 21 changed files with 285 additions and 59 deletions.
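For context, here is a minimal usage sketch (not part of this commit; the class name and input path are placeholders) of how the credentials attached to a Job flow through the mapreduce-API wrapper when reading from a Kerberos-secured HDFS:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SecureHadoopReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The Job created here carries the submitting user's security credentials;
        // with this change the wrapper serializes them and re-attaches them on the workers.
        Job job = Job.getInstance();
        DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
                new TextInputFormat(), LongWritable.class, Text.class,
                "hdfs:///path/to/secured/input", job);  // placeholder path
        input.print();
    }
}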
6 changes: 6 additions & 0 deletions flink-core/pom.xml
@@ -64,6 +64,12 @@ under the License.
<version>${guava.version}</version>
</dependency>

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
1 change: 0 additions & 1 deletion flink-dist/pom.xml
@@ -305,7 +305,6 @@ under the License.
<!--Build uber jar-->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
3 changes: 1 addition & 2 deletions flink-java/pom.xml
@@ -40,7 +40,6 @@ under the License.
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
@@ -79,7 +78,7 @@ under the License.
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.27</version>
<version>0.36</version>
</dependency>

<dependency>
@@ -569,8 +569,8 @@ public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
* given inputName is set on the given job.
*/
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);

org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
.hadoop.fs.Path(inputPath));
@@ -582,15 +582,15 @@ public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
* {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
*/
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapredInputFormat, key, value, inputPath, Job.getInstance());
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
}

/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapredInputFormat, key, value, job);
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V>(mapreduceInputFormat, key, value, job);

return this.createInput(hadoopInputFormat);
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.common;

import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.core.io.InputSplit;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
* A common base for both "mapred" and "mapreduce" Hadoop input formats.
*/
public abstract class HadoopInputFormatCommonBase<T, SPLITTYPE extends InputSplit> extends RichInputFormat<T, SPLITTYPE> {
protected transient Credentials credentials;

protected HadoopInputFormatCommonBase(Credentials creds) {
this.credentials = creds;
}

protected void write(ObjectOutputStream out) throws IOException {
this.credentials.write(out);
}

public void read(ObjectInputStream in) throws IOException {
this.credentials = new Credentials();
credentials.readFields(in);
}

/**
* This method only exists because there is no UserGroupInformation.getCredentials() method
 * in Hadoop 1.x.
*
* Note that this method returns "null" in Hadoop 1.x environments.
*
* @param ugi The user information
* @return new credentials object from the user information. MAY RETURN NULL!
*/
public static Credentials getCredentialsFromUGI(UserGroupInformation ugi) {
Method getCredentialsMethod = null;
for(Method m: ugi.getClass().getMethods()) {
if(m.getName().equals("getCredentials")) {
getCredentialsMethod = m;
break;
}
}
if(getCredentialsMethod == null) {
return null;
} else {
try {
return (Credentials) getCredentialsMethod.invoke(ugi);
} catch (InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Unable to get credentials from UserGroupInformation. This is only supported by Hadoop 2.2.0+");
}
}
}
}
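The getCredentialsFromUGI helper above degrades gracefully on Hadoop 1.x. A hedged sketch of the intended call pattern (it mirrors the readObject() changes to the mapred wrappers further down in this commit; the class and method names here are illustrative only):

import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

class CredentialMergeSketch {
    // Merge the wrapper's deserialized credentials into the JobConf and, on Hadoop 2.x,
    // also the current user's credentials. On Hadoop 1.x getCredentialsFromUGI(...)
    // returns null and the second merge is skipped.
    static void attachCredentials(JobConf jobConf, Credentials deserializedCreds) throws IOException {
        jobConf.getCredentials().addAll(deserializedCreds);
        Credentials currentUserCreds =
                HadoopInputFormatCommonBase.getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
        if (currentUserCreds != null) {
            jobConf.getCredentials().addAll(currentUserCreds);
        }
    }
}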
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.hadoop.common;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.hadoop.security.Credentials;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
* A common base for both "mapred" and "mapreduce" Hadoop output formats.
*
 * The base takes care of handling (serializing) the security credentials.
*/
public abstract class HadoopOutputFormatCommonBase<T> extends RichOutputFormat<T> {
protected transient Credentials credentials;

protected HadoopOutputFormatCommonBase(Credentials creds) {
this.credentials = creds;
}

protected void write(ObjectOutputStream out) throws IOException {
this.credentials.write(out);
}

public void read(ObjectInputStream in) throws IOException {
this.credentials = new Credentials();
credentials.readFields(in);
}
}
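Because the credentials field is transient, subclasses have to invoke the base's write()/read() from their own Java-serialization hooks. An illustrative (hypothetical) subclass showing that wiring, which the HadoopOutputFormatBase changes below implement for real:

import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.security.Credentials;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class NoOpHadoopOutputFormat extends HadoopOutputFormatCommonBase<String> {
    public NoOpHadoopOutputFormat(Credentials creds) {
        super(creds);
    }

    @Override public void configure(Configuration parameters) {}
    @Override public void open(int taskNumber, int numTasks) {}
    @Override public void writeRecord(String record) {}
    @Override public void close() {}

    private void writeObject(ObjectOutputStream out) throws IOException {
        super.write(out);   // serialize the transient Credentials first
        // ... then write this class's own fields ...
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        super.read(in);     // restore the Credentials
        // ... then read this class's own fields ...
    }
}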
@@ -27,6 +27,14 @@
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.hadoop.mapred.JobConf;

/**
* Wrapper for using HadoopInputFormats (mapred-variant) with Flink.
*
 * The input format returns records of type Tuple2<K, V>.
*
* @param <K> Type of the key
* @param <V> Type of the value.
*/
public class HadoopInputFormat<K, V> extends HadoopInputFormatBase<K, V, Tuple2<K,V>> implements ResultTypeQueryable<Tuple2<K,V>> {

private static final long serialVersionUID = 1L;
@@ -16,13 +16,12 @@
* limitations under the License.
*/


package org.apache.flink.api.java.hadoop.mapred;

import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit;
@@ -36,6 +35,8 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,14 @@
import java.io.ObjectOutputStream;
import java.util.ArrayList;

public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T, HadoopInputSplit> {
/**
* Common base for Java and Scala API for using Hadoop input formats with Flink.
*
* @param <K> Type of key
* @param <V> Type of value
 * @param <T> The type of records produced by this input format
*/
public abstract class HadoopInputFormatBase<K, V, T> extends HadoopInputFormatCommonBase<T, HadoopInputSplit> {

private static final long serialVersionUID = 1L;

@@ -64,7 +72,7 @@ public abstract class HadoopInputFormatBase<K, V, T> extends RichInputFormat<T,
protected transient boolean hasNext;

public HadoopInputFormatBase(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
super();
super(job.getCredentials());
this.mapredInputFormat = mapredInputFormat;
this.keyClass = key;
this.valueClass = value;
@@ -225,6 +233,7 @@ private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apac
// --------------------------------------------------------------------------------------------

private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredInputFormat.getClass().getName());
out.writeUTF(keyClass.getName());
out.writeUTF(valueClass.getName());
@@ -233,6 +242,8 @@ private void writeObject(ObjectOutputStream out) throws IOException {

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);

String hadoopInputFormatClassName = in.readUTF();
String keyClassName = in.readUTF();
String valueClassName = in.readUTF();
@@ -256,5 +267,12 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to find value class.", e);
}
ReflectionUtils.setConf(mapredInputFormat, jobConf);

jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if(currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}

}
@@ -24,6 +24,14 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCommitter;

/**
* Wrapper for using HadoopOutputFormats (mapred-variant) with Flink.
*
 * The output format consumes records of type Tuple2<K, V>.
*
* @param <K> Type of the key
* @param <V> Type of the value.
*/
public class HadoopOutputFormat<K,V> extends HadoopOutputFormatBase<K, V, Tuple2<K, V>> {

private static final long serialVersionUID = 1L;
@@ -16,11 +16,10 @@
* limitations under the License.
*/


package org.apache.flink.api.java.hadoop.mapred;

import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.hadoop.common.HadoopOutputFormatCommonBase;
import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable;
import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
@@ -34,14 +33,24 @@
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import static org.apache.flink.api.java.hadoop.common.HadoopInputFormatCommonBase.getCredentialsFromUGI;

public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T> implements FinalizeOnMaster {
/**
* Common base for the mapred HadoopOutputFormat wrappers. There are implementations for Java and Scala.
*
* @param <K> Type of Key
* @param <V> Type of Value
* @param <T> Record type.
*/
public abstract class HadoopOutputFormatBase<K, V, T> extends HadoopOutputFormatCommonBase<T> implements FinalizeOnMaster {

private static final long serialVersionUID = 1L;

@@ -50,9 +59,9 @@ public abstract class HadoopOutputFormatBase<K, V, T> extends RichOutputFormat<T
protected transient RecordWriter<K,V> recordWriter;
private transient OutputCommitter outputCommitter;
private transient TaskAttemptContext context;
private transient JobContext jobContext;

public HadoopOutputFormatBase(org.apache.hadoop.mapred.OutputFormat<K, V> mapredOutputFormat, JobConf job) {
super(job.getCredentials());
this.mapredOutputFormat = mapredOutputFormat;
HadoopUtils.mergeHadoopConf(job);
this.jobConf = job;
@@ -108,8 +117,9 @@ public void open(int taskNumber, int numTasks) throws IOException {

this.outputCommitter = this.jobConf.getOutputCommitter();

JobContext jobContext;
try {
this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -151,12 +161,14 @@ public void finalizeGlobal(int parallelism) throws IOException {
// --------------------------------------------------------------------------------------------

private void writeObject(ObjectOutputStream out) throws IOException {
super.write(out);
out.writeUTF(mapredOutputFormat.getClass().getName());
jobConf.write(out);
}

@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
super.read(in);
String hadoopOutputFormatName = in.readUTF();
if(jobConf == null) {
jobConf = new JobConf();
@@ -168,5 +180,11 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE
throw new RuntimeException("Unable to instantiate the hadoop output format", e);
}
ReflectionUtils.setConf(mapredOutputFormat, jobConf);

jobConf.getCredentials().addAll(this.credentials);
Credentials currentUserCreds = getCredentialsFromUGI(UserGroupInformation.getCurrentUser());
if(currentUserCreds != null) {
jobConf.getCredentials().addAll(currentUserCreds);
}
}
}