Skip to content

Commit

Permalink
[streaming] CoWindow operator rework + stream iteration example added
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora authored and mbalassi committed Oct 7, 2014
1 parent 97a7322 commit ec82d97
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -30,14 +29,15 @@
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
Expand Down Expand Up @@ -167,11 +167,12 @@ public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2)
/**
* Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
* maps the output to a common type. The transformation calls a
* {@link CoFlatMapFunction#flatMap1} for each element of the first input and
* {@link CoFlatMapFunction#flatMap2} for each element of the second input. Each
* CoFlatMapFunction call returns any number of elements including none. The
* user can also extend {@link RichFlatMapFunction} to gain access to other
* features provided by the {@link RichFunction} interface.
* {@link CoFlatMapFunction#flatMap1} for each element of the first input
* and {@link CoFlatMapFunction#flatMap2} for each element of the second
* input. Each CoFlatMapFunction call returns any number of elements
* including none. The user can also extend {@link RichFlatMapFunction} to
* gain access to other features provided by the {@link RichFunction}
* interface.
*
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
Expand Down Expand Up @@ -226,13 +227,13 @@ public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2)
}

/**
* Applies a CoGroup transformation on the connected DataStreams. The
* transformation calls the {@link CoGroupFunction#coGroupache} method for
* for time aligned windows of the two data streams. System time is used as
* Applies a CoWindow transformation on the connected DataStreams. The
* transformation calls the {@link CoWindowFunction#coWindow} method for
* time aligned windows of the two data streams. System time is used as
* default to compute windows.
*
* @param coGroupFunction
* The {@link CoGroupFunction} that will be applied for the time
* @param coWindowFunction
* The {@link CoWindowFunction} that will be applied for the time
* windows.
* @param windowSize
* Size of the windows that will be aligned for both streams in
Expand All @@ -243,20 +244,20 @@ public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2)
*
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval) {
return windowReduceGroup(coGroupFunction, windowSize, slideInterval,
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
return windowReduce(coWindowFunction, windowSize, slideInterval,
new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
}

/**
* Applies a CoGroup transformation on the connected DataStreams. The
* transformation calls the {@link CoGroupFunction#coGroupache} method for
* for time aligned windows of the two data streams. The user can implement
* Applies a CoWindow transformation on the connected DataStreams. The
* transformation calls the {@link CoWindowFunction#coWindow} method for
* time aligned windows of the two data streams. The user can implement
* their own time stamps or use the system time by default.
*
* @param coGroupFunction
* The {@link CoGroupFunction} that will be applied for the time
* @param coWindowFunction
* The {@link CoWindowFunction} that will be applied for the time
* windows.
* @param windowSize
* Size of the windows that will be aligned for both streams. If
Expand All @@ -272,8 +273,8 @@ public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2)
* User defined time stamps for the second input.
* @return The transformed {@link DataStream}.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval,
public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval,
TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {

if (windowSize < 1) {
Expand All @@ -283,15 +284,15 @@ public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2)
throw new IllegalArgumentException("Slide interval must be positive");
}

FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coGroupFunction,
CoGroupFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coGroupFunction,
CoGroupFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coGroupFunction,
CoGroupFunction.class, 2);
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coWindowFunction,
CoWindowFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coWindowFunction,
CoWindowFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coWindowFunction,
CoWindowFunction.class, 2);

return addCoFunction("coWindowReduce", coGroupFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoGroupInvokable<IN1, IN2, OUT>(coGroupFunction, windowSize,
return addCoFunction("coWindowReduce", coWindowFunction, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoWindowInvokable<IN1, IN2, OUT>(coWindowFunction, windowSize,
slideInterval, timestamp1, timestamp2));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.co;

import java.io.Serializable;
import java.util.List;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;

/**
 * User function that is applied jointly to the contents of two windows, one
 * per input of a connected stream.
 *
 * @param <IN1>
 *            Element type of the first input.
 * @param <IN2>
 *            Element type of the second input.
 * @param <O>
 *            Type of the elements emitted through the collector.
 */
public interface CoWindowFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * Processes one pair of windows and emits any number of results.
     *
     * @param first
     *            Elements of the current window of the first input.
     * @param second
     *            Elements of the current window of the second input.
     * @param out
     *            Collector used to emit the results.
     * @throws Exception
     *             Implementations may throw; handling is left to the caller.
     */
    void coWindow(List<IN1> first, List<IN2> second, Collector<O> out) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.function.co;

import java.util.List;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.util.Collector;

/**
 * Abstract base for {@link CoWindowFunction} implementations that should also
 * inherit whatever {@link AbstractRichFunction} provides.
 *
 * @param <IN1>
 *            Element type of the first input.
 * @param <IN2>
 *            Element type of the second input.
 * @param <O>
 *            Type of the elements emitted through the collector.
 */
