diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index c6d4c95a7dff4..14ce200232078 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or `putAll(Map)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively. +You can also use `isEmpty()` to check whether this map contains any key-value mappings. All types of state also have a method `clear()` that clears the state for the currently active key, i.e. the key of the input element. diff --git a/docs/dev/stream/state/state.zh.md b/docs/dev/stream/state/state.zh.md index 78da6410d66a2..b5d40eba282dd 100644 --- a/docs/dev/stream/state/state.zh.md +++ b/docs/dev/stream/state/state.zh.md @@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态 接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。 * `MapState`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map)` 添加映射。 - 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。 + 使用 `get(UK)` 检索特定 key。 使用 `entries()`,`keys()` 和 `values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。 所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。 diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java index 7a130d49083d2..94eb275eba194 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapState.java @@ -124,4 +124,13 @@ public interface MapState extends State { * @throws Exception Thrown if the system cannot access the state. */ Iterator> iterator() throws Exception; + + /** + * Returns true if this state contains no key-value mappings, otherwise false. + * + * @return True if this state contains no key-value mappings, otherwise false. + * + * @throws Exception Thrown if the system cannot access the state. + */ + boolean isEmpty() throws Exception; } diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java index fe95c6dbbd054..2717c138fd6da 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java @@ -530,7 +530,7 @@ boolean hasNonEmptySharedBuffer(KEY key) throws Exception { @VisibleForTesting boolean hasNonEmptyPQ(KEY key) throws Exception { setCurrentKey(key); - return elementQueueState.keys().iterator().hasNext(); + return !elementQueueState.isEmpty(); } @VisibleForTesting diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java index 4d510cf405b02..1f5672d027520 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java @@ -219,6 +219,15 @@ public Iterator> iterator() throws Exception { return new CountingIterator<>(values.entrySet().iterator()); } + @Override + public boolean isEmpty() throws Exception { + if (values == null) { + return true; + } + + return values.isEmpty(); + } + @Override public void clear() { stateWrites++; diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java index 4d51b7de6be92..7a20a9648cc75 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java @@ -113,6 +113,11 @@ public Iterator> iterator() { return Collections.unmodifiableSet(state.entrySet()).iterator(); } + @Override + public boolean isEmpty() { + return state.isEmpty(); + } + @Override public void clear() { throw MODIFICATION_ATTEMPT_ERROR; diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java index 6465257b6b1a3..3694c54fc2e9d 100644 --- a/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java +++ b/flink-queryable-state/flink-queryable-state-client-java/src/test/java/org/apache/flink/queryablestate/client/state/ImmutableMapStateTest.java @@ -32,6 +32,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -186,4 +187,9 @@ public void testClear() throws Exception { mapState.clear(); } + + @Test + public void testIsEmpty() throws Exception { + assertFalse(mapState.isEmpty()); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java index ce4d032c88252..301909bda991d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingMapState.java @@ -95,4 +95,9 @@ public Iterator> iterator() throws Exception { Iterator> original = originalState.iterator(); return original != null ? original : emptyState.entrySet().iterator(); } + + @Override + public boolean isEmpty() throws Exception { + return originalState.isEmpty(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java index 745e7f4f58e35..23620b8548150 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapMapState.java @@ -161,6 +161,12 @@ public Iterator> iterator() { return userMap == null ? null : userMap.entrySet().iterator(); } + @Override + public boolean isEmpty() { + Map userMap = stateTable.get(currentNamespace); + return userMap == null || userMap.isEmpty(); + } + @Override public byte[] getSerializedValue( final byte[] serializedKeyAndNamespace, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java index c3f624ab38e88..cb061742e5449 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java @@ -121,6 +121,12 @@ public Iterator> iterator() throws Exception { return entries().iterator(); } + @Override + public boolean isEmpty() throws Exception { + accessCallback.run(); + return original.isEmpty(); + } + @Nullable @Override public Map> getUnexpiredOrNull(@Nonnull Map> ttlValue) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 132fd01a64195..f90720d367138 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -2573,7 +2573,6 @@ public void testMapState() throws Exception { List expectedKeys = Arrays.asList(103, 1031, 1032); assertEquals(keys.size(), expectedKeys.size()); keys.removeAll(expectedKeys); - assertTrue(keys.isEmpty()); List values = new ArrayList<>(); for (String value : state.values()) { @@ -2582,7 +2581,6 @@ public void testMapState() throws Exception { List expectedValues = Arrays.asList("103", "1031", "1032"); assertEquals(values.size(), expectedValues.size()); values.removeAll(expectedValues); - assertTrue(values.isEmpty()); // make some more modifications backend.setCurrentKey("1"); @@ -2655,6 +2653,34 @@ public void testMapState() throws Exception { backend.dispose(); } + @Test + public void testMapStateIsEmpty() throws Exception { + MapStateDescriptor kvId = new MapStateDescriptor<>("id", Integer.class, Long.class); + + AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + + try { + MapState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + assertTrue(state.isEmpty()); + + int stateSize = 1024; + for (int i = 0; i < stateSize; i++) { + state.put(i, i * 2L); + assertFalse(state.isEmpty()); + } + + for (int i = 0; i < stateSize; i++) { + assertFalse(state.isEmpty()); + state.remove(i); + } + assertTrue(state.isEmpty()); + + } finally { + backend.dispose(); + } + } + /** * Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java index 9b5ac10c92d04..28b2a24147aa6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java @@ -86,6 +86,11 @@ public Iterator> iterator() { return entries().iterator(); } + @Override + public boolean isEmpty() { + return getInternal().isEmpty(); + } + @SuppressWarnings({"unchecked", "unused"}) static IS createState( TypeSerializer namespaceSerializer, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 64ce823d03fab..e7e1d2519f284 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -239,6 +239,18 @@ public Map.Entry next() { }; } + @Override + public boolean isEmpty() { + final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace(); + + try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) { + + iterator.seek(prefixBytes); + + return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key()); + } + } + @Override public void clear() { try { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java index 7feb07bffca42..b7a37042adfbf 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/dataview/MapView.java @@ -203,6 +203,17 @@ public Iterator> iterator() throws Exception { return map.entrySet().iterator(); } + /** + * Returns true if the map view contains no key-value mappings, otherwise false. + * + * @return True if the map view contains no key-value mappings, otherwise false. + * + * @throws Exception Thrown if the system cannot access the state. + */ + public boolean isEmpty() throws Exception { + return map.isEmpty(); + } + /** * Removes all entries of this map. */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala index 22f5f0b23459f..2096cf630d604 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/dataview/StateMapView.scala @@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] { override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator() + override def isEmpty(): Boolean = state.isEmpty + override def clear(): Unit = state.clear() } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala index f6911096f9371..73c876067f474 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala @@ -169,7 +169,7 @@ class TemporalRowtimeJoin( // if we have more state at any side, then update the timer, else clean it up. if (stateCleaningEnabled) { - if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) { + if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) { registerProcessingCleanUpTimer() } else { cleanUpLastTimer() diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java index c7f2686c35f61..16d96d6297da6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/dataview/StateMapView.java @@ -107,6 +107,11 @@ public Iterator> iterator() throws Exception { return original != null ? original : emptyState.entrySet().iterator(); } + @Override + public boolean isEmpty() throws Exception { + return getMapState().isEmpty(); + } + @Override public void clear() { getMapState().clear(); @@ -191,6 +196,11 @@ public Iterator> iterator() throws Exception { return new NullAwareMapIterator<>(getMapState().iterator(), new NullMapEntryImpl()); } + @Override + public boolean isEmpty() throws Exception { + return getMapState().isEmpty() && getNullState().value() == null; + } + @Override public void clear() { getMapState().clear(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java index 64be99bb6ae42..f55fdd15b3a12 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperator.java @@ -189,7 +189,7 @@ public void onEventTime(InternalTimer timer) throws Excep // if we have more state at any side, then update the timer, else clean it up. if (stateCleaningEnabled) { - if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) { + if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) { registerProcessingCleanupTimer(); } else { cleanupLastTimer(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java index e5a3216c72740..297f8bddc8b2d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/over/AbstractRowTimeUnboundedPrecedingOver.java @@ -157,8 +157,7 @@ public void onTimer( if (stateCleaningEnabled) { // we check whether there are still records which have not been processed yet - boolean noRecordsToProcess = !inputState.keys().iterator().hasNext(); - if (noRecordsToProcess) { + if (inputState.isEmpty()) { // we clean the state cleanupState(inputState, accState); function.cleanup(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java index 7a502b6c06dc1..e53751f9b3c42 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/MergingWindowSetTest.java @@ -417,6 +417,11 @@ public Iterator> iterator() throws Exception { return map.entrySet().iterator(); } + @Override + public boolean isEmpty() throws Exception { + return map.isEmpty(); + } + @Override public void clear() { map.clear(); @@ -588,6 +593,11 @@ public Iterator> iterator() throws Exception { return internalMap.entrySet().iterator(); } + @Override + public boolean isEmpty() { + return internalMap.isEmpty(); + } + @Override public void clear() { internalMap.clear();