Skip to content

Commit

Permalink
add Unit-test to check format and change some code styles
Browse files Browse the repository at this point in the history
  • Loading branch information
qmlmoon authored and rmetzger committed Feb 7, 2014
1 parent bcb6459 commit 8dd2470
Show file tree
Hide file tree
Showing 8 changed files with 535 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,99 +4,76 @@
import java.util.Collection;
import java.util.Iterator;

import eu.stratosphere.api.common.operators.util.SerializableIterator;
import eu.stratosphere.core.io.GenericInputSplit;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.ValueUtil;

/**
* input format for java collection input. It can accept collection data or serializable iterator
*
*
*/
public class CollectionInputFormat extends GenericInputFormat<Record> implements UnsplittableInput {


private static final long serialVersionUID = 1L;

private transient boolean end;

private Collection<Object> steam; //input data as collection

private SerializableIterator<Object> serializableIter; //input data as serializable iterator

private transient Iterator<Object> it;

private transient Object currObject;

private Collection<?> dataSet; //input data as collection

private Iterator<?> serializableIter; //input data as serializable iterator

private transient Iterator<?> it;

@Override
public boolean reachedEnd() throws IOException {
return this.end;
return !it.hasNext();
}

/**
* get the next Object
*/
public boolean readObject() {
if (it.hasNext()) {
currObject = it.next();
return true;
}
else {
return false;
}
}

/**
* decode the record from one Object. The record could have multiple fields.
*/
public boolean readRecord(Record target, Object b) {
target.clear();
//check whether the record field is one-dimensional or multi-dimensional
if (b.getClass().isArray()) {
for (Object s : (Object[])b){
target.addField(ValueUtil.toStratosphere(s));
}
}
else if (b instanceof Collection) {
@SuppressWarnings("unchecked")
Iterator<Object> tmp_it = ((Collection<Object>) b).iterator();
while (tmp_it.hasNext())
{
Object s = tmp_it.next();
target.addField(ValueUtil.toStratosphere(s));
}
}
else {
target.setField(0, ValueUtil.toStratosphere(b));
}
return true;
}

@Override
public void open(GenericInputSplit split) throws IOException {
this.partitionNumber = split.getSplitNumber();
if (serializableIter != null)
super.open(split);
if (serializableIter != null) {
it = serializableIter;
else
it = this.steam.iterator();
}
else {
it = this.dataSet.iterator();
}
}

@Override
public boolean nextRecord(Record record) throws IOException {
if (readObject()) {
return readRecord(record, this.currObject);
} else {
this.end = true;
if (it.hasNext()) {
record.clear();
Object b = it.next();
//check whether the record field is one-dimensional or multi-dimensional
if (b.getClass().isArray()) {
for (Object s : (Object[])b){
record.addField(ValueUtil.toStratosphere(s));
}
}
else if (b instanceof Collection) {
@SuppressWarnings("unchecked")
Iterator<Object> tmpIter = ((Collection<Object>) b).iterator();
while (tmpIter.hasNext()) {
Object s = tmpIter.next();
record.addField(ValueUtil.toStratosphere(s));
}
}
else {
record.setField(0, ValueUtil.toStratosphere(b));
}
return true;
} else {
return false;
}
}
public void setData(Collection<Object> data) {
this.steam = data;

public void setData(Collection<?> data) {
this.dataSet = data;
this.serializableIter = null;
}
public void setIter(SerializableIterator<Object> iter) {

public<T extends Iterator<?>,Serializable> void setIter(T iter) {
this.serializableIter = iter;
}

Expand Down
Loading

0 comments on commit 8dd2470

Please sign in to comment.