diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index dd4a84ba80bd3..46a4cfc4af9cc 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -123,7 +123,7 @@ public DataStream getSecond() { * * @return The type of the first input */ - public TypeInformation getInputType1() { + public TypeInformation getType1() { return dataStream1.getType(); } @@ -132,7 +132,7 @@ public TypeInformation getInputType1() { * * @return The type of the second input */ - public TypeInformation getInputType2() { + public TypeInformation getType2() { return dataStream2.getType(); } @@ -244,10 +244,10 @@ public ConnectedDataStream groupBy(KeySelector keySelector1, public SingleOutputStreamOperator map(CoMapFunction coMapper) { TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coMapper, - CoMapFunction.class, false, true, getInputType1(), getInputType2(), + CoMapFunction.class, false, true, getType1(), getType2(), Utils.getCallLocationName(), true); - return addCoFunction("Co-Map", outTypeInfo, new CoStreamMap( + return transform("Co-Map", outTypeInfo, new CoStreamMap( clean(coMapper))); } @@ -271,10 +271,10 @@ CoMapFunction.class, false, true, getInputType1(), getInputType2(), CoFlatMapFunction coFlatMapper) { TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper, - CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(), + CoFlatMapFunction.class, false, true, getType1(), getType2(), Utils.getCallLocationName(), true); - return addCoFunction("Co-Flat Map", outTypeInfo, new CoStreamFlatMap( + return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap( clean(coFlatMapper))); } @@ -297,10 +297,10 @@ CoFlatMapFunction.class, false, true, getInputType1(), getInputType2(), public SingleOutputStreamOperator reduce(CoReduceFunction coReducer) { TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coReducer, - CoReduceFunction.class, false, true, getInputType1(), getInputType2(), + CoReduceFunction.class, false, true, getType1(), getType2(), Utils.getCallLocationName(), true); - return addCoFunction("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer))); + return transform("Co-Reduce", outTypeInfo, getReduceOperator(clean(coReducer))); } @@ -365,10 +365,10 @@ CoReduceFunction.class, false, true, getInputType1(), getInputType2(), } TypeInformation outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coWindowFunction, - CoWindowFunction.class, false, true, getInputType1(), getInputType2(), + CoWindowFunction.class, false, true, getType1(), getType2(), Utils.getCallLocationName(), true); - return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow( + return transform("Co-Window", outTypeInfo, new CoStreamWindow( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); } @@ -397,20 +397,20 @@ protected CoStreamOperator getReduceOperator( throw new IllegalArgumentException("Slide interval must be positive"); } - return addCoFunction("Co-Window", outTypeInfo, new CoStreamWindow( + return transform("Co-Window", outTypeInfo, new CoStreamWindow( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); } - public SingleOutputStreamOperator addCoFunction(String functionName, + public SingleOutputStreamOperator transform(String functionName, TypeInformation outTypeInfo, CoStreamOperator operator) { @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator returnStream = new SingleOutputStreamOperator( environment, functionName, outTypeInfo, operator); - dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getInputType1(), - getInputType2(), outTypeInfo, functionName); + dataStream1.streamGraph.addCoOperator(returnStream.getId(), operator, getType1(), + getType2(), outTypeInfo, functionName); dataStream1.connectGraph(dataStream1, returnStream.getId(), 1); dataStream1.connectGraph(dataStream2, returnStream.getId(), 2); diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 94aca4839d725..f4d49654d2ebb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -93,7 +93,7 @@ *
    *
  • {@link DataStream#map},
  • *
  • {@link DataStream#filter}, or
  • - *
  • {@link DataStream#aggregate}.
  • + *
  • {@link DataStream#sum}.
  • *
* * @param diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index c6ee36db0952d..53c35e0abe629 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -133,7 +133,7 @@ protected DiscretizedStream timeReduce(ReduceFunction reduceFunction) return reduced.discretizedStream .groupBy(new WindowKey()) .connect(numOfParts.groupBy(0)) - .addCoFunction( + .transform( "CoFlatMap", reduced.discretizedStream.getType(), new CoStreamFlatMap, Tuple2, StreamWindow>( diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 988fdfb42daf5..3e935f5f388fe 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -22,11 +22,10 @@ import java.util.Collection; import java.util.List; -import com.esotericsoftware.kryo.Serializer; - import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -34,6 +33,7 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.MissingTypeInfo; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.Client; @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType; import org.apache.flink.streaming.api.functions.source.FileReadFunction; import org.apache.flink.streaming.api.functions.source.FileSourceFunction; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; @@ -53,11 +54,12 @@ import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamSource; +import com.esotericsoftware.kryo.Serializer; + /** * {@link ExecutionEnvironment} for streaming jobs. An instance of it is * necessary to construct streaming topologies. @@ -420,7 +422,7 @@ public DataStreamSource readTextFile(String filePath, String charsetName public DataStream readFileStream(String filePath, long intervalMillis, WatchType watchType) { DataStream> source = addSource(new FileMonitoringFunction( - filePath, intervalMillis, watchType), null, "File Stream"); + filePath, intervalMillis, watchType), "File Stream"); return source.flatMap(new FileReadFunction()); } @@ -448,7 +450,7 @@ public DataStreamSource fromElements(OUT... data SourceFunction function = new FromElementsFunction(data); - return addSource(function, outTypeInfo, "Elements source"); + return addSource(function, "Elements source").returns(outTypeInfo); } /** @@ -475,7 +477,7 @@ public DataStreamSource fromCollection(Collectio TypeInformation outTypeInfo = TypeExtractor.getForObject(data.iterator().next()); SourceFunction function = new FromElementsFunction(data); - return addSource(function, outTypeInfo, "Collection Source"); + return addSource(function, "Collection Source").returns(outTypeInfo); } /** @@ -508,7 +510,7 @@ public DataStreamSource fromCollection(Collectio */ public DataStreamSource socketTextStream(String hostname, int port, char delimiter, long maxRetry) { - return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), null, + return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream"); } @@ -560,13 +562,13 @@ public DataStreamSource generateSequence(long from, long to) { if (from > to) { throw new IllegalArgumentException("Start of sequence must not be greater than the end"); } - return addSource(new GenSequenceFunction(from, to), null, "Sequence Source"); + return addSource(new GenSequenceFunction(from, to), "Sequence Source"); } private DataStreamSource addFileSource(InputFormat inputFormat, TypeInformation typeInfo) { FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - DataStreamSource returnStream = addSource(function, null, "File Source"); + DataStreamSource returnStream = addSource(function, "File Source"); streamGraph.setInputFormat(returnStream.getId(), inputFormat); return returnStream; } @@ -588,31 +590,7 @@ private DataStreamSource addFileSource(InputFormat inputForma * @return the data stream constructed */ public DataStreamSource addSource(SourceFunction function) { - return addSource(function, null); - } - - /** - * Ads a data source with a custom type information thus opening a - * {@link DataStream}. Only in very special cases does the user need to - * support type information. Otherwise use - * {@link #addSource(SourceFunction)}

By default sources have a - * parallelism of 1. To enable parallel execution, the user defined source - * should implement {@link ParallelSourceFunction} or extend - * {@link RichParallelSourceFunction}. In these cases the resulting source - * will have the parallelism of the environment. To change this afterwards - * call {@link DataStreamSource#setParallelism(int)} - * - * @param function - * the user defined function - * @param outTypeInfo - * the user defined type information for the stream - * @param - * type of the returned stream - * @return the data stream constructed - */ - public DataStreamSource addSource(SourceFunction function, - TypeInformation outTypeInfo) { - return addSource(function, outTypeInfo, "Custom Source"); + return addSource(function, "Custom source"); } /** @@ -623,8 +601,6 @@ public DataStreamSource addSource(SourceFunction function, * * @param function * the user defined function - * @param outTypeInfo - * the user defined type information for the stream * @param sourceName * Name of the data source * @param @@ -632,15 +608,18 @@ public DataStreamSource addSource(SourceFunction function, * @return the data stream constructed */ @SuppressWarnings("unchecked") - private DataStreamSource addSource(SourceFunction function, - TypeInformation outTypeInfo, String sourceName) { + private DataStreamSource addSource(SourceFunction function, String sourceName) { + + TypeInformation outTypeInfo; - if (outTypeInfo == null) { - if (function instanceof GenericSourceFunction) { - outTypeInfo = ((GenericSourceFunction) function).getType(); - } else { + if (function instanceof GenericSourceFunction) { + outTypeInfo = ((GenericSourceFunction) function).getType(); + } else { + try { outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class, function.getClass(), 0, null, null); + } catch (InvalidTypesException e) { + outTypeInfo = (TypeInformation) new MissingTypeInfo("Custom source", e); } } @@ -649,8 +628,8 @@ private DataStreamSource addSource(SourceFunction function, ClosureCleaner.clean(function, true); StreamOperator sourceOperator = new StreamSource(function); - return new DataStreamSource(this, sourceName, outTypeInfo, sourceOperator, - isParallel, sourceName); + return new DataStreamSource(this, sourceName, outTypeInfo, sourceOperator, isParallel, + sourceName); } // -------------------------------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java index f163b9ee82c4d..35cbaba6cc724 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.functions.co.CoReduceFunction; import org.apache.flink.streaming.api.functions.co.CoWindowFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import org.junit.Test; @@ -42,6 +43,12 @@ public class TypeFillTest { public void test() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + try { + env.addSource(new TestSource()).print(); + fail(); + } catch (Exception e) { + } + DataStream source = env.generateSequence(1, 10); try { @@ -76,6 +83,7 @@ public void test() { } catch (Exception e) { } + env.addSource(new TestSource()).returns("Integer"); source.map(new TestMap()).returns(Long.class).print(); source.flatMap(new TestFlatMap()).returns("Long").print(); source.connect(source).map(new TestCoMap()).returns("Integer").print(); @@ -106,6 +114,19 @@ public String map(Long value) throws Exception { } + private class TestSource implements SourceFunction { + + @Override + public void run(Collector collector) throws Exception { + + } + + @Override + public void cancel() { + } + + } + private class TestMap implements MapFunction { @Override public O map(T value) throws Exception { diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala index fbd7a02a6a89a..47d8fd27847eb 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedDataStream.scala @@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala import java.util import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.functions.KeySelector -import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream} +import org.apache.flink.streaming.api.datastream.{ConnectedDataStream => JavaCStream, DataStream => JavaStream} import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoReduceFunction, CoWindowFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean import org.apache.flink.util.Collector @@ -54,8 +54,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { def map2(in2: IN2): R = clean(fun2)(in2) } - new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], - new CoStreamMap[IN1, IN2, R](comapper))) + map(comapper) } /** @@ -78,8 +77,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { throw new NullPointerException("Map function must not be null.") } - new DataStream(javaStream.addCoFunction("map", implicitly[TypeInformation[R]], - new CoStreamMap[IN1, IN2, R](coMapper))) + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -102,8 +101,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { if (coFlatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - new DataStream[R](javaStream.addCoFunction("flatMap", implicitly[TypeInformation[R]], - new CoStreamFlatMap[IN1, IN2, R](coFlatMapper))) + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -235,13 +235,13 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * The function used for grouping the second input * @return @return The transformed { @link ConnectedDataStream} */ - def groupBy[K: TypeInformation](fun1: IN1 => _, fun2: IN2 => _): + def groupBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L): ConnectedDataStream[IN1, IN2] = { - val keyExtractor1 = new KeySelector[IN1, Any] { + val keyExtractor1 = new KeySelector[IN1, K] { def getKey(in: IN1) = clean(fun1)(in) } - val keyExtractor2 = new KeySelector[IN2, Any] { + val keyExtractor2 = new KeySelector[IN2, L] { def getKey(in: IN2) = clean(fun2)(in) } @@ -267,9 +267,9 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { if (coReducer == null) { throw new NullPointerException("Reduce function must not be null.") } - - new DataStream[R](javaStream.addCoFunction("coReduce", implicitly[TypeInformation[R]], - new CoStreamReduce[IN1, IN2, R](coReducer))) + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.reduce(coReducer).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -325,12 +325,16 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return The transformed { @link DataStream}. */ def windowReduce[R: TypeInformation: ClassTag](coWindowFunction: - CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long) = { + CoWindowFunction[IN1, IN2, R], windowSize: Long, slideInterval: Long): + DataStream[R] = { if (coWindowFunction == null) { throw new NullPointerException("CoWindow function must no be null") } - - javaStream.windowReduce(coWindowFunction, windowSize, slideInterval) + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + + javaStream.windowReduce(coWindowFunction, windowSize, slideInterval). + returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -351,7 +355,8 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return The transformed { @link DataStream}. */ def windowReduce[R: TypeInformation: ClassTag](coWindower: (Seq[IN1], Seq[IN2], - Collector[R]) => Unit, windowSize: Long, slideInterval: Long) = { + Collector[R]) => Unit, windowSize: Long, slideInterval: Long): + DataStream[R] = { if (coWindower == null) { throw new NullPointerException("CoWindow function must no be null") } @@ -361,7 +366,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { out: Collector[R]): Unit = clean(coWindower)(first, second, out) } - javaStream.windowReduce(coWindowFun, windowSize, slideInterval) + windowReduce(coWindowFun, windowSize, slideInterval) } /** @@ -388,7 +393,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return The type of the first input */ def getInputType1(): TypeInformation[IN1] = { - javaStream.getInputType1 + javaStream.getType1 } /** @@ -397,7 +402,7 @@ class ConnectedDataStream[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { * @return The type of the second input */ def getInputType2(): TypeInformation[IN2] = { - javaStream.getInputType2 + javaStream.getType2 } } diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 5a4b611cfa230..4ccb073373803 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -365,8 +365,8 @@ class DataStream[T](javaStream: JavaStream[T]) { val cleanFun = clean(fun) def map(in: T): R = cleanFun(in) } - - javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper)) + + map(mapper) } /** @@ -377,7 +377,8 @@ class DataStream[T](javaStream: JavaStream[T]) { throw new NullPointerException("Map function must not be null.") } - javaStream.transform("map", implicitly[TypeInformation[R]], new StreamMap[T, R](mapper)) + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -388,8 +389,9 @@ class DataStream[T](javaStream: JavaStream[T]) { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null.") } - javaStream.transform("flatMap", implicitly[TypeInformation[R]], - new StreamFlatMap[T, R](flatMapper)) + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]] } /** @@ -430,12 +432,8 @@ class DataStream[T](javaStream: JavaStream[T]) { if (reducer == null) { throw new NullPointerException("Reduce function must not be null.") } - javaStream match { - case ds: GroupedDataStream[_] => javaStream.transform("reduce", - javaStream.getType(), new StreamGroupedReduce[T](reducer, ds.getKeySelector())) - case _ => javaStream.transform("reduce", javaStream.getType(), - new StreamReduce[T](reducer)) - } + + javaStream.reduce(reducer) } /** @@ -462,13 +460,9 @@ class DataStream[T](javaStream: JavaStream[T]) { if (folder == null) { throw new NullPointerException("Fold function must not be null.") } - javaStream match { - case ds: GroupedDataStream[_] => javaStream.transform("fold", - implicitly[TypeInformation[R]], new StreamGroupedFold[T,R](folder, ds.getKeySelector(), - initialValue, implicitly[TypeInformation[R]])) - case _ => javaStream.transform("fold", implicitly[TypeInformation[R]], - new StreamFold[T,R](folder, initialValue, implicitly[TypeInformation[R]])) - } + + val outType : TypeInformation[R] = implicitly[TypeInformation[R]] + javaStream.fold(initialValue, folder).returns(outType).asInstanceOf[JavaStream[R]] } /** diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 0217793ac6856..c7716ed0c86c9 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -261,7 +261,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { val sourceFunction = new FromElementsFunction[T](scala.collection.JavaConversions .asJavaCollection(data)) - javaEnv.addSource(sourceFunction, typeInfo) + javaEnv.addSource(sourceFunction).returns(typeInfo) } /** @@ -277,7 +277,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { Validate.notNull(function, "Function must not be null.") val cleanFun = StreamExecutionEnvironment.clean(function) val typeInfo = implicitly[TypeInformation[T]] - javaEnv.addSource(cleanFun, typeInfo) + javaEnv.addSource(cleanFun).returns(typeInfo) } /**