Skip to content

Commit

Permalink
Fix the external packaged program test after merge with the new java api
Browse files Browse the repository at this point in the history
Relax generic bound in tuple type infor to work around bug in Java 6 compiler type inference.
Local file system is used by default when URI is missing
Removed final declaration from Tuple classes to allow inheritance (Fabian Hueske)
Fix in TupleType comparator generation (Fabian Hueske)
Cogroup Implementation (Markus Holzemer)
Added missing Apache license header (Fabian Hueske)
Initial commit for join operator for new Java API (Fabian Hueske)
Added cross operator to new Java API (Fabian Hueske)
First part of CoGroup (Markus Holzemer)
Added join example prog and fix in join operator (Fabian Hueske)
Unified operator translation (Fabian Hueske)
Add Cross Prog for API implementation (Fabian Hueske)
Implement CsvInputFormat for new Java API, including tests.
Fix logging messages in data source and sink tasks.
Fix error in data source task with new InputFormat siignatures
Add group reduce to new java api.
Finaliz Key Extractors for new Java API
Fix bug in intitialization of broadcast serializers.
Added first set of comparators.
Added generic type serialization via avro
Add first stab at KeyExtractor implementation (Aljoscha Krettek)
Add trivial parts of serialization (Aljoscha Krettek)
Connected serializers to the typeutils.
Fix tests to run with the NEW-API changes (Aljoscha Krettek)
Scala Post pass got confused because of ReduceNode renaming. (Aljoscha Krettek)
Some input format mishaps. (Aljoscha Krettek)
First set of new simple type serializers.
Connect first paths of new Java API to optimizer.
Add name() to Operator plus some other stuff (Aljoscha Krettek)
Clean up moved annotations
Add CoGroup, Cross, and Union operators (Aljoscha Krettek)
Make reduce operators children of UdfOperator (Aljoscha Krettek)
Adjust input formats to new signature
Adjusted, extended, and tested parsers for new java api compatibility.
First draft of new java api.
Removing dependency to Combinable annotation. (Jesus Camacho Rodriguez)
Moving all annotations to Java API. (Jesus Camacho Rodriguez)
Moving all annotations from the core to the Java API package. Created SemanticProperties in the core to store properties inferred currently from Java annotations. (Jesus Camacho Rodriguez)
Moving all annotations to Java API (Jesus Camacho Rodriguez)
  • Loading branch information
