From 12bf7c1a0b81d199085fe874c64763c51a93b3bf Mon Sep 17 00:00:00 2001 From: kl0u Date: Mon, 18 Jul 2016 11:37:06 +0200 Subject: [PATCH] [FLINK-4207] WindowOperator becomes very slow with allowed lateness --- .../streaming/state/RocksDBFoldingState.java | 2 +- .../streaming/state/RocksDBListState.java | 3 +- .../api/common/state/AppendingState.java | 10 +- .../flink/runtime/state/GenericListState.java | 7 +- .../state/filesystem/FsFoldingState.java | 7 +- .../runtime/state/filesystem/FsListState.java | 13 +- .../runtime/state/memory/MemFoldingState.java | 7 +- .../runtime/state/memory/MemListState.java | 13 +- .../runtime/state/StateBackendTestBase.java | 8 +- .../windowing/triggers/PurgingTrigger.java | 2 +- .../windowing/EvictingWindowOperator.java | 75 ++- .../operators/windowing/MergingWindowSet.java | 14 +- .../operators/windowing/WindowOperator.java | 74 ++- .../windowing/MergingWindowSetTest.java | 7 +- .../windowing/WindowOperatorTest.java | 438 ++++++++++++++++++ .../sessionwindows/SessionWindowITCase.java | 2 - 16 files changed, 580 insertions(+), 102 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java index 8ffe3a628cda9..218fa2a56dca8 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBFoldingState.java @@ -93,7 +93,7 @@ public ACC get() { byte[] key = baos.toByteArray(); byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { - return stateDesc.getDefaultValue(); + return null; } return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); } catch (IOException|RocksDBException e) { diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index b13c5ae826da3..ce3a48e98f1ad 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -32,7 +32,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import static java.util.Objects.requireNonNull; @@ -94,7 +93,7 @@ public Iterable get() { byte[] valueBytes = backend.db.get(columnFamily, key); if (valueBytes == null) { - return Collections.emptyList(); + return null; } ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java index 04dc784c7491c..8ea8364296766 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java @@ -46,8 +46,14 @@ public interface AppendingState extends State { * operator instance. If state partitioning is applied, the value returned * depends on the current operator input, as the operator maintains an * independent state for each partition. - * - * @return The operator state value corresponding to the current input. + * + *

+ * NOTE TO IMPLEMENTERS: if the state is empty, then this method + * should return {@code null}. + *

+ * + * @return The operator state value corresponding to the current input or {@code null} + * if the state is empty. * * @throws Exception Thrown if the system cannot access the state. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java index 341485540db10..2e408989c1e86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; /** * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}. @@ -82,11 +81,7 @@ public void dispose() { @Override public Iterable get() throws Exception { - ArrayList result = wrappedState.value(); - if (result == null) { - return Collections.emptyList(); - } - return result; + return wrappedState.value(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java index bba6df51af5d2..90baf36cf805d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsFoldingState.java @@ -88,11 +88,8 @@ public ACC get() { if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - ACC value = currentNSState.get(currentKey); - return value != null ? value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java index 1d5b5f89381cc..46c9830511ab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsListState.java @@ -27,9 +27,7 @@ import org.apache.flink.runtime.state.KvStateSnapshot; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -85,15 +83,8 @@ public Iterable get() { if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - List result = currentNSState.get(currentKey); - if (result == null) { - return Collections.emptyList(); - } else { - return result; - } - } - return Collections.emptyList(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java index 07b677b47a666..9953a64563a5a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemFoldingState.java @@ -64,11 +64,8 @@ public ACC get() { if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - ACC value = currentNSState.get(currentKey); - return value != null ? value : stateDesc.getDefaultValue(); - } - return stateDesc.getDefaultValue(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java index d5e4dfd96266c..97461d0140e4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemListState.java @@ -26,9 +26,7 @@ import org.apache.flink.runtime.state.KvStateSnapshot; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; /** @@ -56,15 +54,8 @@ public Iterable get() { if (currentNSState == null) { currentNSState = state.get(currentNamespace); } - if (currentNSState != null) { - List result = currentNSState.get(currentKey); - if (result == null) { - return Collections.emptyList(); - } else { - return result; - } - } - return Collections.emptyList(); + return currentNSState != null ? + currentNSState.get(currentKey) : null; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 12cf1125223bc..80f1de398a7c4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -243,10 +243,10 @@ public void testListState() { Joiner joiner = Joiner.on(","); // some modifications to the state backend.setCurrentKey(1); - assertEquals("", joiner.join(state.get())); + assertEquals(null, state.get()); state.add("1"); backend.setCurrentKey(2); - assertEquals("", joiner.join(state.get())); + assertEquals(null, state.get()); state.add("2"); backend.setCurrentKey(1); assertEquals("1", joiner.join(state.get())); @@ -438,10 +438,10 @@ public void testFoldingState() { // some modifications to the state backend.setCurrentKey(1); - assertEquals("Fold-Initial:", state.get()); + assertEquals(null, state.get()); state.add(1); backend.setCurrentKey(2); - assertEquals("Fold-Initial:", state.get()); + assertEquals(null, state.get()); state.add(2); backend.setCurrentKey(1); assertEquals("Fold-Initial:,1", state.get()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index 85d0b525422f4..8b30130d0522d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -72,7 +72,7 @@ public boolean canMerge() { @Override public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx); - return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : triggerResult; + return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 3f2c6a3904072..15f716c775416 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -132,12 +132,14 @@ public void merge(W mergeResult, // check if the window is already inactive if (isLate(actualWindow)) { - LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness."); mergingWindows.retireWindow(actualWindow); continue; } W stateWindow = mergingWindows.getStateWindow(actualWindow); + if (stateWindow == null) { + throw new IllegalStateException("Window " + window + " is not in in-flight window set."); + } ListState> windowState = getPartitionedState( stateWindow, windowSerializer, windowStateDescriptor); windowState.add(element); @@ -149,7 +151,15 @@ public void merge(W mergeResult, // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - fireOrContinue(combinedTriggerResult, actualWindow, windowState); + + if (combinedTriggerResult.isFire()) { + Iterable> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + fire(actualWindow, contents); + } if (combinedTriggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); @@ -163,7 +173,6 @@ public void merge(W mergeResult, // check if the window is already inactive if (isLate(window)) { - LOG.info("Dropped element " + element + " for window " + window + " due to lateness."); continue; } @@ -175,7 +184,15 @@ public void merge(W mergeResult, context.window = window; TriggerResult triggerResult = context.onElement(element); - fireOrContinue(triggerResult, window, windowState); + + if (triggerResult.isFire()) { + Iterable> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + fire(window, contents); + } if (triggerResult.isPurge()) { cleanup(window, windowState, null); @@ -207,16 +224,30 @@ public void processWatermark(Watermark mark) throws Exception { if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + Iterable> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onEventTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -255,16 +286,30 @@ public void trigger(long time) throws Exception { if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + Iterable> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -273,15 +318,8 @@ public void trigger(long time) throws Exception { } while (fire); } - private void fireOrContinue(TriggerResult triggerResult, - W window, - ListState> windowState) throws Exception { - if (!triggerResult.isFire()) { - return; - } - + private void fire(W window, Iterable> contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); - Iterable> contents = windowState.get(); // Work around type system restrictions... int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); @@ -307,7 +345,6 @@ private void cleanup(W window, mergingWindows.retireWindow(window); } context.clear(); - deleteCleanupTimer(window); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java index d02a34814ae41..c806d2d7a7180 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java @@ -80,8 +80,11 @@ public MergingWindowSet(MergingWindowAssigner windowAssigner, ListState(); - for (Tuple2 window: state.get()) { - windows.put(window.f0, window.f1); + Iterable> windowState = state.get(); + if (windowState != null) { + for (Tuple2 window: windowState) { + windows.put(window.f0, window.f1); + } } } @@ -100,12 +103,7 @@ public void persist(ListState> state) throws Exception { * @param window The window for which to get the state window. */ public W getStateWindow(W window) { - W result = windows.get(window); - if (result == null) { - throw new IllegalStateException("Window " + window + " is not in in-flight window set."); - } - - return result; + return windows.get(window); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index bb05d2b9f9fff..2434843bd04a2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -334,12 +334,15 @@ public void merge(W mergeResult, // drop if the window is already late if (isLate(actualWindow)) { - LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness."); mergingWindows.retireWindow(actualWindow); continue; } W stateWindow = mergingWindows.getStateWindow(actualWindow); + if (stateWindow == null) { + throw new IllegalStateException("Window " + window + " is not in in-flight window set."); + } + AppendingState windowState = getPartitionedState( stateWindow, windowSerializer, windowStateDescriptor); windowState.add(element.getValue()); @@ -351,7 +354,14 @@ public void merge(W mergeResult, // on the (possibly merged) window TriggerResult triggerResult = context.onElement(element); TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0); - fireOrContinue(combinedTriggerResult, actualWindow, windowState); + + if (combinedTriggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents == null) { + continue; + } + fire(actualWindow, contents); + } if (combinedTriggerResult.isPurge()) { cleanup(actualWindow, windowState, mergingWindows); @@ -364,7 +374,6 @@ public void merge(W mergeResult, // drop if the window is already late if (isLate(window)) { - LOG.info("Dropped element " + element + " for window " + window + " due to lateness."); continue; } @@ -376,7 +385,14 @@ public void merge(W mergeResult, context.window = window; TriggerResult triggerResult = context.onElement(element); - fireOrContinue(triggerResult, window, windowState); + + if (triggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents == null) { + continue; + } + fire(window, contents); + } if (triggerResult.isPurge()) { cleanup(window, windowState, null); @@ -408,16 +424,30 @@ public void processWatermark(Watermark mark) throws Exception { if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onEventTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -456,16 +486,30 @@ public void trigger(long time) throws Exception { if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + continue; + } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - fireOrContinue(triggerResult, context.window, windowState); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(timer.window, timer.timestamp))) { - cleanup(timer.window, windowState, mergingWindows); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } } else { @@ -487,7 +531,6 @@ private void cleanup(W window, mergingWindows.retireWindow(window); } context.clear(); - deleteCleanupTimer(window); } /** @@ -495,15 +538,8 @@ private void cleanup(W window, * The caller must ensure that the correct key is set in the state backend and the context object. */ @SuppressWarnings("unchecked") - private void fireOrContinue(TriggerResult triggerResult, - W window, - AppendingState windowState) throws Exception { - if (!triggerResult.isFire()) { - return; - } - + private void fire(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); - ACC contents = windowState.get(); userFunction.apply(context.key, context.window, contents, timestampedCollector); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index cf90f8ae478c1..939f13fd807ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -133,12 +133,7 @@ public void testIncrementalMerging() throws Exception { // retire the first batch of windows windowSet.retireWindow(new TimeWindow(0, 6)); - try { - windowSet.getStateWindow(new TimeWindow(0, 6)); - fail("Expected exception"); - } catch (IllegalStateException e) { - //ignore - } + assertTrue(windowSet.getStateWindow(new TimeWindow(0, 6)) == null); assertTrue(windowSet.getStateWindow(new TimeWindow(10, 15)).equals(new TimeWindow(11, 14))); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 90bd3f2c70d7d..62266c4327425 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -17,9 +17,12 @@ */ package org.apache.flink.streaming.runtime.operators.windowing; +import com.google.common.base.Joiner; import com.google.common.collect.Iterables; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; @@ -51,6 +54,8 @@ import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; @@ -1952,10 +1957,379 @@ public void testDropDueToLatenessSessionWithHugeLateness() throws Exception { testHarness.close(); } + @Test + public void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 100; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ListStateDescriptor> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Iterable>, String, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction2()), + new EventTimeTriggerAccumGC(LATENESS), + LATENESS); + + OneInputStreamOperatorTestHarness, String> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2100)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>("GOT: (key2,1)", 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2100)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + private class PassThroughFunction2 implements WindowFunction, String, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String k, TimeWindow window, Iterable> input, Collector out) throws Exception { + out.collect("GOT: " + Joiner.on(",").join(input)); + } + } + + @Test + public void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ListStateDescriptor> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Iterable>, Tuple2, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ReducingStateDescriptor> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Tuple2, Tuple2, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyFoldingStateForTumblingWindows() throws Exception { + final int WINDOW_SIZE = 2; + final long LATENESS = 1; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + FoldingStateDescriptor, Tuple2> windowStateDesc = + new FoldingStateDescriptor<>( + "window-contents", + new Tuple2<>((String) null, 0), + new FoldFunction, Tuple2>() { + @Override + public Tuple2 fold(Tuple2 accumulator, Tuple2 value) throws Exception { + return new Tuple2<>(value.f0, accumulator.f1 + value.f1); + } + }, + inputType); + + WindowOperator, Tuple2, Tuple2, TimeWindow> operator = + new WindowOperator<>( + TumblingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + // normal element + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(1599)); + testHarness.processWatermark(new Watermark(1999)); + testHarness.processWatermark(new Watermark(2000)); + testHarness.processWatermark(new Watermark(5000)); + + expected.add(new Watermark(1599)); + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999)); + expected.add(new Watermark(1999)); // here it fires and purges + expected.add(new Watermark(2000)); // here is the cleanup timer + expected.add(new Watermark(5000)); + + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception { + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ListStateDescriptor> windowStateDesc = + new ListStateDescriptor<>("window-contents", inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Iterable>, Tuple2, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalIterableWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception { + + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ReducingStateDescriptor> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Tuple2, Tuple3, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()), + EventTimeTrigger.create(), + LATENESS); + + operator.setInputType(TypeInfoParser.>parse("Tuple2"), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness, Tuple3> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 4000L), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + + @Test + public void testCleanupTimerWithEmptyFoldingStateForSessionWindows() throws Exception { + final int GAP_SIZE = 3; + final long LATENESS = 10; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + FoldingStateDescriptor, Tuple2> windowStateDesc = + new FoldingStateDescriptor<>( + "window-contents", + new Tuple2<>((String) null, 0), + new FoldFunction, Tuple2>() { + @Override + public Tuple2 fold(Tuple2 accumulator, Tuple2 value) throws Exception { + return new Tuple2<>(value.f0, accumulator.f1 + value.f1); + } + }, + inputType); + + WindowOperator, Tuple2, Tuple2, TimeWindow> operator = + new WindowOperator<>( + EventTimeSessionWindows.withGap(Time.seconds(GAP_SIZE)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + windowStateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughFunction()), + EventTimeTrigger.create(), + LATENESS); + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000)); + testHarness.processWatermark(new Watermark(4998)); + + expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 3999)); + expected.add(new Watermark(4998)); + + testHarness.processWatermark(new Watermark(14600)); + expected.add(new Watermark(14600)); + + ConcurrentLinkedQueue actual = testHarness.getOutput(); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator()); + testHarness.close(); + } + // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ + private class PassThroughFunction implements WindowFunction, Tuple2, String, TimeWindow> { + private static final long serialVersionUID = 1L; + + @Override + public void apply(String k, TimeWindow window, Iterable> input, Collector> out) throws Exception { + for (Tuple2 in: input) { + out.collect(in); + } + } + } + public static class SumReducer implements ReduceFunction> { private static final long serialVersionUID = 1L; @Override @@ -2112,4 +2486,68 @@ public Collection assignWindows(Object element, long timestamp, Wind return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout)); } } + + /** + * A trigger that fires at the end of the window but does not + * purge the state of the fired window. This is to test the state + * garbage collection mechanism. + */ + public class EventTimeTriggerAccumGC extends Trigger { + private static final long serialVersionUID = 1L; + + private long cleanupTime; + + private EventTimeTriggerAccumGC() { + cleanupTime = 0L; + } + + public EventTimeTriggerAccumGC(long cleanupTime) { + this.cleanupTime = cleanupTime; + } + + @Override + public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { + if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + // if the watermark is already past the window fire immediately + return TriggerResult.FIRE; + } else { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + } + + @Override + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { + return time == window.maxTimestamp() || time == window.maxTimestamp() + cleanupTime ? + TriggerResult.FIRE_AND_PURGE : + TriggerResult.CONTINUE; + } + + @Override + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { + return TriggerResult.CONTINUE; + } + + @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception { + ctx.deleteEventTimeTimer(window.maxTimestamp()); + } + + @Override + public boolean canMerge() { + return true; + } + + @Override + public TriggerResult onMerge(TimeWindow window, + OnMergeContext ctx) { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } + + @Override + public String toString() { + return "EventTimeTrigger()"; + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java index eb137aa500dd0..9b4855faab8f5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java @@ -22,9 +22,7 @@ import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.WindowedStream; -import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction;