Skip to content

Commit

Permalink
Simplify Pojo/Tuple/CaseClass comparator extractKeys() method
Browse files Browse the repository at this point in the history
Also fixes a bug with Java/Scala interop: TupleTypeComparator was only checking
for nested Java Tuples and Pojos, not Scala Case classes.
  • Loading branch information
aljoscha committed Oct 9, 2014
1 parent 3eebaa8 commit f562d49
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,7 @@ public PojoComparator<T> duplicate() {
public int extractKeys(Object record, Object[] target, int index) {
int localIndex = index;
for (int i = 0; i < comparators.length; i++) {
if(comparators[i] instanceof CompositeTypeComparator) {
localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex) -1;
} else {
// non-composite case (= atomic). We can assume this to have only one key.
// comparators[i].extractKeys(accessField(keyFields[i], record), target, i);
target[localIndex] = accessField(keyFields[i], record);
}
localIndex++;
localIndex += comparators[i].extractKeys(accessField(keyFields[i], record), target, localIndex);
}
return localIndex - index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,7 @@ public void putNormalizedKey(T value, MemorySegment target, int offset, int numB
public int extractKeys(Object record, Object[] target, int index) {
int localIndex = index;
for(int i = 0; i < comparators.length; i++) {
// handle nested case
if(comparators[i] instanceof TupleComparator || comparators[i] instanceof PojoComparator) {
localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex) -1;
} else {
// flat
target[localIndex] = ((Tuple) record).getField(keyPositions[i]);
}
localIndex++;
localIndex += comparators[i].extractKeys(((Tuple) record).getField(keyPositions[i]), target, localIndex);
}
return localIndex - index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
*/
package org.apache.flink.api.scala.typeutils

import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator,
TypeSerializer}
import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
import org.apache.flink.core.memory.MemorySegment
import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException}
Expand Down Expand Up @@ -140,9 +141,16 @@ class CaseClassComparator[T <: Product](
}

def extractKeys(value: AnyRef, target: Array[AnyRef], index: Int) = {
for (i <- 0 until keyPositions.length ) {
target(index + i) = value.asInstanceOf[T].productElement(keyPositions(i)).asInstanceOf[AnyRef]
val in = value.asInstanceOf[T]

var localIndex: Int = index
for (i <- 0 until comparators.length) {
localIndex += comparators(i).extractKeys(
in.productElement(keyPositions(i)),
target,
localIndex)
}
keyPositions.length

localIndex - index
}
}

0 comments on commit f562d49

Please sign in to comment.