Skip to content

Commit

Permalink
[FLINK-1032] Rework support for POJO types in the Java API
Browse files Browse the repository at this point in the history
  • Loading branch information
rmetzger committed Oct 8, 2014
1 parent ec82d97 commit 926f835
Show file tree
Hide file tree
Showing 108 changed files with 3,844 additions and 951 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.Path;
import org.junit.Assert;
Expand All @@ -43,8 +43,8 @@ public void testTypeExtraction() {
TypeInformation<?> typeInfoDataSet = input.getType();


Assert.assertTrue(typeInfoDirect instanceof GenericTypeInfo);
Assert.assertTrue(typeInfoDataSet instanceof GenericTypeInfo);
Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);

Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.junit.Assert;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.junit.After;
import org.junit.AfterClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
Expand Down Expand Up @@ -292,7 +293,7 @@ private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T>

TypeComparator<T> comparator;
if (typeInfo instanceof CompositeType) {
comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder);
comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0);
}
else if (typeInfo instanceof AtomicType) {
// handle grouping of atomic types
Expand All @@ -306,8 +307,8 @@ else if (typeInfo instanceof AtomicType) {
}

private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1,T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
if (!(typeInfo1.isTupleType() && typeInfo2.isTupleType())) {
throw new RuntimeException("The runtime currently supports only keyed binary operations on tuples.");
if (!(typeInfo1.isTupleType() || typeInfo1 instanceof PojoTypeInfo) && (typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
}

// @SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void testJavaApiWithDeferredSoltionSetUpdateWithMapper() {
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());


// verify reducer
Expand Down Expand Up @@ -125,7 +125,7 @@ public void testRecordApiWithDeferredSoltionSetUpdateWithNonPreservingJoin() {
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());

// verify reducer
assertEquals(ShipStrategyType.PARTITION_HASH, worksetReducer.getInput().getShipStrategy());
Expand Down Expand Up @@ -170,7 +170,7 @@ public void testRecordApiWithDirectSoltionSetUpdate() {
// verify joinWithSolutionSet
assertEquals(ShipStrategyType.PARTITION_HASH, joinWithSolutionSetNode.getInput1().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, joinWithSolutionSetNode.getInput2().getShipStrategy());
assertEquals(new FieldList(0, 1), joinWithSolutionSetNode.getKeysForInput1());
assertEquals(new FieldList(1, 0), joinWithSolutionSetNode.getKeysForInput1());

// verify reducer
assertEquals(ShipStrategyType.FORWARD, worksetReducer.getInput().getShipStrategy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public InvalidProgramException() {
public InvalidProgramException(String message) {
super(message);
}

/**
 * Creates a new exception with the given message and cause.
 *
 * @param message The message for this exception.
 * @param cause The throwable that caused this exception.
 */
public InvalidProgramException(String message, Throwable cause) {
	super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public InvalidTypesException() {
public InvalidTypesException(String message) {
super(message);
}

/**
 * Creates a new exception with the given message and cause.
 *
 * @param message The message for this exception.
 * @param cause The throwable that caused this exception.
 */
public InvalidTypesException(String message, Throwable cause) {
	super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
import org.apache.flink.api.common.operators.util.TypeComparable;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.types.Value;
import org.apache.flink.util.Visitor;
Expand Down Expand Up @@ -349,7 +349,7 @@ private <T> List<T> executeDeltaIteration(DeltaIterationBase<?, ?> iteration) th

int[] keyColumns = iteration.getSolutionSetKeyFields();
boolean[] inputOrderings = new boolean[keyColumns.length];
TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings);
TypeComparator<T> inputComparator = ((CompositeType<T>) solutionType).createComparator(keyColumns, inputOrderings, 0);

Map<TypeComparable<T>, T> solutionMap = new HashMap<TypeComparable<T>, T>(solutionInputData.size());
// fill the solution from the initial input
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
Expand Down Expand Up @@ -227,13 +227,12 @@ protected List<OUT> executeOnCollections(List<IN1> input1, List<IN2> input2, Run
return result;
}

@SuppressWarnings("unchecked")
private <T> TypeComparator<T> getTypeComparator(TypeInformation<T> inputType, int[] inputKeys, boolean[] inputSortDirections) {
if (!(inputType instanceof CompositeType)) {
throw new InvalidProgramException("Input types of coGroup must be composite types.");
}

return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections);
return ((CompositeType<T>) inputType).createComparator(inputKeys, inputSortDirections, 0);
}

private static class CoGroupSortListIterator<IN1, IN2> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import com.google.common.base.Preconditions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -154,15 +156,18 @@ protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx,
}

if (inputType instanceof CompositeType) {
@SuppressWarnings("unchecked")
final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings);

Collections.sort(inputData, new Comparator<IN>() {
@Override
public int compare(IN o1, IN o2) {
return sortComparator.compare(o1, o2);
}
});
if(sortColumns.length == 0) { // => all reduce. No comparator
Preconditions.checkArgument(sortOrderings.length == 0);
} else {
final TypeComparator<IN> sortComparator = ((CompositeType<IN>) inputType).createComparator(sortColumns, sortOrderings, 0);

Collections.sort(inputData, new Comparator<IN>() {
@Override
public int compare(IN o1, IN o2) {
return sortComparator.compare(o1, o2);
}
});
}
}

FunctionUtils.setFunctionRuntimeContext(function, ctx);
Expand All @@ -188,7 +193,7 @@ public int compare(IN o1, IN o2) {
} else {
final TypeSerializer<IN> inputSerializer = inputType.createSerializer();
boolean[] keyOrderings = new boolean[keyColumns.length];
final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings);
final TypeComparator<IN> comparator = ((CompositeType<IN>) inputType).createComparator(keyColumns, keyOrderings, 0);

ListKeyGroupedIterator<IN> keyedIterator = new ListKeyGroupedIterator<IN>(inputData, inputSerializer, comparator, mutableObjectSafeMode);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
Expand Down Expand Up @@ -86,7 +86,7 @@ else if(leftInformation instanceof CompositeType){
boolean[] orders = new boolean[keyPositions.length];
Arrays.fill(orders, true);

leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders);
leftComparator = ((CompositeType<IN1>) leftInformation).createComparator(keyPositions, orders, 0);
}else{
throw new RuntimeException("Type information for left input of type " + leftInformation.getClass()
.getCanonicalName() + " is not supported. Could not generate a comparator.");
Expand All @@ -99,7 +99,7 @@ else if(leftInformation instanceof CompositeType){
boolean[] orders = new boolean[keyPositions.length];
Arrays.fill(orders, true);

rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders);
rightComparator = ((CompositeType<IN2>) rightInformation).createComparator(keyPositions, orders, 0);
}else{
throw new RuntimeException("Type information for right input of type " + rightInformation.getClass()
.getCanonicalName() + " is not supported. Could not generate a comparator.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.CompositeType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;

Expand Down Expand Up @@ -125,7 +125,6 @@ public ReduceOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<T, T

// --------------------------------------------------------------------------------------------

@SuppressWarnings("unchecked")
@Override
protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, boolean mutableObjectSafeMode) throws Exception {
// make sure we can handle empty inputs
Expand All @@ -151,7 +150,7 @@ protected List<T> executeOnCollections(List<T> inputData, RuntimeContext ctx, bo

if (inputColumns.length > 0) {
boolean[] inputOrderings = new boolean[inputColumns.length];
TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings);
TypeComparator<T> inputComparator = ((CompositeType<T>) inputType).createComparator(inputColumns, inputOrderings, 0);

Map<TypeComparable<T>, T> aggregateMap = new HashMap<TypeComparable<T>, T>(inputData.size() / 10);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public boolean isTupleType() {
public int getArity() {
return 1;
}

@Override
public int getTotalFields() {
// Non-composite type: counts as exactly one field, with no nested sub-fields
// (matches getArity() == 1 on this type — see the surrounding class).
return 1;
}

@Override
public Class<T> getTypeClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ public int getArity() {
return 1;
}

@Override
public int getTotalFields() {
// Non-composite type: counts as exactly one field, with no nested sub-fields
// (matches getArity() == 1 on this type — see the surrounding class).
return 1;
}

@Override
public Class<T> getTypeClass() {
return this.clazz;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public int getArity() {
return 0;
}

@Override
public int getTotalFields() {
// The Nothing type carries no data at all, so it contributes zero fields
// (consistent with getArity() == 0 on this type).
return 0;
}

@Override
public Class<Nothing> getTypeClass() {
return Nothing.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public boolean isTupleType() {
public int getArity() {
return 1;
}

@Override
public int getTotalFields() {
// Non-composite type: counts as exactly one field, with no nested sub-fields
// (matches getArity() == 1 on this type — see the surrounding class).
return 1;
}

@Override
public Class<T> getTypeClass() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public abstract class TypeInformation<T> {
public abstract boolean isKeyType();

public abstract TypeSerializer<T> createSerializer();

/**
 * Gets the number of logical fields in this type. For non-composite types this is
 * typically one; for composite types it includes all nested sub-fields.
 *
 * @return The number of fields in this type, including its sub-fields (for composite types).
 */
public abstract int getTotalFields();

}
Loading

0 comments on commit 926f835

Please sign in to comment.