Skip to content

Commit

Permalink
[FLINK-14794][runtime] Fixing KeyedStream#transform to properly relay
Browse files Browse the repository at this point in the history
type information if invoked with factories.
  • Loading branch information
Arvid Heise authored and aljoscha committed Nov 15, 2019
1 parent 1f28a24 commit 749086d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1244,10 +1244,11 @@ public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {

return doTransform(operatorName, outTypeInfo, operatorFactory);
}

private <R> SingleOutputStreamOperator<R> doTransform(
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
Expand Down Expand Up @@ -256,11 +256,12 @@ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
// ------------------------------------------------------------------------

@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName,
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
protected <R> SingleOutputStreamOperator<R> doTransform(
final String operatorName,
final TypeInformation<R> outTypeInfo,
final StreamOperatorFactory<R> operatorFactory) {

SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);
SingleOutputStreamOperator<R> returnStream = super.doTransform(operatorName, outTypeInfo, operatorFactory);

// inject the key selector and key type
OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
Expand Down

0 comments on commit 749086d

Please sign in to comment.