Skip to content

Commit

Permalink
[streaming] Fixed SlidingWindowStateIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
ghermann authored and mbalassi committed Sep 20, 2014
1 parent 1c58eb8 commit d97efde
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class BatchReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, O
protected long granularity;
protected int listSize;
protected transient SlidingWindowState<IN> state;

private long batchSize;
private int counter = 0;

Expand All @@ -59,15 +59,17 @@ protected void mutableInvoke() throws Exception {

@Override
protected void immutableInvoke() throws Exception {
if ((reuse = recordIterator.next(reuse)) == null) {
if (getNextRecord() == null) {
throw new RuntimeException("DataStream must not be empty");
}


initializeAtFirstRecord();

while (reuse != null && !state.isFull()) {
collectOneUnit();
}
reduce();

while (reuse != null) {
for (int i = 0; i < slideSize / granularity; i++) {
if (reuse != null) {
Expand All @@ -78,20 +80,35 @@ protected void immutableInvoke() throws Exception {
}
}

protected void initializeAtFirstRecord() {
counter = 0;
}

protected void collectOneUnit() throws IOException {
ArrayList<StreamRecord<IN>> list;
list = new ArrayList<StreamRecord<IN>>(listSize);

do {
list.add(reuse);
resetReuse();
} while ((reuse = recordIterator.next(reuse)) != null && batchNotFull());

if (!batchNotFull()) {
list = new ArrayList<StreamRecord<IN>>();
} else {
list = new ArrayList<StreamRecord<IN>>(listSize);

do {
list.add(reuse);
resetReuse();
} while (getNextRecord() != null && batchNotFull());
}
state.pushBack(list);
}


protected StreamRecord<IN> getNextRecord() throws IOException {
reuse = recordIterator.next(reuse);
if (reuse != null) {
counter++;
}
return reuse;
}

protected boolean batchNotFull() {
counter++;
if (counter < granularity) {
return true;
} else {
Expand All @@ -104,12 +121,12 @@ protected void reduce() {
userIterator = state.getIterator();
callUserFunctionAndLogException();
}

@Override
protected void callUserFunction() throws Exception {
reducer.reduce(userIterable, collector);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Expand All @@ -126,5 +143,4 @@ public Iterator<IN> iterator() {

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.invokable.operator;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
Expand All @@ -38,55 +37,20 @@ public WindowReduceInvokable(GroupReduceFunction<IN, OUT> reduceFunction, long w
}

@Override
protected void immutableInvoke() throws Exception {
if ((reuse = recordIterator.next(reuse)) == null) {
throw new RuntimeException("DataStream must not be empty");
}

nextRecordTime = timestamp.getTimestamp(reuse.getObject()); // **
startTime = nextRecordTime - (nextRecordTime % granularity); // **

while (reuse != null && !state.isFull()) {
collectOneUnit();
}
reduce();

while (reuse != null) {
for (int i = 0; i < slideSize / granularity; i++) {
if (reuse != null) {
collectOneUnit();
}
}
reduce();
}
protected void initializeAtFirstRecord() {
startTime = nextRecordTime - (nextRecordTime % granularity);
}

@Override
protected void collectOneUnit() throws IOException {
ArrayList<StreamRecord<IN>> list;
if (nextRecordTime > startTime + granularity - 1) {
list = new ArrayList<StreamRecord<IN>>();
startTime += granularity;
} else {
list = new ArrayList<StreamRecord<IN>>(listSize);

list.add(reuse);
resetReuse();

while ((reuse = recordIterator.next(reuse)) != null && batchNotFull()) {
list.add(reuse);
resetReuse();
}
protected StreamRecord<IN> getNextRecord() throws IOException {
reuse = recordIterator.next(reuse);
if (reuse != null) {
nextRecordTime = timestamp.getTimestamp(reuse.getObject());
}
state.pushBack(list);
// System.out.println(list);
// System.out.println(startTime + " - " + (startTime + granularity - 1) + " ("
// + nextRecordTime + ")");
return reuse;
}

@Override
protected boolean batchNotFull() {
nextRecordTime = timestamp.getTimestamp(reuse.getObject());
if (nextRecordTime < startTime + granularity) {
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.flink.streaming.api.invokable.operator.BatchIterator;
Expand All @@ -27,6 +28,8 @@
public class SlidingWindowStateIterator<T> implements BatchIterator<T> {

private CircularFifoBuffer buffer;
// private StreamRecord<T> nextElement;

private Iterator<Collection<StreamRecord<T>>> iterator;
private Iterator<StreamRecord<T>> subIterator;
private Iterator<StreamRecord<T>> streamRecordIterator;
Expand All @@ -37,16 +40,29 @@ public SlidingWindowStateIterator(CircularFifoBuffer buffer) {
}

public boolean hasNext() {
while (iterator.hasNext() && !subIterator.hasNext()) {
subIterator = iterator.next().iterator();
}

return subIterator.hasNext();
}

public T next() {
T nextElement = subIterator.next().getObject();
if (!subIterator.hasNext()) {
if (iterator.hasNext()) {
subIterator = iterator.next().iterator();
T nextElement;

if (hasNext()) {
nextElement = subIterator.next().getObject();

if (!subIterator.hasNext()) {
if (iterator.hasNext()) {
subIterator = iterator.next().iterator();
}
}
} else {
throw new NoSuchElementException("There is no more element in the current batch");
}


return nextElement;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,37 @@ public void before() {
windowSize = 10;
slideSize = 5;
timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L);
expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, EOW, "3", "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8", "9", EOW, "9", "10", EOW));
expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, "3", "4", "5", "6", EOW, "3",
"4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8",
EOW, "7", "8", "9", EOW, "9", "10", EOW));
invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));

windowSize = 10;
slideSize = 4;
timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L);
expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", EOW, "3", "4", "5", "6",
EOW, "4", "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "7", "8", "9", EOW, "9",
"10", EOW));
invokables.add(new WindowReduceInvokable<Integer, String>(new MySlidingWindowReduce(),
windowSize, slideSize, new MyTimestamp(timestamps)));
}

@Test
public void slidingBatchReduceTest() {
List<List<String>> actualResults = new ArrayList<List<String>>();

for (WindowReduceInvokable<Integer, String> invokable : invokables) {
actualResults.add(MockInvokable.createAndExecute(invokable,
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));
List<String> result = MockInvokable.createAndExecute(invokable,
Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
actualResults.add(result);
}

assertEquals(expectedResults, actualResults);
Iterator<List<String>> actualResult = actualResults.iterator();

for (List<String> expectedResult : expectedResults) {
assertEquals(expectedResult, actualResult.next());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

Expand All @@ -35,7 +37,7 @@ public class SlidingWindowStateTest {

@SuppressWarnings("unchecked")
@Test
public void test() {
public void basicSlidingWindowStateTest() {
SlidingWindowState<Integer> state = new SlidingWindowState<Integer>(SLIDING_BATCH_SIZE,
SLIDE_SIZE, UNIT);
state.pushBack(Arrays.asList(new StreamRecord<Integer>().setObject(0)));
Expand Down Expand Up @@ -65,6 +67,52 @@ public void test() {
}
assertEquals(getExpectedSet(2, 4), actualSet);
}

private final static int WINDOW_SIZE = 10;
private final static int WINDOW_SLIDE_SIZE = 4;
private static final int WINDOW_UNIT = 2;

@SuppressWarnings("unchecked")
@Test
public void slidingWithGreaterUnit() {
SlidingWindowState<Integer> state = new SlidingWindowState<Integer>(WINDOW_SIZE,
WINDOW_SLIDE_SIZE, WINDOW_UNIT);
state.pushBack(new ArrayList<StreamRecord<Integer>>());
state.pushBack(Arrays.asList(new StreamRecord<Integer>().setObject(1)));
state.pushBack(new ArrayList<StreamRecord<Integer>>());
state.pushBack(Arrays.asList(new StreamRecord<Integer>().setObject(2), new StreamRecord<Integer>().setObject(3)));
state.pushBack(new ArrayList<StreamRecord<Integer>>());

SortedSet<Integer> actualSet = new TreeSet<Integer>();
SlidingWindowStateIterator<Integer> iterator = state.getIterator();

iterator.hasNext();
iterator.hasNext();
while (iterator.hasNext()) {
iterator.hasNext();
iterator.hasNext();
actualSet.add(iterator.next());
iterator.hasNext();
iterator.hasNext();
}

assertEquals(getExpectedSet(1, 3), actualSet);
actualSet.clear();

Iterator<StreamRecord<Integer>> streamRecordIterator = state.getStreamRecordIterator();

streamRecordIterator.hasNext();
streamRecordIterator.hasNext();
while (streamRecordIterator.hasNext()) {
streamRecordIterator.hasNext();
streamRecordIterator.hasNext();
actualSet.add(streamRecordIterator.next().getObject());
streamRecordIterator.hasNext();
streamRecordIterator.hasNext();
}

assertEquals(getExpectedSet(1, 3), actualSet);
}

private SortedSet<Integer> getExpectedSet(int from, int to) {
SortedSet<Integer> expectedSet = new TreeSet<Integer>();
Expand Down

0 comments on commit d97efde

Please sign in to comment.