[FLINK-2812] [streaming] KeySelectorUtil interacts well with type extraction

The interaction is tested in the AggregationFunctionTest and the Scala DataStreamTest, among others.

Closes apache#1155
mbalassi committed Oct 6, 2015
1 parent c414ea9 commit e494c27
Showing 2 changed files with 22 additions and 8 deletions.
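Editor's note: the point of the change is that the generated key selectors now report their key type explicitly through ResultTypeQueryable, instead of leaving it to reflective extraction of the selector's generic signature, which may fail or produce a less precise type for generic keys. A minimal sketch of the pattern follows; the class name and the String-keyed selector are invented purely for illustration.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// Illustrative selector: keys a line of text by its first token.
public class FirstTokenKeySelector
        implements KeySelector<String, String>, ResultTypeQueryable<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(String value) throws Exception {
        return value.split("\\s+")[0];
    }

    @Override
    public TypeInformation<String> getProducedType() {
        // The key type is reported directly, instead of being re-derived
        // from the selector's generic signature by reflective extraction.
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```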
KeySelectorUtil.java
@@ -28,6 +28,8 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;

/**
* Utility class that contains helper methods for manipulating {@link KeySelector} for streaming.
@@ -47,12 +49,14 @@ public static <X> KeySelector<X, Tuple> getSelectorForKeys(Keys<X> keys, TypeInf

// use ascending order here, the code paths for that are usually a slight bit faster
boolean[] orders = new boolean[numKeyFields];
+TypeInformation[] typeInfos = new TypeInformation[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
orders[i] = true;
+typeInfos[i] = compositeType.getTypeAt(logicalKeyPositions[i]);
}

TypeComparator<X> comparator = compositeType.createComparator(logicalKeyPositions, orders, 0, executionConfig);
-return new ComparableKeySelector<X>(comparator, numKeyFields);
+return new ComparableKeySelector<>(comparator, numKeyFields, new TupleTypeInfo<>(typeInfos));
}


@@ -70,7 +74,7 @@ public static <X, K> KeySelector<X, K> getSelectorForOneKey(Keys<X> keys, Partit

TypeComparator<X> comparator = ((CompositeType<X>) typeInfo).createComparator(
logicalKeyPositions, new boolean[1], 0, executionConfig);
-return new OneKeySelector<X, K>(comparator);
+return new OneKeySelector<>(comparator);
}

/**
@@ -111,21 +115,23 @@ public K getKey(IN value) throws Exception {
*
* @param <IN> The type from which the key is extracted.
*/
-public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple> {
+public static final class ComparableKeySelector<IN> implements KeySelector<IN, Tuple>, ResultTypeQueryable<Tuple> {

private static final long serialVersionUID = 1L;

private final TypeComparator<IN> comparator;
private final int keyLength;
+private final TupleTypeInfo tupleTypeInfo;

/** Reusable array to hold the key objects. Since this is initially empty (all positions
* are null), it does not have any serialization problems */
@SuppressWarnings("NonSerializableFieldInSerializableClass")
private final Object[] keyArray;

-public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength) {
+public ComparableKeySelector(TypeComparator<IN> comparator, int keyLength, TupleTypeInfo tupleTypeInfo) {
this.comparator = comparator;
this.keyLength = keyLength;
+this.tupleTypeInfo = tupleTypeInfo;
keyArray = new Object[keyLength];
}

@@ -139,6 +145,10 @@ public Tuple getKey(IN value) throws Exception {
return key;
}

+@Override
+public TypeInformation<Tuple> getProducedType() {
+return tupleTypeInfo;
+}
}

// ------------------------------------------------------------------------
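Editor's note: the net effect for ComparableKeySelector is that its Tuple keys now come with a matching TupleTypeInfo. A hand-written analogue of that pattern, with an invented (name, city, score) record keyed by its first two fields:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// Keys a (name, city, score) record by its first two fields.
public class TwoFieldKeySelector
        implements KeySelector<Tuple3<String, String, Double>, Tuple>, ResultTypeQueryable<Tuple> {

    private static final long serialVersionUID = 1L;

    private final TupleTypeInfo<Tuple> keyType = new TupleTypeInfo<>(
            BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public Tuple getKey(Tuple3<String, String, Double> value) throws Exception {
        return new Tuple2<>(value.f0, value.f1);
    }

    @Override
    public TypeInformation<Tuple> getProducedType() {
        // Mirrors what ComparableKeySelector now does: report the composite
        // key type instead of leaving it to generic type extraction.
        return keyType;
    }
}
```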
DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala.operators.ScalaCsvOutputFormat
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.collector.selector.OutputSelector
@@ -230,8 +231,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K] = {

val cleanFun = clean(fun)
-val keyExtractor = new KeySelector[T, K] {
+val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.keyBy(keyExtractor)
}
@@ -256,8 +258,9 @@
def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {

val cleanFun = clean(fun)
-val keyExtractor = new KeySelector[T, K] {
+val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.partitionByHash(keyExtractor)
}
@@ -293,8 +296,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
: DataStream[T] = {
val cleanFun = clean(fun)
-val keyExtractor = new KeySelector[T, K] {
+val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
+override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
javaStream.partitionCustom(partitioner, keyExtractor)
}
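Editor's note: the three Scala changes follow one pattern — the anonymous KeySelector additionally mixes in ResultTypeQueryable[K] and returns the TypeInformation[K] that is already available implicitly at the call site. A rough Java analogue of that pattern is sketched below; the class name and the wrapping approach are this note's invention, not part of the commit.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// Wraps an existing selector and attaches an explicitly supplied key type,
// much like the Scala code hands back the implicit TypeInformation[K].
public final class ExplicitTypeKeySelector<IN, K>
        implements KeySelector<IN, K>, ResultTypeQueryable<K> {

    private static final long serialVersionUID = 1L;

    private final KeySelector<IN, K> wrapped;
    private final TypeInformation<K> keyType;

    public ExplicitTypeKeySelector(KeySelector<IN, K> wrapped, TypeInformation<K> keyType) {
        this.wrapped = wrapped;
        this.keyType = keyType;
    }

    @Override
    public K getKey(IN value) throws Exception {
        return wrapped.getKey(value);
    }

    @Override
    public TypeInformation<K> getProducedType() {
        // The key type is supplied up front, so nothing has to be extracted
        // from the wrapped selector's (possibly erased) generic signature.
        return keyType;
    }
}
```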
