Skip to content

Commit

Permalink
[FLINK-4207] WindowOperator becomes very slow with allowed lateness
Browse files Browse the repository at this point in the history
  • Loading branch information
kl0u committed Jul 26, 2016
1 parent 884d3e2 commit 12bf7c1
Show file tree
Hide file tree
Showing 16 changed files with 580 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ACC get() {
byte[] key = baos.toByteArray();
byte[] valueBytes = backend.db.get(columnFamily, key);
if (valueBytes == null) {
return stateDesc.getDefaultValue();
return null;
}
return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
} catch (IOException|RocksDBException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -94,7 +93,7 @@ public Iterable<V> get() {
byte[] valueBytes = backend.db.get(columnFamily, key);

if (valueBytes == null) {
return Collections.emptyList();
return null;
}

ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@ public interface AppendingState<IN, OUT> extends State {
* operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
* @return The operator state value corresponding to the current input.
*
* <p>
* <b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
* should return {@code null}.
* </p>
*
* @return The operator state value corresponding to the current input or {@code null}
* if the state is empty.
*
* @throws Exception Thrown if the system cannot access the state.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

/**
* Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
Expand Down Expand Up @@ -82,11 +81,7 @@ public void dispose() {

@Override
public Iterable<T> get() throws Exception {
ArrayList<T> result = wrappedState.value();
if (result == null) {
return Collections.emptyList();
}
return result;
return wrappedState.value();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,8 @@ public ACC get() {
if (currentNSState == null) {
currentNSState = state.get(currentNamespace);
}
if (currentNSState != null) {
ACC value = currentNSState.get(currentKey);
return value != null ? value : stateDesc.getDefaultValue();
}
return stateDesc.getDefaultValue();
return currentNSState != null ?
currentNSState.get(currentKey) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.flink.runtime.state.KvStateSnapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -85,15 +83,8 @@ public Iterable<V> get() {
if (currentNSState == null) {
currentNSState = state.get(currentNamespace);
}
if (currentNSState != null) {
List<V> result = currentNSState.get(currentKey);
if (result == null) {
return Collections.emptyList();
} else {
return result;
}
}
return Collections.emptyList();
return currentNSState != null ?
currentNSState.get(currentKey) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,8 @@ public ACC get() {
if (currentNSState == null) {
currentNSState = state.get(currentNamespace);
}
if (currentNSState != null) {
ACC value = currentNSState.get(currentKey);
return value != null ? value : stateDesc.getDefaultValue();
}
return stateDesc.getDefaultValue();
return currentNSState != null ?
currentNSState.get(currentKey) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.flink.runtime.state.KvStateSnapshot;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -56,15 +54,8 @@ public Iterable<V> get() {
if (currentNSState == null) {
currentNSState = state.get(currentNamespace);
}
if (currentNSState != null) {
List<V> result = currentNSState.get(currentKey);
if (result == null) {
return Collections.emptyList();
} else {
return result;
}
}
return Collections.emptyList();
return currentNSState != null ?
currentNSState.get(currentKey) : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,10 @@ public void testListState() {
Joiner joiner = Joiner.on(",");
// some modifications to the state
backend.setCurrentKey(1);
assertEquals("", joiner.join(state.get()));
assertEquals(null, state.get());
state.add("1");
backend.setCurrentKey(2);
assertEquals("", joiner.join(state.get()));
assertEquals(null, state.get());
state.add("2");
backend.setCurrentKey(1);
assertEquals("1", joiner.join(state.get()));
Expand Down Expand Up @@ -438,10 +438,10 @@ public void testFoldingState() {

// some modifications to the state
backend.setCurrentKey(1);
assertEquals("Fold-Initial:", state.get());
assertEquals(null, state.get());
state.add(1);
backend.setCurrentKey(2);
assertEquals("Fold-Initial:", state.get());
assertEquals(null, state.get());
state.add(2);
backend.setCurrentKey(1);
assertEquals("Fold-Initial:,1", state.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public boolean canMerge() {
@Override
public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,14 @@ public void merge(W mergeResult,

// check if the window is already inactive
if (isLate(actualWindow)) {
LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
mergingWindows.retireWindow(actualWindow);
continue;
}

W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
ListState<StreamRecord<IN>> windowState = getPartitionedState(
stateWindow, windowSerializer, windowStateDescriptor);
windowState.add(element);
Expand All @@ -149,7 +151,15 @@ public void merge(W mergeResult,
// on the (possibly merged) window
TriggerResult triggerResult = context.onElement(element);
TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);
fireOrContinue(combinedTriggerResult, actualWindow, windowState);

if (combinedTriggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
fire(actualWindow, contents);
}

if (combinedTriggerResult.isPurge()) {
cleanup(actualWindow, windowState, mergingWindows);
Expand All @@ -163,7 +173,6 @@ public void merge(W mergeResult,

// check if the window is already inactive
if (isLate(window)) {
LOG.info("Dropped element " + element + " for window " + window + " due to lateness.");
continue;
}

Expand All @@ -175,7 +184,15 @@ public void merge(W mergeResult,
context.window = window;

TriggerResult triggerResult = context.onElement(element);
fireOrContinue(triggerResult, window, windowState);

if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
fire(window, contents);
}

if (triggerResult.isPurge()) {
cleanup(window, windowState, null);
Expand Down Expand Up @@ -207,16 +224,30 @@ public void processWatermark(Watermark mark) throws Exception {
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
// then the window is already purged and this is a cleanup
// timer set due to allowed lateness that has nothing to clean,
// so it is safe to just ignore
continue;
}
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}

Iterable<StreamRecord<IN>> contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}

TriggerResult triggerResult = context.onEventTime(timer.timestamp);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isFire()) {
fire(context.window, contents);
}

if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
cleanup(context.window, windowState, mergingWindows);
}

} else {
Expand Down Expand Up @@ -255,16 +286,30 @@ public void trigger(long time) throws Exception {
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
W stateWindow = mergingWindows.getStateWindow(context.window);
if (stateWindow == null) {
// then the window is already purged and this is a cleanup
// timer set due to allowed lateness that has nothing to clean,
// so it is safe to just ignore
continue;
}
windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
} else {
windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
}

Iterable<StreamRecord<IN>> contents = windowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}

TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
fireOrContinue(triggerResult, context.window, windowState);
if (triggerResult.isFire()) {
fire(context.window, contents);
}

if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) {
cleanup(timer.window, windowState, mergingWindows);
if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
cleanup(context.window, windowState, mergingWindows);
}

} else {
Expand All @@ -273,15 +318,8 @@ public void trigger(long time) throws Exception {
} while (fire);
}

private void fireOrContinue(TriggerResult triggerResult,
W window,
ListState<StreamRecord<IN>> windowState) throws Exception {
if (!triggerResult.isFire()) {
return;
}

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
Iterable<StreamRecord<IN>> contents = windowState.get();

// Work around type system restrictions...
int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
Expand All @@ -307,7 +345,6 @@ private void cleanup(W window,
mergingWindows.retireWindow(window);
}
context.clear();
deleteCleanupTimer(window);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner, ListState<Tu
this.windowAssigner = windowAssigner;
windows = new HashMap<>();

for (Tuple2<W, W> window: state.get()) {
windows.put(window.f0, window.f1);
Iterable<Tuple2<W, W>> windowState = state.get();
if (windowState != null) {
for (Tuple2<W, W> window: windowState) {
windows.put(window.f0, window.f1);
}
}
}

Expand All @@ -100,12 +103,7 @@ public void persist(ListState<Tuple2<W, W>> state) throws Exception {
* @param window The window for which to get the state window.
*/
public W getStateWindow(W window) {
W result = windows.get(window);
if (result == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}

return result;
return windows.get(window);
}

/**
Expand Down
Loading

0 comments on commit 12bf7c1

Please sign in to comment.