Skip to content

Commit

Permalink
[streaming] Added counter aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
ghermann authored and mbalassi committed Sep 20, 2014
1 parent b6ffdba commit 03a28cb
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,11 @@ public void addIterationSink(String componentName, String iterationTail, String
int parallelism, long waitTime) {

addComponent(componentName, StreamIterationSink.class, null, null, null, parallelism);

iterationIds.put(componentName, iterationID);
iterationIDtoSinkName.put(iterationID, componentName);

setBytesFrom(iterationTail, componentName);
// setInTypeWrappersFrom(iterationTail, componentName);
iterationWaitTime.put(iterationIDtoSinkName.get(iterationID), waitTime);

if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -576,44 +577,21 @@ public <T> void setOutputSelector(String componentName, byte[] serializedOutputS
* to
*/
public void setBytesFrom(String from, String to) {
    // Mirror the operator name and the serialized function stored under 'from'.
    this.operatorNames.put(to, this.operatorNames.get(from));
    this.serializedFunctions.put(to, this.serializedFunctions.get(from));

    // The type wrappers are carried over as well.
    this.setTypeWrappersFrom(from, to);
}

/**
 * Copies type wrappers from one component to another by calling
 * {@code setInToOutTypeWrappersFrom} followed by
 * {@code setOutToOutTypeWrappersFrom} with the same arguments.
 *
 * @param from
 *            name of the component whose wrappers are read
 * @param to
 *            name of the component the wrappers are stored under
 */
public void setTypeWrappersFrom(String from, String to) {
    this.setInToOutTypeWrappersFrom(from, to);
    this.setOutToOutTypeWrappersFrom(from, to);
}

/**
 * Stores the output type wrappers of {@code from} as the input type wrappers
 * of {@code to}, for both the first and the second wrapper map.
 *
 * @param from
 *            name of the component whose output wrappers are read
 * @param to
 *            name of the component whose input wrappers are set
 */
public void setInToOutTypeWrappersFrom(String from, String to) {
    // TODO rename function
    this.typeWrapperIn1.put(to, this.typeWrapperOut1.get(from));
    this.typeWrapperIn2.put(to, this.typeWrapperOut2.get(from));
}

/**
 * Stores the output type wrappers of {@code from} as the output type wrappers
 * of {@code to}, for both the first and the second wrapper map.
 *
 * @param from
 *            name of the component whose output wrappers are read
 * @param to
 *            name of the component whose output wrappers are set
 */
public void setOutToOutTypeWrappersFrom(String from, String to) {
    // TODO rename function
    this.typeWrapperOut1.put(to, this.typeWrapperOut1.get(from));
    this.typeWrapperOut2.put(to, this.typeWrapperOut2.get(from));
}

/**
 * Stores the input type wrappers of {@code from} as the input type wrappers
 * of {@code to}, for both the first and the second wrapper map.
 *
 * @param from
 *            name of the component whose input wrappers are read
 * @param to
 *            name of the component whose input wrappers are set
 */
public void setInToInTypeWrappersFrom(String from, String to) {
    // TODO rename function
    this.typeWrapperIn1.put(to, this.typeWrapperIn1.get(from));
    this.typeWrapperIn2.put(to, this.typeWrapperIn2.get(from));
}

/**
 * Returns the type information held by the first input type wrapper that is
 * stored under the given id.
 *
 * @param id
 *            name of the component to look up
 * @return the type information reported by that component's first input type
 *         wrapper
 */
public TypeInformation<?> getInTypeInfo(String id) {
    // Diagnostics go through the logger rather than System.out, so they can be
    // switched off and do not pollute the standard output of user programs.
    if (LOG.isDebugEnabled()) {
        LOG.debug("Input type wrapper of " + id + ": " + typeWrapperIn1.get(id));
    }
    // NOTE(review): assumes a wrapper has been registered for id; a missing
    // entry would presumably fail on the dereference below - confirm callers.
    return typeWrapperIn1.get(id).getTypeInfo();
}

/**
 * Returns the type information held by the first output type wrapper that is
 * stored under the given id.
 *
 * @param id
 *            name of the component to look up
 * @return the type information reported by that component's first output type
 *         wrapper
 */
public TypeInformation<?> getOutTypeInfo(String id) {
    // TODO
    TypeInformation<?> outTypeInfo = this.typeWrapperOut1.get(id).getTypeInfo();
    return outTypeInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ public TypeInformation<OUT> getOutputType() {
BatchReduceInvokable<OUT> invokable = getReduceInvokable(aggregate);

SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
aggregate, null, null, invokable);
aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);

dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId());
return returnStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
Expand All @@ -60,6 +61,7 @@
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.types.TypeInformation;

Expand All @@ -86,7 +88,7 @@ public class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
protected TypeSerializerWrapper<OUT> outTypeWrapper;
protected final TypeSerializerWrapper<OUT> outTypeWrapper;
protected List<DataStream<OUT>> mergedStreams;

protected final JobGraphBuilder jobGraphBuilder;
Expand Down Expand Up @@ -556,15 +558,26 @@ public WindowDataStream<OUT> window(long windowSize) {
public SingleOutputStreamOperator<OUT, ?> max() {
    // Delegates to the int overload with argument 0.
    return this.max(0);
}

/**
 * Applies a counting aggregation to the DataStream: a "counter" operator
 * backed by a {@link CounterInvokable} is added, which takes the elements of
 * this stream as input and produces {@code Long} values.
 *
 * @return The transformed DataStream of counts.
 */
public SingleOutputStreamOperator<Long, ?> count() {
    // The counter consumes exactly what this stream produces.
    TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
    // Named distinctly so it no longer shadows the outTypeWrapper field;
    // Long.valueOf replaces the deprecated Long(long) constructor.
    TypeSerializerWrapper<Long> countTypeWrapper = new ObjectTypeWrapper<Long>(Long.valueOf(0L));

    return addFunction("counter", null, inTypeWrapper, countTypeWrapper, new CounterInvokable<OUT>());
}

protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {

StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);

SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, null,
null, invokable);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
outTypeWrapper, invokable);

