Commit 169e456

rename hadoop-compatibility, more generics

rmetzger committed Feb 9, 2014
1 parent 91ab016 commit 169e456
Showing 11 changed files with 112 additions and 106 deletions.
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat;
+package eu.stratosphere.hadoopcompatibility;

import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat;
+package eu.stratosphere.hadoopcompatibility;


import org.apache.hadoop.mapred.InputFormat;
@@ -7,8 +7,8 @@
import com.google.common.base.Preconditions;

import eu.stratosphere.api.common.operators.GenericDataSource;
-import eu.stratosphere.hadoopcompat.datatypes.DefaultHadoopTypeConverter;
-import eu.stratosphere.hadoopcompat.datatypes.HadoopTypeConverter;
+import eu.stratosphere.hadoopcompatibility.datatypes.DefaultHadoopTypeConverter;
+import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter;

/**
* The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
@@ -26,7 +26,7 @@
* * DefaultHadoopTypeConverter: Converts the standard Hadoop types (LongWritable, Text) to Stratosphere's standard types.
*
*/
-public class HadoopDataSource extends GenericDataSource<HadoopInputFormatWrapper> {
+public class HadoopDataSource<K,V> extends GenericDataSource<HadoopInputFormatWrapper<K,V>> {

private static String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";

@@ -39,23 +39,23 @@ public class HadoopDataSource extends GenericDataSource<HadoopInputFormatWrapper
* @param name Name of the DataSource
* @param conv Definition of a custom type converter {@link DefaultHadoopTypeConverter}.
*/
-public HadoopDataSource(InputFormat hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter conv) {
-super(new HadoopInputFormatWrapper(hadoopFormat, jobConf, conv),name);
+public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
+super(new HadoopInputFormatWrapper<K,V>(hadoopFormat, jobConf, conv),name);
Preconditions.checkNotNull(hadoopFormat);
Preconditions.checkNotNull(jobConf);
Preconditions.checkNotNull(conv);
this.name = name;
this.jobConf = jobConf;
}

-public HadoopDataSource(InputFormat hadoopFormat, JobConf jobConf, String name) {
-this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter() );
+public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
+this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() );
}
-public HadoopDataSource(InputFormat hadoopFormat, JobConf jobConf) {
+public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
this(hadoopFormat, jobConf, DEFAULT_NAME);
}

-public HadoopDataSource(InputFormat hadoopFormat) {
+public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
this(hadoopFormat, new JobConf(), DEFAULT_NAME);
}

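Note on usage: with the new type parameters, the Hadoop key/value types travel through the source's type instead of being erased to Object. A minimal sketch under the renamed package, assuming TextInputFormat (LongWritable keys, Text values) and a placeholder input path:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import eu.stratosphere.hadoopcompatibility.HadoopDataSource;

    // The DefaultHadoopTypeConverter is picked implicitly; key/value types
    // are now checked at compile time instead of collapsing to Object.
    HadoopDataSource<LongWritable, Text> source =
        new HadoopDataSource<LongWritable, Text>(new TextInputFormat(), new JobConf(), "Input Lines");
    TextInputFormat.addInputPath(source.getJobConf(), new Path("hdfs:///input/lines")); // placeholder path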
@@ -1,48 +1,50 @@
-package eu.stratosphere.hadoopcompat;
+package eu.stratosphere.hadoopcompatibility;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.WriteAbortedException;
import java.util.Map.Entry;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

import eu.stratosphere.api.common.io.InputFormat;
import eu.stratosphere.api.common.io.statistics.BaseStatistics;
import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.hadoopcompat.datatypes.DefaultHadoopTypeConverter;
-import eu.stratosphere.hadoopcompat.datatypes.HadoopTypeConverter;
+import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter;
import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;

-public class HadoopInputFormatWrapper implements InputFormat<Record, HadoopInputSplitWrapper> {
+public class HadoopInputFormatWrapper<K, V> implements InputFormat<Record, HadoopInputSplitWrapper> {

private static final long serialVersionUID = 1L;

-public org.apache.hadoop.mapred.InputFormat<Object, Object> hadoopInputFormat;
-public HadoopTypeConverter converter;
+public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
+public HadoopTypeConverter<K,V> converter;
private String hadoopInputFormatName;
public JobConf jobConf;
-public transient Object key;
-public transient Object value;
-public RecordReader<Object, Object> recordReader;
+public transient K key;
+public transient V value;
+public RecordReader<K, V> recordReader;
private boolean fetched = false;
private boolean hasNext;

public HadoopInputFormatWrapper() {
super();
}

-public HadoopInputFormatWrapper(org.apache.hadoop.mapred.InputFormat hadoopInputFormat, JobConf job, HadoopTypeConverter conv) {
+public HadoopInputFormatWrapper(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
super();
this.hadoopInputFormat = hadoopInputFormat;
this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
-this.jobConf = job;
this.converter = conv;
+// merge hadoopConf into jobConf. This is necessary for the hdfs configuration
+org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
+for (Entry<String, String> e : hadoopConf) {
+job.set(e.getKey(), e.getValue());
+}
+this.jobConf = job;
}

@Override
@@ -51,8 +53,7 @@ public void configure(Configuration parameters) {
}

@Override
-public BaseStatistics getStatistics(BaseStatistics cachedStatistics)
-throws IOException {
+public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return null;
}

@@ -128,24 +129,24 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
}
jobConf.readFields(in);
try {
-this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat) Class.forName(this.hadoopInputFormatName).newInstance();
+this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate the hadoop input format", e);
}
ReflectionUtils.setConf(hadoopInputFormat, jobConf);
-converter = (HadoopTypeConverter) in.readObject();
+converter = (HadoopTypeConverter<K,V>) in.readObject();
}

public void setJobConf(JobConf job) {
this.jobConf = job;
}


-public org.apache.hadoop.mapred.InputFormat getHadoopInputFormat() {
+public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
return hadoopInputFormat;
}

-public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat hadoopInputFormat) {
+public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
this.hadoopInputFormat = hadoopInputFormat;
}

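The constructor change above merges the cluster-side Hadoop configuration into the user's JobConf before it is stored, so HDFS settings reach the input format at runtime. A standalone sketch of the same merge; the helper name mergeHadoopConf is hypothetical, while DistributedFileSystem.getHadoopConfiguration() is taken from the diff:

    import java.util.Map.Entry;
    import org.apache.hadoop.mapred.JobConf;
    import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem;

    public final class HadoopConfUtil {
        // Copy every entry of the cluster configuration into the job's conf;
        // keys already set in the JobConf are overwritten by the cluster values.
        static JobConf mergeHadoopConf(JobConf jobConf) {
            org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
            for (Entry<String, String> e : hadoopConf) {
                jobConf.set(e.getKey(), e.getValue());
            }
            return jobConf;
        }
    }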
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat;
+package eu.stratosphere.hadoopcompatibility;

import java.io.DataInput;
import java.io.DataOutput;
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat.datatypes;
+package eu.stratosphere.hadoopcompatibility.datatypes;

import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
@@ -7,8 +7,6 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;

import eu.stratosphere.types.BooleanValue;
import eu.stratosphere.types.ByteValue;
@@ -26,7 +24,7 @@
* Key will be in field 0, Value in field 1 of a Stratosphere Record.
*
*/
-public class DefaultHadoopTypeConverter<K extends WritableComparable<?> , V extends Writable> implements HadoopTypeConverter<K, V> {
+public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
private static final long serialVersionUID = 1L;

@Override
@@ -1,10 +1,7 @@
-package eu.stratosphere.hadoopcompat.datatypes;
+package eu.stratosphere.hadoopcompatibility.datatypes;

import java.io.Serializable;

-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;

import eu.stratosphere.types.Record;


@@ -17,7 +14,7 @@
* Stratosphere provides a DefaultHadoopTypeConverter. Custom implementations should
* chain the type converters.
*/
-public interface HadoopTypeConverter<K extends WritableComparable<?>, V extends Writable> extends Serializable {
+public interface HadoopTypeConverter<K, V> extends Serializable {

/**
* Convert a Hadoop type to a Stratosphere type.
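For reference, a custom converter against the relaxed interface could look as follows. This is a sketch only: the exact convert signature is cut off in this diff, so a (Record, K, V) shape is assumed from the documented contract that the key lands in field 0 and the value in field 1, and the class name is invented:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter;
    import eu.stratosphere.types.LongValue;
    import eu.stratosphere.types.Record;
    import eu.stratosphere.types.StringValue;

    // Hypothetical converter that swaps the default layout: the text line
    // goes to field 0, the byte offset to field 1.
    public class LineFirstConverter implements HadoopTypeConverter<LongWritable, Text> {
        private static final long serialVersionUID = 1L;

        @Override
        public void convert(Record stratosphereRecord, LongWritable key, Text value) {
            stratosphereRecord.setField(0, new StringValue(value.toString()));
            stratosphereRecord.setField(1, new LongValue(key.get()));
        }
    }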
@@ -1,10 +1,10 @@
-package eu.stratosphere.hadoopcompat.datatypes;
+package eu.stratosphere.hadoopcompatibility.datatypes;

import org.apache.hadoop.io.WritableComparable;

import eu.stratosphere.types.Key;

-public class WritableComparableWrapper<T extends WritableComparable<?>> extends WritableWrapper<T> implements Key {
+public class WritableComparableWrapper<T extends WritableComparable> extends WritableWrapper<T> implements Key {
private static final long serialVersionUID = 1L;

public WritableComparableWrapper() {
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat.datatypes;
+package eu.stratosphere.hadoopcompatibility.datatypes;

import java.io.DataInput;
import java.io.DataOutput;
@@ -24,8 +24,8 @@ public WritableWrapper(T toWrap) {
wrappedType = toWrap.getClass().getCanonicalName();
}

-public T value() {
-return wrapped;
+public <X extends Writable> X value() {
+return (X) wrapped;
}

@Override
@@ -44,7 +44,7 @@ public void read(DataInput in) throws IOException {
Class wrClass = Class.forName(wrappedType, true, cl);
wrapped = (T) InstantiationUtil.instantiate(wrClass, Writable.class);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Error cerating the WritableWrapper");
throw new RuntimeException("Error creating the WritableWrapper", e);
}
wrapped.readFields(in);
}
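The value() change moves the cast from the caller into the wrapper: the return type X is inferred at the call site, so callers drop their explicit casts, but a mismatched target type now fails only at run time. A short usage sketch:

    import org.apache.hadoop.io.Text;
    import eu.stratosphere.hadoopcompatibility.datatypes.WritableWrapper;

    WritableWrapper<Text> wrapper = new WritableWrapper<Text>(new Text("hello"));
    Text unwrapped = wrapper.value(); // X is inferred as Text from the assignment target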
@@ -1,12 +1,12 @@
-package eu.stratosphere.hadoopcompat.datatypes;
+package eu.stratosphere.hadoopcompatibility.datatypes;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;

-public class WritableWrapperConverter<K extends WritableComparable<?> , V extends Writable> implements HadoopTypeConverter<K,V> {
+public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> {
private static final long serialVersionUID = 1L;

@Override
@@ -22,5 +22,4 @@ private final Value convertKey(K in) {
private final Value convertValue(V in) {
return new WritableWrapper<V>(in);
}

}
@@ -1,4 +1,4 @@
-package eu.stratosphere.hadoopcompat.example;
+package eu.stratosphere.hadoopcompatibility.example;
/***********************************************************************************************************************
* Copyright (C) 2010-2013 by the Stratosphere project (https://stratosphere.eu)
*
@@ -37,9 +37,9 @@
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable;
import eu.stratosphere.client.LocalExecutor;
-import eu.stratosphere.hadoopcompat.HadoopDataSource;
-import eu.stratosphere.hadoopcompat.HadoopInputFormatWrapper;
-import eu.stratosphere.hadoopcompat.datatypes.WritableWrapperConverter;
+import eu.stratosphere.hadoopcompatibility.HadoopDataSource;
+import eu.stratosphere.hadoopcompatibility.HadoopInputFormatWrapper;
+import eu.stratosphere.hadoopcompatibility.datatypes.WritableWrapperConverter;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
@@ -121,7 +121,7 @@ public Plan getPlan(String... args) {
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));

// Example with Wrapper Converter
-HadoopDataSource sourceHadoopType = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
+HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>(new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>());
TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));

MapOperator mapper = MapOperator.builder(new TokenizeLine())
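The two sources in the example now differ only in their converter, and with it the record layout they produce. A hedged summary for TextInputFormat, with field types inferred from the converter documentation above:

    // Default converter: Hadoop types are converted to Stratosphere's standard
    // types, e.g. field 0 = LongValue (byte offset), field 1 = StringValue (line).
    HadoopDataSource<LongWritable, Text> source =
        new HadoopDataSource<LongWritable, Text>(new TextInputFormat(), new JobConf(), "Input Lines");

    // Wrapper converter: Hadoop types stay wrapped, field 0 =
    // WritableComparableWrapper<LongWritable>, field 1 = WritableWrapper<Text>,
    // and downstream operators unwrap them via value().
    HadoopDataSource<LongWritable, Text> sourceHadoopType =
        new HadoopDataSource<LongWritable, Text>(new TextInputFormat(), new JobConf(), "Input Lines",
            new WritableWrapperConverter<LongWritable, Text>());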