Skip to content

Commit

Permalink
[streaming] Added CoBatchGroupReduceInvokable, CoWindowGroupReduceInvokable and grouped variants
Browse files Browse the repository at this point in the history
  • Loading branch information
szape authored and mbalassi committed Sep 20, 2014
1 parent d0dd513 commit b6ffdba
Show file tree
Hide file tree
Showing 23 changed files with 1,473 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoStreamReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
import org.apache.flink.types.TypeInformation;
Expand Down Expand Up @@ -196,7 +196,7 @@ public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPos
CoReduceFunction.class, 2);

return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
new CoStreamReduceInvokable<IN1, IN2, OUT>(coReducer));
}

protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;

public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
Expand Down Expand Up @@ -61,7 +61,7 @@ protected GroupedConnectedDataStream(StreamExecutionEnvironment environment,
CoReduceFunction.class, 2);

return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
Expand Down Expand Up @@ -69,7 +69,7 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return addFunction("groupReduce", reducer, new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new FunctionTypeWrapper<OUT>(reducer,
ReduceFunction.class, 0), new GroupReduceInvokable<OUT>(reducer, keyPosition));
ReduceFunction.class, 0), new GroupedReduceInvokable<OUT>(reducer, keyPosition));
}

/**
Expand Down Expand Up @@ -240,7 +240,7 @@ protected GroupedDataStream(GroupedDataStream<OUT> dataStream) {
@Override
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {

GroupReduceInvokable<OUT> invokable = new GroupReduceInvokable<OUT>(aggregate, keyPosition);
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(aggregate, keyPosition);

SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("groupReduce", aggregate,
null, null, invokable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.state.MutableTableState;

public class GroupReduceInvokable<IN> extends StreamReduceInvokable<IN> {
public class GroupedReduceInvokable<IN> extends StreamReduceInvokable<IN> {
private static final long serialVersionUID = 1L;

private int keyPosition;
private MutableTableState<Object, IN> values;
private IN reduced;

public GroupReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
public GroupedReduceInvokable(ReduceFunction<IN> reducer, int keyPosition) {
super(reducer);
this.keyPosition = keyPosition;
values = new MutableTableState<Object, IN>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;

/**
 * Group-reduce co-invokable whose window boundaries are decided, separately
 * for each of the two inputs, by comparing per-input counters against the
 * inherited {@code slideInterval} and {@code windowSize} values.
 *
 * <p>
 * The counters are advanced only by {@link #initialize1()} and
 * {@link #initialize2()}. NOTE(review): these hooks are presumably invoked by
 * the base class once per incoming record of the respective input - confirm
 * against the superclass.
 *
 * @param <IN1>
 *            element type of the first input
 * @param <IN2>
 *            element type of the second input
 * @param <OUT>
 *            element type handed to the reduce function's collector
 */
public class CoBatchGroupReduceInvokable<IN1, IN2, OUT> extends CoGroupReduceInvokable<IN1, IN2, OUT> {

    private static final long serialVersionUID = 1L;

    /** Compared with {@code slideInterval1} to decide when a new slide starts on input 1. */
    protected long startCounter1;
    /** Compared with {@code slideInterval2} to decide when a new slide starts on input 2. */
    protected long startCounter2;
    /** Compared with {@code windowSize1} to decide when a window ends on input 1. */
    protected long endCounter1;
    /** Compared with {@code windowSize2} to decide when a window ends on input 2. */
    protected long endCounter2;

