Skip to content

Commit

Permalink
create parallel collections and add java value to stratosphere value …
Browse files Browse the repository at this point in the history
…interface
  • Loading branch information
qmlmoon authored and rmetzger committed Feb 7, 2014
1 parent 97147a3 commit bcb6459
Show file tree
Hide file tree
Showing 6 changed files with 460 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package eu.stratosphere.api.common.io;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;

import eu.stratosphere.api.common.operators.util.SerializableIterator;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.ValueUtil;

/**
* input format for java collection input. It can accept collection data or serializable iterator
*
*/
public class CollectionInputFormat extends GenericInputFormat<Record> implements UnsplittableInput {


private static final long serialVersionUID = 1L;

private transient boolean end;

private Collection<Object> steam; //input data as collection

private SerializableIterator<Object> serializableIter; //input data as serializable iterator

private transient Iterator<Object> it;

private transient Object currObject;

@Override
public boolean reachedEnd() throws IOException {
return this.end;
}

/**
* get the next Object
*/
public boolean readObject() {
if (it.hasNext()) {
currObject = it.next();
return true;
}
else {
return false;
}
}

/**
* decode the record from one Object. The record could have multiple fields.
*/
public boolean readRecord(Record target, Object b) {
target.clear();
//check whether the record field is one-dimensional or multi-dimensional
if (b.getClass().isArray()) {
for (Object s : (Object[])b){
target.addField(ValueUtil.toStratosphere(s));
}
}
else if (b instanceof Collection) {
@SuppressWarnings("unchecked")
Iterator<Object> tmp_it = ((Collection<Object>) b).iterator();
while (tmp_it.hasNext())
{
Object s = tmp_it.next();
target.addField(ValueUtil.toStratosphere(s));
}
}
else {
target.setField(0, ValueUtil.toStratosphere(b));
}
return true;
}

@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionNumber = split.getSplitNumber();
if (serializableIter != null)
it = serializableIter;
else
it = this.steam.iterator();
}

@Override
public boolean nextRecord(Record record) throws IOException {
if (readObject()) {
return readRecord(record, this.currObject);
} else {
this.end = true;
return false;
}
}

public void setData(Collection<Object> data) {
this.steam = data;
this.serializableIter = null;
}

