Skip to content

Commit

Permalink
[streaming] reduce operator added to DataStream and ConnectedDataStream + grouped batch and windowReduce operators reworked
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora authored and StephanEwen committed Aug 29, 2014
1 parent cad38ed commit 90ad6b0
Show file tree
Hide file tree
Showing 12 changed files with 416 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;

Expand Down Expand Up @@ -152,6 +154,30 @@ public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPos
return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}

/**
* Applies a CoReduce transformation on the data stream. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every two
* element of each input DataStream.
* @return The transformed DataStream.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {

FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoReduceFunction.class, 2);

return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
}

protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.RichFilterFunction;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichGroupReduceFunction;
import org.apache.flink.api.java.functions.RichMapFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
Expand All @@ -46,6 +48,7 @@
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
Expand Down Expand Up @@ -289,6 +292,20 @@ public DataStream<OUT> distribute() {
return addFunction("flatMap", flatMapper, inTypeWrapper,
outTypeWrapper, new FlatMapInvokable<OUT, R>(flatMapper));
}

	/**
	 * Applies a reduce transformation on the data stream. The user can also
	 * extend the {@link RichReduceFunction} to gain access to other features
	 * provided by the {@link RichFunction} interface.
	 *
	 * @param reducer
	 *            The {@link ReduceFunction} that will be called for every
	 *            element of the input values.
	 * @return The transformed DataStream.
	 */
	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
		// Input and output types are identical for a reduce, hence the same
		// type wrapper (type argument 0 of ReduceFunction) is used for both.
		return addFunction("reduce", reducer, new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0),
				new FunctionTypeWrapper<OUT>(reducer, ReduceFunction.class, 0), new StreamReduceInvokable<OUT>(reducer));
	}

public GroupedDataStream<OUT> groupBy(int keyPosition) {
return new GroupedDataStream<OUT>(this, keyPosition);
Expand Down
Loading

0 comments on commit 90ad6b0

Please sign in to comment.