Skip to content

Commit

Permalink
Really add POJO support and nested keys for Scala API
Browse files Browse the repository at this point in the history
This also adds more integration tests, but not all tests of the Java API
have been ported to Scala yet.
  • Loading branch information
aljoscha authored and rmetzger committed Oct 8, 2014
1 parent 598ae37 commit 6be8555
Show file tree
Hide file tree
Showing 22 changed files with 1,587 additions and 393 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ object ConnectedComponents {
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) {
(s, ws) =>

// apply the step logic: join with the edges
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.LinkedList;
import java.util.List;

import com.google.common.base.Joiner;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -306,7 +307,12 @@ public int[] computeLogicalKeyPositions() {
}
return Ints.toArray(logicalKeys);
}


/**
 * Returns a human-readable description of these keys: the literal prefix
 * "ExpressionKeys: " followed by the key fields joined with '.'.
 */
@Override
public String toString() {
return "ExpressionKeys: " + Joiner.on('.').join(keyFields);
}
}

private static String[] removeDuplicates(String[] in) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,9 @@ private void checkWCPojoAsserts(TypeInformation<?> typeInfo) {
Assert.assertEquals(typeInfo.getTypeClass(), WC.class);
Assert.assertEquals(typeInfo.getArity(), 2);
}


// Kryo is required for this, so disable for now.
@Ignore
@Test
public void testPojoAllPublic() {
TypeInformation<?> typeForClass = TypeExtractor.createTypeInfo(AllPublic.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,11 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
* Creates a new DataSet containing the distinct elements of this DataSet. The decision whether
* two elements are distinct or not is made based on only the specified fields.
*
* This only works on CaseClass DataSets
*/
def distinct(firstField: String, otherFields: String*): DataSet[T] = {
// NOTE(review): this hunk is rendered without +/- markers. The next line appears to be the
// removed side: field names were first resolved to indices via fieldNames2Indices.
val fieldIndices = fieldNames2Indices(javaSet.getType, firstField +: otherFields.toArray)
wrap(new DistinctOperator[T](
javaSet,
// Appears to be the removed side: keys constructed from the resolved indices.
new Keys.ExpressionKeys[T](fieldIndices, javaSet.getType, true)))
// Appears to be the added side: keys constructed directly from the field-name strings.
new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType)))
}

/**
Expand Down Expand Up @@ -615,8 +612,6 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* This only works on CaseClass DataSets.
*/
def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T] = {
// val fieldIndices = fieldNames2Indices(javaSet.getType, firstField +: otherFields.toArray)

new GroupedDataSet[T](
this,
new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType))
Expand Down Expand Up @@ -862,10 +857,8 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
*/
def iterateDelta[R: ClassTag](workset: DataSet[R], maxIterations: Int, keyFields: Array[String])(
stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])) = {
val fieldIndices = fieldNames2Indices(javaSet.getType, keyFields)


val key = new ExpressionKeys[T](fieldIndices, javaSet.getType, false)
val key = new ExpressionKeys[T](keyFields, javaSet.getType)
val iterativeSet = new DeltaIteration[T, R](
javaSet.getExecutionEnvironment,
javaSet.getType,
Expand Down Expand Up @@ -931,12 +924,10 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
* significant amount of time.
*/
def partitionByHash(firstField: String, otherFields: String*): DataSet[T] = {
// NOTE(review): this hunk is rendered without +/- markers. The next line appears to be the
// removed side: field names were first resolved to indices via fieldNames2Indices.
val fieldIndices = fieldNames2Indices(javaSet.getType, firstField +: otherFields.toArray)

// Builds a PartitionOperator over the underlying Java DataSet using PartitionMethod.HASH.
val op = new PartitionOperator[T](
javaSet,
PartitionMethod.HASH,
// Appears to be the removed side: keys constructed from the resolved indices.
new Keys.ExpressionKeys[T](fieldIndices, javaSet.getType, false))
// Appears to be the added side: keys constructed directly from the field-name strings.
new Keys.ExpressionKeys[T](firstField +: otherFields.toArray, javaSet.getType))
wrap(op)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GroupedDataSet[T: ClassTag](

// These are for optional secondary sort. They are only used
// when using a group-at-a-time reduce function.
private val groupSortKeyPositions = mutable.MutableList[Int]()
private val groupSortKeyPositions = mutable.MutableList[Either[Int, String]]()
private val groupSortOrders = mutable.MutableList[Order]()

/**
Expand All @@ -64,7 +64,7 @@ class GroupedDataSet[T: ClassTag](
if (field >= set.getType.getArity) {
throw new IllegalArgumentException("Order key out of tuple bounds.")
}
groupSortKeyPositions += field
groupSortKeyPositions += Left(field)
groupSortOrders += order
this
}
Expand All @@ -76,9 +76,7 @@ class GroupedDataSet[T: ClassTag](
* This only works on CaseClass DataSets.
*/
def sortGroup(field: String, order: Order): GroupedDataSet[T] = {
// NOTE(review): this hunk is rendered without +/- markers. The next two statements appear to
// be the removed side: the field name was resolved to an index and that index was recorded.
val fieldIndex = fieldNames2Indices(set.getType, Array(field))(0)

groupSortKeyPositions += fieldIndex
// Appears to be the added side: the field name itself is recorded, wrapped in Right.
groupSortKeyPositions += Right(field)
// The sort order is appended alongside the sort key recorded above.
groupSortOrders += order
// Returns this GroupedDataSet so further calls can be chained.
this
}
Expand All @@ -88,14 +86,32 @@ class GroupedDataSet[T: ClassTag](
*/
private def maybeCreateSortedGrouping(): Grouping[T] = {
if (groupSortKeyPositions.length > 0) {
val grouping = new SortedGrouping[T](
set.javaSet,
keys,
groupSortKeyPositions(0),
groupSortOrders(0))
val grouping = groupSortKeyPositions(0) match {
case Left(pos) =>
new SortedGrouping[T](
set.javaSet,
keys,
pos,
groupSortOrders(0))

case Right(field) =>
new SortedGrouping[T](
set.javaSet,
keys,
field,
groupSortOrders(0))

}
// now manually add the rest of the keys
for (i <- 1 until groupSortKeyPositions.length) {
grouping.sortGroup(groupSortKeyPositions(i), groupSortOrders(i))
groupSortKeyPositions(i) match {
case Left(pos) =>
grouping.sortGroup(pos, groupSortOrders(i))

case Right(field) =>
grouping.sortGroup(field, groupSortOrders(i))

}
}
grouping
} else {
Expand Down Expand Up @@ -209,7 +225,7 @@ class GroupedDataSet[T: ClassTag](
}
}
wrap(
new GroupReduceOperator[T, R](createUnsortedGrouping(),
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
implicitly[TypeInformation[R]], reducer))
}

Expand All @@ -227,7 +243,7 @@ class GroupedDataSet[T: ClassTag](
}
}
wrap(
new GroupReduceOperator[T, R](createUnsortedGrouping(),
new GroupReduceOperator[T, R](maybeCreateSortedGrouping(),
implicitly[TypeInformation[R]], reducer))
}

Expand Down
Loading

0 comments on commit 6be8555

Please sign in to comment.