    /**
     * Creates the invokable; every argument is forwarded unchanged to the
     * superclass constructor.
     *
     * @param reduceFunction
     *            user function applied to the windows of both inputs
     * @param windowSize1
     *            value {@code endCounter1} must reach for a window of input 1 to end
     * @param windowSize2
     *            value {@code endCounter2} must reach for a window of input 2 to end
     * @param slideInterval1
     *            value {@code startCounter1} must reach for a new slide of input 1
     * @param slideInterval2
     *            value {@code startCounter2} must reach for a new slide of input 2
     */
    public CoBatchGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
            long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
        super(reduceFunction, windowSize1, windowSize2, slideInterval1, slideInterval2);
    }

    /**
     * Delegates to the superclass and then resets all four counters to zero.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        startCounter1 = startCounter2 = 0;
        endCounter1 = endCounter2 = 0;
    }

    // ------------------------------------------------------------------
    // first input
    // ------------------------------------------------------------------

    /**
     * Advances both counters of the first input by one.
     */
    @Override
    protected void initialize1() {
        ++startCounter1;
        ++endCounter1;
    }

    /**
     * Appends {@code reuse1} to the first buffer, then opens a new slide
     * and/or reduces and shifts the window, depending on the counters.
     */
    @Override
    protected void handleStream1() throws Exception {
        // The record is buffered first so that it is part of the window
        // that may be reduced below.
        circularList1.add(reuse1);

        final boolean slideStarts = windowStart1();
        if (slideStarts) {
            circularList1.newSlide();
        }

        final boolean windowEnds = windowEnd1();
        if (windowEnds) {
            reduce1();
            circularList1.shiftWindow();
        }
    }

    /**
     * @return {@code true} exactly when {@code startCounter1} equals
     *         {@code slideInterval1}; the counter is reset to zero in that case
     */
    @Override
    protected boolean windowStart1() throws Exception {
        if (startCounter1 != slideInterval1) {
            return false;
        }
        startCounter1 = 0;
        return true;
    }

    /**
     * @return {@code true} exactly when {@code endCounter1} equals
     *         {@code windowSize1}; the counter is then lowered by
     *         {@code slideInterval1}
     */
    @Override
    protected boolean windowEnd1() throws Exception {
        if (endCounter1 != windowSize1) {
            return false;
        }
        endCounter1 = endCounter1 - slideInterval1;
        return true;
    }

    // ------------------------------------------------------------------
    // second input
    // ------------------------------------------------------------------

    /**
     * Advances both counters of the second input by one.
     */
    @Override
    protected void initialize2() {
        ++startCounter2;
        ++endCounter2;
    }

    /**
     * Appends {@code reuse2} to the second buffer, then opens a new slide
     * and/or reduces and shifts the window, depending on the counters.
     */
    @Override
    protected void handleStream2() throws Exception {
        // Same ordering as for the first input: buffer, slide check, end check.
        circularList2.add(reuse2);

        final boolean slideStarts = windowStart2();
        if (slideStarts) {
            circularList2.newSlide();
        }

        final boolean windowEnds = windowEnd2();
        if (windowEnds) {
            reduce2();
            circularList2.shiftWindow();
        }
    }

    /**
     * @return {@code true} exactly when {@code startCounter2} equals
     *         {@code slideInterval2}; the counter is reset to zero in that case
     */
    @Override
    protected boolean windowStart2() throws Exception {
        if (startCounter2 != slideInterval2) {
            return false;
        }
        startCounter2 = 0;
        return true;
    }

    /**
     * @return {@code true} exactly when {@code endCounter2} equals
     *         {@code windowSize2}; the counter is then lowered by
     *         {@code slideInterval2}
     */
    @Override
    protected boolean windowEnd2() throws Exception {
        if (endCounter2 != windowSize2) {
            return false;
        }
        endCounter2 = endCounter2 - slideInterval2;
        return true;
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,69 +17,123 @@

package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.state.MutableTableState;
import java.util.Iterator;

public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoReduceInvokable<IN1, IN2, OUT> {
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoGroupReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.CircularFifoList;
import org.apache.flink.streaming.state.StreamIterator;

public abstract class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;

private int keyPosition1;
private int keyPosition2;
private MutableTableState<Object, IN1> values1;
private MutableTableState<Object, IN2> values2;
IN1 reduced1;
IN2 reduced2;

public CoGroupReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
int keyPosition2) {
super(coReducer);
this.coReducer = coReducer;
this.keyPosition1 = keyPosition1;
this.keyPosition2 = keyPosition2;
values1 = new MutableTableState<Object, IN1>();
values2 = new MutableTableState<Object, IN2>();
protected CoGroupReduceFunction<IN1, IN2, OUT> coReducer;
protected StreamIterator<IN1> userIterator1;
protected StreamIterator<IN2> userIterator2;
protected Iterable<IN1> userIterable1;
protected Iterable<IN2> userIterable2;
protected long windowSize1;
protected long windowSize2;
protected long slideInterval1;
protected long slideInterval2;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
protected CircularFifoList<StreamRecord<IN2>> circularList2;
protected long WindowStartTime1;
protected long WindowStartTime2;
protected long WindowEndTime1;
protected long WindowEndTime2;

public CoGroupReduceInvokable(CoGroupReduceFunction<IN1, IN2, OUT> reduceFunction,
long windowSize1, long windowSize2, long slideInterval1, long slideInterval2) {
super(reduceFunction);
this.coReducer = reduceFunction;
this.userIterator1 = new StreamIterator<IN1>();
this.userIterator2 = new StreamIterator<IN2>();
this.windowSize1 = windowSize1;
this.windowSize2 = windowSize2;
this.slideInterval1 = slideInterval1;
this.slideInterval2 = slideInterval2;
this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
this.circularList2 = new CircularFifoList<StreamRecord<IN2>>();
}

@Override
protected void mutableInvoke() throws Exception {
throw new RuntimeException("Reducing mutable sliding batch is not supported.");
}

@Override
public void handleStream1() throws Exception {
Object key = reuse1.getField(keyPosition1);
currentValue1 = values1.get(key);
nextValue1 = reuse1.getObject();
if (currentValue1 != null) {
callUserFunctionAndLogException1();
values1.put(key, reduced1);
collector.collect(coReducer.map1(reduced1));
} else {
values1.put(key, nextValue1);
collector.collect(coReducer.map1(nextValue1));
protected void handleStream1() throws Exception {
while (windowStart1()) {
circularList1.newSlide();
}
while (windowEnd1()) {
reduce1();
circularList1.shiftWindow();
}
circularList1.add(reuse1);
}

@Override
public void handleStream2() throws Exception {
Object key = reuse2.getField(keyPosition2);
currentValue2 = values2.get(key);
nextValue2 = reuse2.getObject();
if (currentValue2 != null) {
callUserFunctionAndLogException2();
values2.put(key, reduced2);
collector.collect(coReducer.map2(reduced2));
} else {
values2.put(key, nextValue2);
collector.collect(coReducer.map2(nextValue2));
protected void handleStream2() throws Exception {
while (windowStart2()) {
circularList2.newSlide();
}
while (windowEnd2()) {
reduce2();
circularList2.shiftWindow();
}
circularList2.add(reuse2);
}

protected void reduce1() throws Exception {
userIterator1.load(circularList1.getIterator());
callUserFunctionAndLogException1();
}

protected void reduce2() throws Exception {
userIterator2.load(circularList2.getIterator());
callUserFunctionAndLogException2();
}

@Override
protected void callUserFunction1() throws Exception {
reduced1 = coReducer.reduce1(currentValue1, nextValue1);

coReducer.reduce1(userIterable1, collector);
}

@Override
protected void callUserFunction2() throws Exception {
reduced2 = coReducer.reduce2(currentValue2, nextValue2);
coReducer.reduce2(userIterable2, collector);
}

protected abstract boolean windowStart1() throws Exception;

protected abstract boolean windowStart2() throws Exception;

protected abstract boolean windowEnd1() throws Exception;

protected abstract boolean windowEnd2() throws Exception;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
userIterable1 = new BatchIterable1();
userIterable2 = new BatchIterable2();
}

protected class BatchIterable1 implements Iterable<IN1> {
@Override
public Iterator<IN1> iterator() {
return userIterator1;
}
}

protected class BatchIterable2 implements Iterable<IN2> {
@Override
public Iterator<IN2> iterator() {
return userIterator2;
}
}

}
Loading

0 comments on commit b6ffdba

Please sign in to comment.