diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java index 4bb022a67f8f1..e6c504222b1a1 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java @@ -312,10 +312,11 @@ public void addIterationSink(String componentName, String iterationTail, String int parallelism, long waitTime) { addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism); + iterationIds.put(componentName, iterationID); iterationIDtoSinkName.put(iterationID, componentName); + setBytesFrom(iterationTail, componentName); - // setInTypeWrappersFrom(iterationTail, componentName); iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime); if (LOG.isDebugEnabled()) { @@ -576,44 +577,21 @@ public void setOutputSelector(String componentName, byte[] serializedOutputS * to */ public void setBytesFrom(String from, String to) { - operatorNames.put(to, operatorNames.get(from)); serializedFunctions.put(to, serializedFunctions.get(from)); - setTypeWrappersFrom(from, to); - } - - public void setTypeWrappersFrom(String from, String to) { - setInToOutTypeWrappersFrom(from, to); - setOutToOutTypeWrappersFrom(from, to); - } - - public void setInToOutTypeWrappersFrom(String from, String to) { - // TODO rename function typeWrapperIn1.put(to, typeWrapperOut1.get(from)); typeWrapperIn2.put(to, typeWrapperOut2.get(from)); - } - - public void setOutToOutTypeWrappersFrom(String from, String to) { - // TODO rename function typeWrapperOut1.put(to, typeWrapperOut1.get(from)); typeWrapperOut2.put(to, typeWrapperOut2.get(from)); } - public void setInToInTypeWrappersFrom(String from, String to) { - // TODO rename function - typeWrapperIn1.put(to, typeWrapperIn1.get(from)); - typeWrapperIn2.put(to, typeWrapperIn2.get(from)); - } - public TypeInformation getInTypeInfo(String id) { - // TODO System.out.println("DEBUG TypeInfo " + typeWrapperIn1.get(id)); return typeWrapperIn1.get(id).getTypeInfo(); } public TypeInformation getOutTypeInfo(String id) { - // TODO return typeWrapperOut1.get(id).getTypeInfo(); } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java index 0aa5de6c06013..bcedac93f746c 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java @@ -202,9 +202,8 @@ public TypeInformation getOutputType() { BatchReduceInvokable invokable = getReduceInvokable(aggregate); SingleOutputStreamOperator returnStream = dataStream.addFunction("batchReduce", - aggregate, null, null, invokable); + aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable); - dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId()); return returnStream; } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index bebda9133cee5..0e1ae5756126d 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -47,6 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis; import org.apache.flink.streaming.api.invokable.SinkInvokable; import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable; +import org.apache.flink.streaming.api.invokable.operator.CounterInvokable; import org.apache.flink.streaming.api.invokable.operator.FilterInvokable; import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable; import org.apache.flink.streaming.api.invokable.operator.MapInvokable; @@ -60,6 +61,7 @@ import org.apache.flink.streaming.partitioner.ShufflePartitioner; import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; +import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper; import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper; import org.apache.flink.types.TypeInformation; @@ -86,7 +88,7 @@ public class DataStream { protected List userDefinedNames; protected boolean selectAll; protected StreamPartitioner partitioner; - protected TypeSerializerWrapper outTypeWrapper; + protected final TypeSerializerWrapper outTypeWrapper; protected List> mergedStreams; protected final JobGraphBuilder jobGraphBuilder; @@ -556,15 +558,26 @@ public WindowDataStream window(long windowSize) { public SingleOutputStreamOperator max() { return max(0); } + + /** + * Applies an aggregation that gives the count of the data point. + * + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator count() { + TypeSerializerWrapper inTypeWrapper = outTypeWrapper; + TypeSerializerWrapper outTypeWrapper = new ObjectTypeWrapper(new Long(0)); + + return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable()); + } protected SingleOutputStreamOperator aggregate(AggregationFunction aggregate) { StreamReduceInvokable invokable = new StreamReduceInvokable(aggregate); - SingleOutputStreamOperator returnStream = addFunction("reduce", aggregate, null, - null, invokable); + SingleOutputStreamOperator returnStream = addFunction("reduce", aggregate, outTypeWrapper, + outTypeWrapper, invokable); - this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId()); return returnStream; } @@ -599,9 +612,7 @@ public WindowDataStream window(long windowSize) { public DataStreamSink print() { DataStream inputStream = this.copy(); PrintSinkFunction printFunction = new PrintSinkFunction(); - DataStreamSink returnStream = addSink(inputStream, printFunction, null); - - jobGraphBuilder.setInToOutTypeWrappersFrom(inputStream.getId(), returnStream.getId()); + DataStreamSink returnStream = addSink(inputStream, printFunction, outTypeWrapper); return returnStream; } @@ -721,8 +732,7 @@ public DataStreamSink writeAsText(String path, int batchSize, OUT endTuple) private DataStreamSink writeAsText(DataStream inputStream, String path, WriteFormatAsText format, long millis, OUT endTuple) { DataStreamSink returnStream = addSink(inputStream, new WriteSinkFunctionByMillis( - path, format, millis, endTuple), null); - jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId()); + path, format, millis, endTuple), inputStream.outTypeWrapper); jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -749,8 +759,7 @@ private DataStreamSink writeAsText(DataStream inputStream, String path private DataStreamSink writeAsText(DataStream inputStream, String path, WriteFormatAsText format, int batchSize, OUT endTuple) { DataStreamSink returnStream = addSink(inputStream, - new WriteSinkFunctionByBatches(path, format, batchSize, endTuple), null); - jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId()); + new WriteSinkFunctionByBatches(path, format, batchSize, endTuple), inputStream.outTypeWrapper); jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -873,8 +882,7 @@ public DataStreamSink writeAsCsv(String path, int batchSize, OUT endTuple) private DataStreamSink writeAsCsv(DataStream inputStream, String path, WriteFormatAsCsv format, long millis, OUT endTuple) { DataStreamSink returnStream = addSink(inputStream, new WriteSinkFunctionByMillis( - path, format, millis, endTuple)); - jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId()); + path, format, millis, endTuple), inputStream.outTypeWrapper); jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } @@ -901,8 +909,7 @@ private DataStreamSink writeAsCsv(DataStream inputStream, String path, private DataStreamSink writeAsCsv(DataStream inputStream, String path, WriteFormatAsCsv format, int batchSize, OUT endTuple) { DataStreamSink returnStream = addSink(inputStream, - new WriteSinkFunctionByBatches(path, format, batchSize, endTuple), null); - jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId()); + new WriteSinkFunctionByBatches(path, format, batchSize, endTuple), inputStream.outTypeWrapper); jobGraphBuilder.setMutability(returnStream.getId(), false); return returnStream; } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 30826d3282493..e30d31601d386 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -243,9 +243,8 @@ protected GroupedDataStream(GroupedDataStream dataStream) { GroupedReduceInvokable invokable = new GroupedReduceInvokable(aggregate, keyPosition); SingleOutputStreamOperator returnStream = addFunction("groupReduce", aggregate, - null, null, invokable); + outTypeWrapper, outTypeWrapper, invokable); - this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId()); return returnStream; } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java new file mode 100644 index 0000000000000..29903b1bdf106 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokable.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable; + +public class CounterInvokable extends StreamOperatorInvokable { + private static final long serialVersionUID = 1L; + + Long count = 0L; + + public CounterInvokable() { + super(null); + } + + @Override + protected void immutableInvoke() throws Exception { + while ((reuse = recordIterator.next(reuse)) != null) { + callUserFunctionAndLogException(); + resetReuse(); + } + } + + @Override + protected void mutableInvoke() throws Exception { + while ((reuse = recordIterator.next(reuse)) != null) { + callUserFunctionAndLogException(); + } + } + + @Override + protected void callUserFunction() throws Exception { + collector.collect(++count); + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java new file mode 100644 index 0000000000000..2124eb7d28dc1 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.operator; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; + +import org.apache.flink.streaming.util.MockInvokable; +import org.junit.Test; + +public class CounterInvokableTest { + + @Test + public void counterTest() { + CounterInvokable invokable = new CounterInvokable(); + + List expected = Arrays.asList(1L, 2L, 3L); + List actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three")); + + assertEquals(expected, actual); + } +}