this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
return returnStream;
}

Expand Down Expand Up @@ -599,9 +612,7 @@ public WindowDataStream<OUT> window(long windowSize) {
public DataStreamSink<OUT> print() {
DataStream<OUT> inputStream = this.copy();
PrintSinkFunction<OUT> printFunction = new PrintSinkFunction<OUT>();
DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, null);

jobGraphBuilder.setInToOutTypeWrappersFrom(inputStream.getId(), returnStream.getId());
DataStreamSink<OUT> returnStream = addSink(inputStream, printFunction, outTypeWrapper);

return returnStream;
}
Expand Down Expand Up @@ -721,8 +732,7 @@ public DataStreamSink<OUT> writeAsText(String path, int batchSize, OUT endTuple)
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
path, format, millis, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
path, format, millis, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
Expand All @@ -749,8 +759,7 @@ private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
Expand Down Expand Up @@ -873,8 +882,7 @@ public DataStreamSink<OUT> writeAsCsv(String path, int batchSize, OUT endTuple)
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, long millis, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream, new WriteSinkFunctionByMillis<OUT>(
path, format, millis, endTuple));
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
path, format, millis, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
Expand All @@ -901,8 +909,7 @@ private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), null);
jobGraphBuilder.setBytesFrom(inputStream.getId(), returnStream.getId());
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,8 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);

SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
null, null, invokable);
outTypeWrapper, outTypeWrapper, invokable);

this.jobGraphBuilder.setTypeWrappersFrom(getId(), returnStream.getId());
return returnStream;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.invokable.operator;

import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;

/**
 * Invokable that emits a running count: each time the user-function hook is
 * called, the counter is incremented and its new value is collected, so the
 * first emitted value is 1.
 *
 * @param <IN>
 *            type of the incoming records
 */
public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> {
    private static final long serialVersionUID = 1L;

    // Primitive on purpose: a boxed Long field would be unboxed and re-boxed
    // on every increment and could be null.
    private long count = 0L;

    public CounterInvokable() {
        // NOTE(review): null presumably stands for "no user function" - confirm
        // against the StreamOperatorInvokable constructor.
        super(null);
    }

    /**
     * Reads records until the iterator returns null, triggering the counter
     * for each one and resetting the reuse object afterwards.
     */
    @Override
    protected void immutableInvoke() throws Exception {
        while ((reuse = recordIterator.next(reuse)) != null) {
            callUserFunctionAndLogException();
            resetReuse();
        }
    }

    /**
     * Reads records until the iterator returns null, triggering the counter
     * for each one; the reuse object is not reset between records.
     */
    @Override
    protected void mutableInvoke() throws Exception {
        while ((reuse = recordIterator.next(reuse)) != null) {
            callUserFunctionAndLogException();
        }
    }

    /**
     * Increments the counter and collects the new value; the content of the
     * current record is not inspected.
     */
    @Override
    protected void callUserFunction() throws Exception {
        collector.collect(++count);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.invokable.operator;

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.util.MockInvokable;
import org.junit.Test;

public class CounterInvokableTest {

    /**
     * Runs a {@link CounterInvokable} over three input records and expects the
     * running counts 1, 2 and 3 to be emitted in that order.
     */
    @Test
    public void counterTest() {
        List<String> input = Arrays.asList("one", "two", "three");
        List<Long> expectedCounts = Arrays.asList(1L, 2L, 3L);

        List<Long> emittedCounts = MockInvokable.createAndExecute(new CounterInvokable<String>(), input);

        assertEquals(expectedCounts, emittedCounts);
    }
}

0 comments on commit 03a28cb

Please sign in to comment.