Skip to content

Commit

Permalink
[FLINK-1619] [FLINK-1620] Grouped sliding prereducers added for Time …
Browse files Browse the repository at this point in the history
…and Count

Closes apache#465
  • Loading branch information
gyfora committed Mar 11, 2015
1 parent aacd4f2 commit 2522f02
Show file tree
Hide file tree
Showing 10 changed files with 979 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ public DataStreamSource(StreamExecutionEnvironment environment, String operatorT
TypeInformation<OUT> outTypeInfo, StreamInvokable<?, ?> invokable, boolean isParallel) {
super(environment, operatorType, outTypeInfo, invokable);
this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
import org.apache.flink.streaming.api.windowing.windowbuffer.CompletePreAggregator;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingCountPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimePreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingGroupedPreReducer;
import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
Expand Down Expand Up @@ -337,8 +339,10 @@ private DiscretizedStream<OUT> discretize(WindowTransformation transformation,
}

private int getDiscretizerParallelism() {
return isLocal || WindowUtils.isParallelPolicy(getTrigger(), getEviction())
|| (discretizerKey != null) ? dataStream.environment.getDegreeOfParallelism() : 1;
return isLocal
|| WindowUtils.isParallelPolicy(getTrigger(), getEviction(),
dataStream.getParallelism()) || (discretizerKey != null) ? dataStream.environment
.getDegreeOfParallelism() : 1;
}

private StreamInvokable<OUT, WindowEvent<OUT>> getDiscretizer() {
Expand Down Expand Up @@ -381,21 +385,35 @@ private WindowBuffer<OUT> getWindowBuffer(WindowTransformation transformation) {
(ReduceFunction<OUT>) transformation.getUDF(), groupByKey, getType()
.createSerializer(getExecutionConfig()));
}
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction) && groupByKey == null) {

return new SlidingCountPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());

} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction) && groupByKey == null) {
} else if (WindowUtils.isSlidingCountPolicy(trigger, eviction)) {
if (groupByKey == null) {
return new SlidingCountPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());
} else {
return new SlidingCountGroupedPreReducer<OUT>(
clean((ReduceFunction<OUT>) transformation.getUDF()), dataStream
.getType().createSerializer(getExecutionConfig()), groupByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
((CountTriggerPolicy<?>) trigger).getStart());
}

return new SlidingTimePreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
} else if (WindowUtils.isSlidingTimePolicy(trigger, eviction)) {
if (groupByKey == null) {
return new SlidingTimePreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()),
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
} else {
return new SlidingTimeGroupedPreReducer<OUT>(
(ReduceFunction<OUT>) transformation.getUDF(), dataStream.getType()
.createSerializer(getExecutionConfig()), groupByKey,
WindowUtils.getWindowSize(eviction), WindowUtils.getSlideSize(trigger),
WindowUtils.getTimeStampWrapper(trigger));
}

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public Function getUDF() {
}
}

public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
return (eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy))
|| (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy);
public static boolean isParallelPolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction,
int inputParallelism) {
return inputParallelism != 1
&& ((eviction instanceof CountEvictionPolicy && (trigger instanceof CountTriggerPolicy || trigger instanceof TimeTriggerPolicy)) || (eviction instanceof TumblingEvictionPolicy && trigger instanceof CountTriggerPolicy));
}