public void setIter(SerializableIterator<Object> iter) {
this.serializableIter = iter;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package eu.stratosphere.api.common.operators;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import eu.stratosphere.api.common.io.CollectionInputFormat;
import eu.stratosphere.api.common.io.GenericInputFormat;
import eu.stratosphere.api.common.operators.util.SerializableIterator;

/**
* Operator for input nodes which reads data from collection or iterator.
*
*/
public class CollectionDataSource extends GenericDataSource<GenericInputFormat<?>> {

private static String DEFAULT_NAME = "<Unnamed Collection Data Source>";


// --------------------------------------------------------------------------------------------

/**
* Creates a new instance for the given input using the given input format.
*
* @param f The {@link CollectionInputFormat} implementation used to read the data.
* @param data The input data. It should be a collection, an array or a {@link SerializableIterator}.
* @param name The given name for the Pact, used in plans, logs and progress messages.
*/
public CollectionDataSource(CollectionInputFormat f, Collection<Object> data, String name) {
super(f, name);
checkFormat(data);
f.setData(data);
}

/**
* Creates a new instance for the given input using the given input format. The contract has the default name.
* The input types will be checked. If the input types don't agree, an exception will occur.
*
* @param f The {@link CollectionInputFormat} implementation used to read the data.
* @param data The input data. It should be a collection, an array or a {@link SerializableIterator}.
*/
@SuppressWarnings("unchecked")
public CollectionDataSource(CollectionInputFormat f, Object[] data) {
super(f, DEFAULT_NAME);
if (data.length == 1 && data[0] instanceof SerializableIterator) {
f.setIter((SerializableIterator<Object>)data[0]);
}
else if (data.length == 1 && data[0] instanceof Collection) {
f.setData((Collection<Object>)data[0]);
}
else {
Collection<Object> tmp = new ArrayList<Object>();
for (Object o : data) {
tmp.add(o);
}
checkFormat(tmp);
f.setData(tmp);
}

}


/**
* Creates a new instance for the given input using the given input format. The contract has the default name.
* The input types will be checked. If the input types don't agree, an exception will occur.
*
* @param args The input data. It should be a collection, an array or a {@link SerializableIterator}.
*/
public CollectionDataSource(Object... args) {
this(new CollectionInputFormat(), args);
}

public CollectionDataSource(Object[][] args) {
this(new CollectionInputFormat(), args);
}

public CollectionDataSource(Collection<Object> args) {
this(new CollectionInputFormat(), args, DEFAULT_NAME);
}




// --------------------------------------------------------------------------------------------
/*
* check whether the input field has the same type
*/
private void checkFormat(Collection<Object> c) {
String type = null;
List<String> typeList = new ArrayList<String>();
Iterator<Object> it = c.iterator();
while (it.hasNext()) {
Object o = it.next();
//check the input types for 1-dimension
if (type != null && !type.equals(o.getClass().getName())) {
throw new RuntimeException("elements of input list should have the same type");
}
else {
type = o.getClass().getName();
}

//check the input types for 2-dimension array
if (typeList.size() == 0 && o.getClass().isArray()) {
for (Object s: (Object[])o) {
typeList.add(s.getClass().getName());
}
}
else if (o.getClass().isArray()) {
int index = 0;
if (((Object[])o).length != typeList.size()) {
throw new RuntimeException("elements of input list should have the same size");
}
for (Object s:(Object[])o)
if (!s.getClass().getName().equals(typeList.get(index++))) {
throw new RuntimeException("elements of input list should have the same type");
}
}

//check the input types for 2-dimension collection
if (typeList.size() == 0 && o instanceof Collection) {
@SuppressWarnings("unchecked")
Iterator<Object> tmpIt = ((Collection<Object>) o).iterator();
while (tmpIt.hasNext())
{
Object s = tmpIt.next();
typeList.add(s.getClass().getName());
}
}
else if (o instanceof Collection) {
int index = 0;
@SuppressWarnings("unchecked")
Iterator<Object> tmpIt = ((Collection<Object>) o).iterator();
while (tmpIt.hasNext()) {
Object s = tmpIt.next();
if (!s.getClass().getName().equals(typeList.get(index++))) {
throw new RuntimeException("elements of input list should have the same type");
}
}

if (index != typeList.size()) {
throw new RuntimeException("elements of input list should have the same size");
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package eu.stratosphere.api.common.operators.util;

import java.io.Serializable;
import java.util.Iterator;

/**
* Interface for serializable iterator
*
*/
public abstract class SerializableIterator<E> implements Iterator<E>, Serializable {

/**
*
*/
private static final long serialVersionUID = 1L;

@Override
public final void remove() {}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package eu.stratosphere.types;

/**
* convert the java.lang type into stratosphere type
*/
public class ValueUtil {

public static Value toStratosphere(Object java) {

if (java instanceof java.lang.Boolean)
return new BooleanValue(((java.lang.Boolean)java).booleanValue());
if (java instanceof java.lang.Integer)
return new IntValue(((java.lang.Integer)java).intValue());
if (java instanceof java.lang.Byte)
return new ByteValue(((java.lang.Byte)java).byteValue());
if (java instanceof java.lang.Character)
return new CharValue(((java.lang.Character)java).charValue());
if (java instanceof java.lang.Double)
return new DoubleValue(((java.lang.Double)java).doubleValue());
if (java instanceof java.lang.Float)
return new FloatValue(((java.lang.Float)java).floatValue());
if (java instanceof java.lang.Long)
return new LongValue(((java.lang.Long)java).longValue());
if (java instanceof java.lang.Short)
return new ShortValue(((java.lang.Short)java).shortValue());
if (java instanceof java.lang.String)
return new StringValue(((java.lang.String)java).toString());
if (java == null)
return NullValue.getInstance();
throw new IllegalArgumentException("unsupported java value");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import eu.stratosphere.types.parser._
import eu.stratosphere.api.common.io.InputFormat
import eu.stratosphere.api.common.operators.GenericDataSource
import eu.stratosphere.api.common.operators.FileDataSource
import eu.stratosphere.api.common.operators.{CollectionDataSource => JavaCollectionDataSource}
import eu.stratosphere.configuration.Configuration
import eu.stratosphere.api.common.io.FileInputFormat
import eu.stratosphere.api.common.io.GenericInputFormat
import eu.stratosphere.api.scala.operators.TextInputFormat
import collection.JavaConversions._
import eu.stratosphere.api.common.operators.util.SerializableIterator;

object DataSource {

Expand Down Expand Up @@ -60,13 +63,55 @@ object DataSource {
}
}

object CollectionDataSource {
/*
constructor for collection input
*/
def apply[Out: UDT](data: Iterable[Out]):DataSet[Out] with OutputHintable[Out] = {
/*
reuse the java implementation of collection data by adding scala operator
*/
val js:java.util.Collection[Out] = data
val ret = new JavaCollectionDataSource(js)
with ScalaOperator[Out]{

val udf = new UDF0(implicitly[UDT[Out]])
override def getUDF = udf

}

new DataSet[Out](ret) with OutputHintable[Out] {}
}

/*
constructor for {@link SerializableIterator} input
*/
def apply[Out: UDT](data: SerializableIterator[Out]) = {

/*
reuse the java implementation of collection data by adding scala operator
*/
val ret = new JavaCollectionDataSource(data)
with ScalaOperator[Out]{

val udf = new UDF0(implicitly[UDT[Out]])
override def getUDF = udf

}

new DataSet[Out](ret) with OutputHintable[Out] {}
}
}



trait ScalaInputFormat[Out] { this: InputFormat[_, _] =>
def getUDF: UDF0[Out]
def persistConfiguration(config: Configuration) = {}
def configure(config: Configuration)
}


object TextFile {
def apply(url: String): DataSet[String] with OutputHintable[String] = DataSource(url, TextInputFormat())
}
Loading

0 comments on commit bcb6459

Please sign in to comment.