public abstract class RichCoWindowFunction<IN1, IN2, O>
        extends AbstractRichFunction
        implements CoWindowFunction<IN1, IN2, O> {

    private static final long serialVersionUID = 1L;

    /**
     * Processes one pair of windows; must be supplied by the concrete
     * subclass.
     *
     * @param first
     *            Elements of the current window of the first input.
     * @param second
     *            Elements of the current window of the second input.
     * @param out
     *            Collector used to emit the results.
     * @throws Exception
     *             Implementations may throw; handling is left to the caller.
     */
    @Override
    public abstract void coWindow(List<IN1> first, List<IN2> second, Collector<O> out)
            throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public String toString() {

/**
 * Prepares this object for use: runs the superclass {@code open} first, then
 * fetches the object serializer from {@code inSerializer} and starts with a
 * fresh {@code StreamBatch}.
 *
 * @param config
 *            Configuration, forwarded unchanged to {@code super.open}.
 * @throws Exception
 *             Declared by the overridden signature; presumably propagated
 *             from {@code super.open} (confirm against the superclass).
 */
@Override
public void open(Configuration config) throws Exception{
// Superclass setup runs before any state of this class is initialised.
super.open(config);
serializer = inSerializer.getObjectSerializer();
this.batch = new StreamBatch();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Iterator;

import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
Expand All @@ -46,8 +45,6 @@ public class CoBatchReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
protected StreamBatch<IN2> batch2;
protected StreamBatch<IN1> currentBatch1;
protected StreamBatch<IN2> currentBatch2;
protected TypeSerializer<IN1> serializer1;
protected TypeSerializer<IN2> serializer2;

public CoBatchReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long batchSize1,
long batchSize2, long slideSize1, long slideSize2) {
Expand Down Expand Up @@ -173,8 +170,6 @@ public void open(Configuration config) throws Exception {
super.open(config);
this.batch1 = new StreamBatch<IN1>(batchSize1, slideSize1);
this.batch2 = new StreamBatch<IN2>(batchSize2, slideSize2);
this.serializer1 = srSerializer1.getObjectSerializer();
this.serializer2 = srSerializer2.getObjectSerializer();
}

public void reduceToBuffer1(StreamRecord<IN1> next, StreamBatch<IN1> streamBatch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.invokable.operator.co;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
Expand All @@ -41,6 +42,8 @@ public CoInvokable(Function userFunction) {
protected StreamRecord<IN2> reuse2;
protected StreamRecordSerializer<IN1> srSerializer1;
protected StreamRecordSerializer<IN2> srSerializer2;
protected TypeSerializer<IN1> serializer1;
protected TypeSerializer<IN2> serializer2;

public void initialize(Collector<OUT> collector,
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
Expand All @@ -55,6 +58,8 @@ public void initialize(Collector<OUT> collector,
this.srSerializer1 = serializer1;
this.srSerializer2 = serializer2;
this.isMutable = isMutable;
this.serializer1 = srSerializer1.getObjectSerializer();
this.serializer2 = srSerializer2.getObjectSerializer();
}

protected void resetReuseAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
package org.apache.flink.streaming.api.invokable.operator.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.streaming.api.function.co.CoWindowFunction;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.CircularFifoList;

public class CoGroupInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;

protected CoGroupFunction<IN1, IN2, OUT> coGroupFunction;
protected CoWindowFunction<IN1, IN2, OUT> coWindowFunction;
protected long windowSize;
protected long slideSize;
protected CircularFifoList<StreamRecord<IN1>> circularList1;
Expand All @@ -41,10 +43,10 @@ public class CoGroupInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
protected long startTime;
protected long nextRecordTime;

public CoGroupInvokable(CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize,
public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
long slideInterval, TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
super(coGroupFunction);
this.coGroupFunction = coGroupFunction;
super(coWindowFunction);
this.coWindowFunction = coWindowFunction;
this.windowSize = windowSize;
this.slideSize = slideInterval;
this.circularList1 = new CircularFifoList<StreamRecord<IN1>>();
Expand Down Expand Up @@ -73,8 +75,19 @@ protected void handleStream2() throws Exception {

@Override
protected void callUserFunction() throws Exception {
if(!window.circularList1.isEmpty() || !window.circularList2.isEmpty()){
coGroupFunction.coGroup(window.getIterable1(), window.getIterable2(), collector);

List<IN1> first = new ArrayList<IN1>();
List<IN2> second = new ArrayList<IN2>();

for (IN1 element : window.circularList1.getElements()) {
first.add(serializer1.copy(element));
}
for (IN2 element : window.circularList2.getElements()) {
second.add(serializer2.copy(element));
}

if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
coWindowFunction.coWindow(first, second, collector);
}
}

Expand Down Expand Up @@ -167,18 +180,19 @@ public String toString() {
}

@Override
public void close() {
public void close() throws Exception {
if (!window.miniBatchEnd()) {
callUserFunctionAndLogException();
}
super.close();
}

/**
 * No-op implementation: the body is empty.
 *
 * NOTE(review): presumably left empty because this class invokes the user
 * function on whole windows from {@code callUserFunction()} rather than per
 * input; confirm against the superclass contract.
 */
@Override
protected void callUserFunction1() throws Exception {
}

@Override
protected void callUserFunction2() throws Exception {
protected void callUserFunction2() throws Exception {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
Expand Down Expand Up @@ -73,6 +74,11 @@ public void shiftWindow(int numberOfSlides) {
}

}

/**
 * Returns the backing {@code queue} viewed as a {@link List}. No copy is
 * made, so the returned list is the same object that this class holds.
 *
 * NOTE(review): the cast assumes the queue implementation is also a
 * {@code List} (e.g. a {@code LinkedList}); verify against the field
 * declaration.
 *
 * @return The stored elements as a list.
 */
public List<T> getElements() {
    // Keep the suppression on the single cast instead of the whole method.
    @SuppressWarnings("unchecked")
    List<T> elements = (List<T>) queue;
    return elements;
}

public Iterator<T> getIterator() {
return queue.iterator();
Expand Down
Loading

0 comments on commit ec82d97

Please sign in to comment.