StephanEwen committed Mar 6, 2014
1 parent ddc1dc9 commit 0038c9d
Show file tree
Hide file tree
Showing 415 changed files with 23,028 additions and 2,606 deletions.
6 changes: 6 additions & 0 deletions stratosphere-addons/array-datamodel/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-java</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>eu.stratosphere</groupId>
<artifactId>stratosphere-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.common.operators.base.ReduceOperatorBase;
import eu.stratosphere.api.common.operators.base.ReduceOperatorBase.Combinable;
import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFields;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.arraymodel.ArrayModelPlan;
Expand All @@ -44,13 +43,19 @@
*/
public class WordCountArrayTuples implements Program, ProgramDescription {

private static final long serialVersionUID = 1L;


/**
* Converts a Record containing one string in to multiple string/integer pairs.
* The string is tokenized by whitespaces. For each token a new record is emitted,
* where the token is the first field and an Integer(1) is the second field.
*/

public static class TokenizeLine extends MapFunction {

private static final long serialVersionUID = 1L;

// initialize reusable mutable objects
private final StringValue word = new StringValue();
private final IntValue one = new IntValue(1);
Expand Down Expand Up @@ -82,10 +87,11 @@ public void map(Value[] record, Collector<Value[]> collector) {
* Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code>
* in the record. The other fields are not modified.
*/
@Combinable
@ConstantFields(0)
public static class CountWords extends ReduceFunction {

private static final long serialVersionUID = 1L;

private final IntValue cnt = new IntValue();
private final Value[] result = new Value[] { null, cnt };

Expand Down Expand Up @@ -118,7 +124,8 @@ public Plan getPlan(String... args) {
MapOperatorBase<TokenizeLine> mapper = new MapOperatorBase<TokenizeLine>(TokenizeLine.class, "Tokenize Lines");
mapper.setInput(source);

ReduceOperatorBase<CountWords> reducer = new ReduceOperatorBase<CountWords>(CountWords.class, new int[] {0}, "Count Words");
GroupReduceOperatorBase<CountWords> reducer = new GroupReduceOperatorBase<CountWords>(CountWords.class, new int[] {0}, "Count Words");
reducer.setCombinable(true);
reducer.setInput(mapper);

FileDataSink out = new FileDataSink(new StringIntOutputFormat(), output, reducer, "Word Counts");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
*/
public abstract class AbstractArrayModelFunction extends AbstractFunction {

private static final long serialVersionUID = 1L;

public Class<? extends Value>[] getDataTypes(int input) {
Method m = getUDFMethod();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@


public abstract class CoGroupFunction extends AbstractArrayModelFunction implements GenericCoGrouper<Value[], Value[], Value[]> {


private static final long serialVersionUID = 1L;

/**
* This method must be implemented to provide a user implementation of a
* matcher. It is called for each two key-value pairs that share the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

public abstract class CrossFunction extends AbstractArrayModelFunction implements GenericCrosser<Value[], Value[], Value[]> {

private static final long serialVersionUID = 1L;

/**
* This method must be implemented to provide a user implementation of a cross.
* It is called for each element of the Cartesian product of both input sets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public abstract class JoinFunction extends AbstractArrayModelFunction implements GenericJoiner<Value[], Value[], Value[]> {

private static final long serialVersionUID = 1L;

/**
* This method must be implemented to provide a user implementation of a matcher.
* It is called for each two records that share the same key and come from different inputs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import java.lang.reflect.Method;

import eu.stratosphere.api.common.functions.GenericMapper;
import eu.stratosphere.api.common.functions.GenericCollectorMap;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;

public abstract class MapFunction extends AbstractArrayModelFunction implements GenericMapper<Value[], Value[]> {
public abstract class MapFunction extends AbstractArrayModelFunction implements GenericCollectorMap<Value[], Value[]> {

private static final long serialVersionUID = 1L;

/**
* This method must be implemented to provide a user implementation of a mapper.
* It is called for each individual record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import java.lang.reflect.Method;
import java.util.Iterator;

import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.api.common.functions.GenericGroupReduce;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;


public abstract class ReduceFunction extends AbstractArrayModelFunction implements GenericReducer<Value[], Value[]> {

public abstract class ReduceFunction extends AbstractArrayModelFunction implements GenericGroupReduce<Value[], Value[]> {

private static final long serialVersionUID = 1L;

/**
* The central function to be implemented for a reducer. The function receives per call one
* key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@
import java.lang.reflect.Method;
import java.util.Iterator;

import eu.stratosphere.api.common.functions.GenericReducer;
import eu.stratosphere.api.common.functions.GenericGroupReduce;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.CopyableValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.InstantiationUtil;


public abstract class ReduceWithKeyFunction extends AbstractArrayModelFunction implements GenericReducer<Value[], Value[]> {
public abstract class ReduceWithKeyFunction extends AbstractArrayModelFunction implements GenericGroupReduce<Value[], Value[]> {

private static final long serialVersionUID = 1L;

public abstract void reduce(Value key, Iterator<Value[]> records, Collector<Value[]> out);

public void combine(Value key, Iterator<Value[]> records, Collector<Value[]> out) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void configure(Configuration parameters) {

// --------------------------------------------------------------------------------------------

public boolean readRecord(Value[] target, byte[] bytes, int offset, int numBytes) {
public Value[] readRecord(Value[] reuse, byte[] bytes, int offset, int numBytes) {
StringValue str = this.theString;

if (this.ascii) {
Expand All @@ -97,11 +97,11 @@ public boolean readRecord(Value[] target, byte[] bytes, int offset, int numBytes
byte[] copy = new byte[numBytes];
System.arraycopy(bytes, offset, copy, 0, numBytes);
LOG.warn("Line could not be encoded: " + Arrays.toString(copy), e);
return false;
return null;
}
}

target[0] = str;
return true;
reuse[0] = str;
return reuse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package eu.stratosphere.arraymodel.operators;

import eu.stratosphere.api.common.operators.base.ReduceOperatorBase;
import eu.stratosphere.api.common.operators.base.GroupReduceOperatorBase;
import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeObjectWrapper;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
Expand All @@ -29,7 +29,7 @@
*
* @see ReduceFunction
*/
public class ReduceWithKeyOperator extends ReduceOperatorBase<ReduceWithKeyFunction>
public class ReduceWithKeyOperator extends GroupReduceOperatorBase<ReduceWithKeyFunction>
{
public ReduceWithKeyOperator(Class<? extends ReduceWithKeyFunction> udf, int keyPosition, String name) {
super(new UserCodeClassWrapper<ReduceWithKeyFunction>(udf), new int[] {keyPosition}, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.avro.util.Utf8;


public final class DataOutputEncoder extends Encoder {
public final class DataOutputEncoder extends Encoder implements java.io.Serializable {

private static final long serialVersionUID = 1L;

private DataOutput out;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.stratosphere.api.java.record.io.avro;
package eu.stratosphere.api.avro;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -9,18 +9,16 @@


/**
* Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache
* licenced as well)
*
* The wrapper keeps track of the position!
* Code copy pasted from org.apache.avro.mapred.FSInput (which is Apache licensed as well)
*
* The wrapper keeps track of the position in the data stream.
*/
public class FSDataInputStreamWrapper implements Closeable, SeekableInput {
private final FSDataInputStream stream;
private final long len;
private long pos;

FSDataInputStreamWrapper(final FSDataInputStream stream, final int len) {
public FSDataInputStreamWrapper(final FSDataInputStream stream, final int len) {
this.stream = stream;
this.len = len;
this.pos = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http:https://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.api.java.io;

import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.api.avro.FSDataInputStreamWrapper;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.util.InstantiationUtil;


public class AvroInputFormat<E> extends FileInputFormat<E> {

private static final long serialVersionUID = 1L;

private static final Log LOG = LogFactory.getLog(AvroInputFormat.class);


private final Class<E> avroValueType;

private final boolean reuseAvroValue;


private transient FileReader<E> dataFileReader;



public AvroInputFormat(Class<E> type) {
this(type, true);
}

public AvroInputFormat(Class<E> type, boolean reuseAvroValue) {
this.avroValueType = type;
this.reuseAvroValue = reuseAvroValue;
}

@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);

DatumReader<E> datumReader;
if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroValueType)) {
datumReader = new SpecificDatumReader<E>(avroValueType);
} else {
datumReader = new ReflectDatumReader<E>(avroValueType);
}

LOG.info("Opening split " + split);

SeekableInput in = new FSDataInputStreamWrapper(stream, (int) split.getLength());

dataFileReader = DataFileReader.openReader(in, datumReader);
dataFileReader.sync(split.getStart());
}

@Override
public boolean reachedEnd() throws IOException {
return !dataFileReader.hasNext();
}

@Override
public E nextRecord(E reuseValue) throws IOException {
if (!dataFileReader.hasNext()) {
return null;
}

if (!reuseAvroValue) {
reuseValue = InstantiationUtil.instantiate(avroValueType, Object.class);
}

reuseValue = dataFileReader.next(reuseValue);
return reuseValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.api.avro.AvroBaseValue;
import eu.stratosphere.api.avro.FSDataInputStreamWrapper;
import eu.stratosphere.api.java.record.io.FileInputFormat;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.types.Record;
Expand Down Expand Up @@ -92,14 +93,14 @@ public boolean reachedEnd() throws IOException {
}

@Override
public boolean nextRecord(Record record) throws IOException {
public Record nextRecord(Record record) throws IOException {
if (!dataFileReader.hasNext()) {
return false;
return null;
}

reuseAvroValue = dataFileReader.next(reuseAvroValue);
wrapper.datum(reuseAvroValue);
record.setField(0, wrapper);
return true;
return record;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.api.avro.FSDataInputStreamWrapper;
import eu.stratosphere.api.java.record.io.FileInputFormat;
import eu.stratosphere.core.fs.FileInputSplit;
import eu.stratosphere.core.fs.FileStatus;
Expand Down Expand Up @@ -81,9 +82,9 @@ public boolean reachedEnd() throws IOException {
}

@Override
public boolean nextRecord(Record record) throws IOException {
public Record nextRecord(Record record) throws IOException {
if (!dataFileReader.hasNext()) {
return false;
return null;
}
if (record == null) {
throw new IllegalArgumentException("Empty PactRecord given");
Expand All @@ -96,11 +97,10 @@ public boolean nextRecord(Record record) throws IOException {
record.updateBinaryRepresenation();
}

return true;
return record;
}



@SuppressWarnings("unchecked")
private final Value convertAvroToPactValue(final Field field, final Object avroRecord) {
if (avroRecord == null) {
Expand Down
Loading

0 comments on commit 0038c9d

Please sign in to comment.