public static boolean isSlidingTimePolicy(TriggerPolicy<?> trigger, EvictionPolicy<?> eviction) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.windowing.windowbuffer;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * Grouped pre-reducer for sliding count windows, i.e. the keyed counterpart of
 * {@link SlidingCountPreReducer}.
 *
 * <p>
 * The position inside the current slide is tracked by {@link #index}. It is
 * initialised to {@code -start}, so the first {@code start} elements passed to
 * {@link #store(Object)} are only counted and never forwarded to the
 * underlying grouped pre-reducer.
 *
 * @param <T>
 *            type of the elements being pre-reduced
 */
public class SlidingCountGroupedPreReducer<T> extends SlidingGroupedPreReducer<T> {

    private static final long serialVersionUID = 1L;

    private final long windowSize;
    private final long slideSize;
    private final int start;

    /** Element counter; negative while the initial {@code start} elements are being skipped. */
    protected long index = 0;

    /**
     * Creates a grouped sliding count pre-reducer.
     *
     * @param reducer
     *            reduce function passed on to the super class
     * @param serializer
     *            serializer passed on to the super class
     * @param key
     *            key selector passed on to the super class
     * @param windowSize
     *            window size in elements; must be strictly larger than
     *            {@code slideSize}
     * @param slideSize
     *            slide size in elements
     * @param start
     *            number of leading elements that are skipped before storing
     *            begins
     * @throws IllegalArgumentException
     *             if {@code windowSize <= slideSize}
     */
    public SlidingCountGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
            KeySelector<T, ?> key, long windowSize, long slideSize, int start) {
        super(reducer, serializer, key);
        // Reject invalid configurations up front; IllegalArgumentException is a
        // RuntimeException, so existing callers catching the latter keep working.
        if (windowSize <= slideSize) {
            throw new IllegalArgumentException(
                    "Window size needs to be larger than slide size for the sliding pre-reducer");
        }
        this.windowSize = windowSize;
        this.slideSize = slideSize;
        this.start = start;
        // Start below zero so that store() skips the first 'start' elements.
        index = index - start;
    }

    /** Advances the element counter by one. */
    @Override
    protected void afterStore() {
        index++;
    }

    /**
     * Forwards the element to the super class once the initial offset has been
     * consumed; until then the element is dropped and only the counter moves.
     *
     * @param element
     *            the incoming element
     * @throws Exception
     *             propagated from the super class implementation
     */
    @Override
    public void store(T element) throws Exception {
        if (index >= 0) {
            super.store(element);
        } else {
            index++;
        }
    }

    /**
     * Returns {@code true} while the counter is at most {@code slideSize}, and
     * afterwards only when the counter equals {@code windowSize}.
     *
     * @param next
     *            the incoming element (not inspected by this implementation)
     */
    @Override
    protected boolean currentEligible(T next) {
        if (index <= slideSize) {
            return true;
        } else {
            return index == windowSize;
        }
    }

    /** Moves the counter back by one slide if at least one full slide has been counted. */
    @Override
    protected void afterEmit() {
        if (index >= slideSize) {
            index = index - slideSize;
        }
    }

    /**
     * Creates a new pre-reducer with the same reducer, serializer, key selector
     * and window configuration; the counter of the copy starts again at
     * {@code -start}.
     */
    @Override
    public SlidingCountGroupedPreReducer<T> clone() {
        return new SlidingCountGroupedPreReducer<T>(reducer, serializer, key, windowSize,
                slideSize, start);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.windowing.windowbuffer;

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.StreamWindow;

/**
 * Base class for grouped (keyed) sliding pre-reducers.
 *
 * <p>
 * Instead of a single partial aggregate, one partial aggregate per key is
 * kept in a map. {@link #currentReducedMap} holds the partial aggregates that
 * are currently being built and {@link #reducedMap} buffers the finished ones
 * in insertion order.
 *
 * @param <T>
 *            type of the elements being pre-reduced
 */
public abstract class SlidingGroupedPreReducer<T> extends SlidingPreReducer<T> {

    private static final long serialVersionUID = 1L;

    // NOTE(review): this starts out as an empty, non-null map, whereas
    // currentNotEmpty() only tests for null and resetCurrent() assigns null.
    // Confirm that reporting "not empty" before the first element is intended.
    protected Map<Object, T> currentReducedMap = new HashMap<Object, T>();
    protected LinkedList<Map<Object, T>> reducedMap = new LinkedList<Map<Object, T>>();

    protected KeySelector<T, ?> key;

    /**
     * @param reducer
     *            reduce function passed on to the super class
     * @param serializer
     *            serializer passed on to the super class
     * @param key
     *            selector used to extract the grouping key of each element
     */
    public SlidingGroupedPreReducer(ReduceFunction<T> reducer, TypeSerializer<T> serializer,
            KeySelector<T, ?> key) {
        super(reducer, serializer);
        this.key = key;
    }

    /**
     * Combines all buffered per-key aggregates with the current one and adds
     * the resulting values to the given window.
     *
     * @param currentWindow
     *            window receiving one value per key
     * @return {@code true} if a (possibly empty) set of values was added,
     *         {@code false} if there was neither a buffered nor a current
     *         aggregate map
     * @throws Exception
     *             propagated from the reduce function
     */
    public boolean addFinalAggregate(StreamWindow<T> currentWindow) throws Exception {
        Map<Object, T> finalReduce;

        if (reducedMap.isEmpty()) {
            finalReduce = currentReducedMap;
        } else {
            // Iterate instead of calling get(i): indexed access on a LinkedList
            // is linear, which made the original loop quadratic.
            finalReduce = null;
            boolean isFirst = true;
            for (Map<Object, T> partial : reducedMap) {
                if (isFirst) {
                    finalReduce = partial;
                    isFirst = false;
                } else {
                    finalReduce = reduceMaps(finalReduce, partial);
                }
            }
            if (currentReducedMap != null) {
                finalReduce = reduceMaps(finalReduce, currentReducedMap);
            }
        }

        if (finalReduce == null) {
            return false;
        }
        currentWindow.addAll(finalReduce.values());
        return true;
    }

    /**
     * Merges two per-key aggregate maps into a new map. Keys present in both
     * maps are reduced on copies of their values; keys present in only one map
     * are carried over unchanged. Neither input map is modified.
     */
    private Map<Object, T> reduceMaps(Map<Object, T> first, Map<Object, T> second) throws Exception {
        // Start from everything in 'first'; entries shared with 'second' are
        // overwritten with the reduced value below.
        Map<Object, T> merged = new HashMap<Object, T>(first);

        for (Map.Entry<Object, T> entry : second.entrySet()) {
            Object groupKey = entry.getKey();
            if (first.containsKey(groupKey)) {
                merged.put(
                        groupKey,
                        reducer.reduce(serializer.copy(first.get(groupKey)),
                                serializer.copy(entry.getValue())));
            } else {
                merged.put(groupKey, entry.getValue());
            }
        }

        return merged;
    }

    /**
     * Folds the element into the current aggregate of its key, creating the
     * current map first if it has been reset.
     *
     * @param element
     *            the incoming element
     * @throws Exception
     *             propagated from the key selector or the reduce function
     */
    protected void updateCurrent(T element) throws Exception {
        if (currentReducedMap == null) {
            currentReducedMap = new HashMap<Object, T>();
        }

        Object groupKey = key.getKey(element);
        T last = currentReducedMap.get(groupKey);
        if (last == null) {
            currentReducedMap.put(groupKey, element);
        } else {
            // Reduce on a copy so the stored value is not handed to the reducer directly.
            currentReducedMap.put(groupKey, reducer.reduce(serializer.copy(last), element));
        }
    }

    /** Drops the first (earliest added) buffered aggregate map. */
    @Override
    protected void removeLastReduced() {
        reducedMap.removeFirst();
    }

    /**
     * Appends the current aggregate map to the buffer.
     *
     * @param element
     *            the incoming element (not used by this implementation)
     */
    @Override
    protected void addCurrentToBuffer(T element) throws Exception {
        reducedMap.add(currentReducedMap);
    }

    /** Discards the current aggregate map by setting it to {@code null}. */
    @Override
    protected void resetCurrent() {
        currentReducedMap = null;
    }

    /** Returns {@code true} if a current aggregate map exists, i.e. it is not {@code null}. */
    @Override
    protected boolean currentNotEmpty() {
        return currentReducedMap != null;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,23 @@ protected void afterStore() {
}

protected void addToBufferIfEligible(T element) throws Exception {
if (currentEligible(element) && currentReduced != null) {
if (currentEligible(element) && currentNotEmpty()) {
addCurrentToBuffer(element);
elementsPerPreAggregate.add(elementsSinceLastPreAggregate);
elementsSinceLastPreAggregate = 1;
} else {
updateCurrent(element);

elementsSinceLastPreAggregate++;
elementsSinceLastPreAggregate = 0;
resetCurrent();
}
updateCurrent(element);

elementsSinceLastPreAggregate++;
}

/**
 * Discards the partial aggregate that is currently being built by setting
 * {@code currentReduced} to {@code null}.
 */
protected void resetCurrent() {
currentReduced = null;
}

/**
 * Tells whether a partial aggregate is currently being built.
 *
 * @return {@code true} if {@code currentReduced} is not {@code null}
 */
protected boolean currentNotEmpty() {
return currentReduced != null;
}

protected void updateCurrent(T element) throws Exception {
Expand All @@ -122,9 +130,8 @@ protected void updateCurrent(T element) throws Exception {
}
}

protected void addCurrentToBuffer(T element) {
protected void addCurrentToBuffer(T element) throws Exception {
reduced.add(currentReduced);
currentReduced = element;
}

protected abstract boolean currentEligible(T next);
Expand All @@ -135,7 +142,7 @@ public void evict(int n) {
Integer lastPreAggregateSize = elementsPerPreAggregate.peek();
while (lastPreAggregateSize != null && lastPreAggregateSize <= toRemove) {
toRemove = max(toRemove - elementsPerPreAggregate.removeFirst(), 0);
reduced.removeFirst();
removeLastReduced();
lastPreAggregateSize = elementsPerPreAggregate.peek();
}

Expand All @@ -144,7 +151,11 @@ public void evict(int n) {
}
}

public int max(int a, int b) {
protected void removeLastReduced() {
reduced.removeFirst();
}

public static int max(int a, int b) {
if (a > b) {
return a;
} else {
Expand Down
Loading

0 comments on commit 2522f02

Please sign in to comment.