[FLINK-4048] Remove Hadoop from DataSet API
This removes all Hadoop-related methods from ExecutionEnvironment; there
are already equivalent methods in flink-hadoop-compatibility (see
HadoopUtils, HadoopInputs, etc.). This also removes the Hadoop-specific
tests from flink-tests, because they are duplicated by tests in
flink-hadoop-compatibility.

This also removes Hadoop-specific example code from flink-examples: the
DistCp example and related code.
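
For programs that relied on the removed methods, the replacement pattern is
mechanical: build the input format via the static factories on HadoopInputs
and pass it to the generic createInput(). A minimal sketch (the class name and
HDFS path are illustrative, not from this commit):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class ReadHadoopFileMigration {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Removed API:
            //   env.readHadoopFile(new TextInputFormat(),
            //       LongWritable.class, Text.class, "hdfs:///input");
            // Replacement: wrap the mapred input format with HadoopInputs and
            // hand it to the generic createInput().
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class, "hdfs:///input"));
            input.first(10).print();
        }
    }
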
aljoscha committed Sep 27, 2017
1 parent 58320e8 commit 4beff13
Showing 31 changed files with 56 additions and 272 deletions.
@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred

 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
-import org.apache.hadoop.mapred.{JobConf, InputFormat}
+import org.apache.hadoop.mapred.{InputFormat, JobConf}

 @Public
 class HadoopInputFormat[K, V](

@@ -19,7 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred

 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
-import org.apache.hadoop.mapred.{OutputCommitter, JobConf, OutputFormat}
+import org.apache.hadoop.mapred.{JobConf, OutputCommitter, OutputFormat}

 @Public
 class HadoopOutputFormat[K, V](mapredOutputFormat: OutputFormat[K, V], job: JobConf)

@@ -34,14 +34,14 @@
 import org.apache.hadoop.util.Progressable;
 import org.junit.Test;
 import org.mockito.Matchers;
+import org.mockito.Mockito;

 import java.io.IOException;

 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -56,7 +56,7 @@ public void testOpen() throws Exception {

     OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
     DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-    JobConf jobConf = spy(new JobConf());
+    JobConf jobConf = Mockito.spy(new JobConf());
     when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);

     HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);
@@ -146,7 +146,7 @@ public void testWriteRecord() throws Exception {
 public void testFinalizeGlobal() throws Exception {
     OutputFormat<String, Long> dummyOutputFormat = mock(DummyOutputFormat.class);
     DummyOutputCommitter outputCommitter = mock(DummyOutputCommitter.class);
-    JobConf jobConf = spy(new JobConf());
+    JobConf jobConf = Mockito.spy(new JobConf());
     when(jobConf.getOutputCommitter()).thenReturn(outputCommitter);

     HadoopOutputFormat<String, Long> outputFormat = new HadoopOutputFormat<>(dummyOutputFormat, jobConf);

@@ -16,7 +16,7 @@
  * limitations under the License.
  */

-package org.apache.flink.test.hadoop.mapred;
+package org.apache.flink.test.hadoopcompatibility.mapred;

 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;

@@ -16,16 +16,18 @@
  * limitations under the License.
  */

-package org.apache.flink.test.hadoop.mapred;
+package org.apache.flink.test.hadoopcompatibility.mapred;

+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;

 import org.apache.hadoop.fs.Path;
@@ -79,8 +81,8 @@ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {
         DataSet<Tuple2<LongWritable, Text>> input;

         if (isTestDeprecatedAPI) {
-            input = env.readHadoopFile(new TextInputFormat(),
-                    LongWritable.class, Text.class, textPath);
+            input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                    LongWritable.class, Text.class, textPath));
         } else {
             input = env.createInput(readHadoopFile(new TextInputFormat(),
                     LongWritable.class, Text.class, textPath));
@@ -118,4 +120,17 @@ public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exce
         words.output(hadoopOutputFormat);
         env.execute("Hadoop Compat WordCount");
     }
+
+    static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            String[] tokens = value.toLowerCase().split("\\W+");
+            for (String token : tokens) {
+                if (token.length() > 0) {
+                    out.collect(new Tuple2<>(token, 1));
+                }
+            }
+        }
+    }
 }
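
One side effect of moving this test out of flink-tests is visible above: it now
carries its own copy of the Tokenizer instead of importing
org.apache.flink.test.testfunctions.Tokenizer, presumably because the shared
test function is no longer reachable from the new module. The output side is
untouched by this commit: the HadoopOutputFormat wrapper keeps its existing
coordinates. A minimal sketch of that unchanged write-side wiring (class name,
types, and path are illustrative):

    import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class WriteSideSketch {
        public static HadoopOutputFormat<Text, LongWritable> create() {
            JobConf jobConf = new JobConf();
            // The Flink wrapper delegates writing to the wrapped mapred OutputFormat.
            HadoopOutputFormat<Text, LongWritable> format =
                new HadoopOutputFormat<>(new TextOutputFormat<Text, LongWritable>(), jobConf);
            FileOutputFormat.setOutputPath(jobConf, new Path("hdfs:///out"));
            return format;
        }
    }
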
@@ -16,16 +16,18 @@
  * limitations under the License.
  */

-package org.apache.flink.test.hadoop.mapreduce;
+package org.apache.flink.test.hadoopcompatibility.mapreduce;

+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.hadoopcompatibility.HadoopInputs;
 import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.testfunctions.Tokenizer;
 import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OperatingSystem;

 import org.apache.hadoop.fs.Path;
@@ -78,8 +80,8 @@ private void internalRun(boolean isTestDeprecatedAPI) throws Exception {

         DataSet<Tuple2<LongWritable, Text>> input;
         if (isTestDeprecatedAPI) {
-            input = env.readHadoopFile(new TextInputFormat(),
-                    LongWritable.class, Text.class, textPath);
+            input = env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
+                    LongWritable.class, Text.class, textPath));
         } else {
             input = env.createInput(readHadoopFile(new TextInputFormat(),
                     LongWritable.class, Text.class, textPath));
@@ -118,4 +120,17 @@ public Tuple2<Text, LongWritable> map(Tuple2<String, Integer> value) throws Exce
         words.output(hadoopOutputFormat);
         env.execute("Hadoop Compat WordCount");
     }
+
+    static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
+
+        @Override
+        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+            String[] tokens = value.toLowerCase().split("\\W+");
+            for (String token : tokens) {
+                if (token.length() > 0) {
+                    out.collect(new Tuple2<>(token, 1));
+                }
+            }
+        }
+    }
 }
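
The mapreduce-API variant follows the same pattern; the corresponding
HadoopInputs overload takes a mapreduce Job rather than a mapred JobConf (its
signature is quoted in the deprecation notes removed further below). A minimal
sketch (class name and path are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class MapreduceReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // readHadoopFile adds the path to the Job and wraps the mapreduce
            // input format in a Flink InputFormat.
            DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
                HadoopInputs.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class,
                    "hdfs:///input", Job.getInstance()));
            input.first(10).print();
        }
    }
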
@@ -15,9 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.api.scala.hadoop.mapred
+package org.apache.flink.api.hadoopcompatibility.scala

 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
@@ -52,7 +53,9 @@ class WordCountMapredITCase extends JavaProgramTestBase {

     val input =
       if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
       } else {
         env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
           classOf[Text], textPath))

@@ -16,15 +16,16 @@
  * limitations under the License.
  */

-package org.apache.flink.api.scala.hadoop.mapreduce
+package org.apache.flink.api.hadoopcompatibility.scala

 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.hadoop.mapreduce.HadoopOutputFormat
 import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{TestBaseUtils, JavaProgramTestBase}
+import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.{Text, LongWritable}
+import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
@@ -63,7 +64,9 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {

     val input =
       if (testDeprecatedAPI) {
-        env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
+        env.createInput(
+          HadoopInputs.readHadoopFile(
+            new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
       } else {
         env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
           classOf[Text], textPath))

@@ -33,7 +33,6 @@
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.io.IteratorInputFormat;
@@ -61,8 +60,6 @@
 import org.apache.flink.util.Visitor;

 import com.esotericsoftware.kryo.Serializer;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -582,109 +579,6 @@ public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat, TypeInformat
         return new DataSource<>(this, inputFormat, producedType, Utils.getCallLocationName());
     }

-    // ----------------------------------- Hadoop Input Format ---------------------------------------
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String, JobConf)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
-        DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
-
-        org.apache.hadoop.mapred.FileInputFormat.addInputPath(job, new org.apache.hadoop.fs.Path(inputPath));
-
-        return result;
-    }
-
-    /**
-     * Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
-     * A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readSequenceFile(Class<K>, Class<V>, String)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
-        return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
-    }
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
-     * {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
-        return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
-    }
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V>, Class<K>, Class<V>, JobConf)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-        HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
-
-        return this.createInput(hadoopInputFormat);
-    }
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
-     * given inputName is set on the given job.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String, Job)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
-        DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
-
-        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new org.apache
-            .hadoop.fs.Path(inputPath));
-
-        return result;
-    }
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
-     * {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>, Class<K>, Class<V>, String)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
-        return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
-    }
-
-    /**
-     * Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
-     *
-     * @deprecated Please use {@code org.apache.flink.hadoopcompatibility.HadoopInputs#createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V>, Class<K>, Class<V>, Job)}
-     * from the flink-hadoop-compatibility module.
-     */
-    @Deprecated
-    @PublicEvolving
-    public <K, V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K, V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
-        org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
-
-        return this.createInput(hadoopInputFormat);
-    }
-
     // ----------------------------------- Collection ---------------------------------------

     /**
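
Each removed convenience method has a one-to-one static counterpart in
HadoopInputs, as the deprecation notes above spell out, so call sites change
mechanically. A sketch of the two less obvious mapred-side replacements,
readSequenceFile and createHadoopInput (class name and paths are illustrative):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.HadoopInputs;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class RemainingHelpersSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // env.readSequenceFile(key, value, path) becomes:
            DataSet<Tuple2<LongWritable, Text>> seq = env.createInput(
                HadoopInputs.readSequenceFile(LongWritable.class, Text.class, "hdfs:///seq"));

            // env.createHadoopInput(format, key, value, jobConf) becomes:
            JobConf jobConf = new JobConf();
            FileInputFormat.addInputPath(jobConf, new Path("hdfs:///raw"));
            DataSet<Tuple2<LongWritable, Text>> raw = env.createInput(
                HadoopInputs.createHadoopInput(
                    new TextInputFormat(), LongWritable.class, Text.class, jobConf));

            seq.union(raw).first(10).print();
        }
    }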