Skip to content

Commit

Permalink
[FLINK-13034][state backends] Add isEmpty method for MapState
Browse files Browse the repository at this point in the history
This closes apache#9255
  • Loading branch information
Myasuka authored and StephanEwen committed Nov 6, 2019
1 parent 3a39d9c commit 051692a
Show file tree
Hide file tree
Showing 20 changed files with 130 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ added using `add(T)` are folded into an aggregate using a specified `FoldFunctio
retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
You can also use `isEmpty()` to check whether this map contains any key-value mappings.

All types of state also have a method `clear()` that clears the state for the currently
active key, i.e. the key of the input element.
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/stream/state/state.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ managed keyed state 接口提供不同类型状态的访问接口,这些状态
接口与 `ListState` 类似,但使用`add(T)`添加的元素会用指定的 `FoldFunction` 折叠成聚合值。

* `MapState<UK, UV>`: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 `put(UK,UV)` 或者 `putAll(Map<UK,UV>)` 添加映射。
使用 `get(UK)` 检索特定 key。 使用 `entries()``keys()``values()` 分别检索映射、键和值的可迭代视图。
使用 `get(UK)` 检索特定 key。 使用 `entries()``keys()``values()` 分别检索映射、键和值的可迭代视图。你还可以通过 `isEmpty()` 来判断是否包含任何键值对。

所有类型的状态还有一个`clear()` 方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,13 @@ public interface MapState<UK, UV> extends State {
* @throws Exception Thrown if the system cannot access the state.
*/
Iterator<Map.Entry<UK, UV>> iterator() throws Exception;

/**
* Returns true if this state contains no key-value mappings, otherwise false.
*
* @return True if this state contains no key-value mappings, otherwise false.
*
* @throws Exception Thrown if the system cannot access the state.
*/
boolean isEmpty() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
@VisibleForTesting
boolean hasNonEmptyPQ(KEY key) throws Exception {
setCurrentKey(key);
return elementQueueState.keys().iterator().hasNext();
return !elementQueueState.isEmpty();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,15 @@ public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
return new CountingIterator<>(values.entrySet().iterator());
}

@Override
public boolean isEmpty() throws Exception {
if (values == null) {
return true;
}

return values.isEmpty();
}

@Override
public void clear() {
stateWrites++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ public Iterator<Map.Entry<K, V>> iterator() {
return Collections.unmodifiableSet(state.entrySet()).iterator();
}

@Override
public boolean isEmpty() {
return state.isEmpty();
}

@Override
public void clear() {
throw MODIFICATION_ATTEMPT_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
Expand Down Expand Up @@ -186,4 +187,9 @@ public void testClear() throws Exception {

mapState.clear();
}

@Test
public void testIsEmpty() throws Exception {
assertFalse(mapState.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,9 @@ public Iterator<Map.Entry<K, V>> iterator() throws Exception {
Iterator<Map.Entry<K, V>> original = originalState.iterator();
return original != null ? original : emptyState.entrySet().iterator();
}

@Override
public boolean isEmpty() throws Exception {
return originalState.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public Iterator<Map.Entry<UK, UV>> iterator() {
return userMap == null ? null : userMap.entrySet().iterator();
}

@Override
public boolean isEmpty() {
Map<UK, UV> userMap = stateTable.get(currentNamespace);
return userMap == null || userMap.isEmpty();
}

@Override
public byte[] getSerializedValue(
final byte[] serializedKeyAndNamespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
return entries().iterator();
}

@Override
public boolean isEmpty() throws Exception {
accessCallback.run();
return original.isEmpty();
}

@Nullable
@Override
public Map<UK, TtlValue<UV>> getUnexpiredOrNull(@Nonnull Map<UK, TtlValue<UV>> ttlValue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2573,7 +2573,6 @@ public void testMapState() throws Exception {
List<Integer> expectedKeys = Arrays.asList(103, 1031, 1032);
assertEquals(keys.size(), expectedKeys.size());
keys.removeAll(expectedKeys);
assertTrue(keys.isEmpty());

List<String> values = new ArrayList<>();
for (String value : state.values()) {
Expand All @@ -2582,7 +2581,6 @@ public void testMapState() throws Exception {
List<String> expectedValues = Arrays.asList("103", "1031", "1032");
assertEquals(values.size(), expectedValues.size());
values.removeAll(expectedValues);
assertTrue(values.isEmpty());

// make some more modifications
backend.setCurrentKey("1");
Expand Down Expand Up @@ -2655,6 +2653,34 @@ public void testMapState() throws Exception {
backend.dispose();
}

@Test
public void testMapStateIsEmpty() throws Exception {
MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);

AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

try {
MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
backend.setCurrentKey(1);
assertTrue(state.isEmpty());

int stateSize = 1024;
for (int i = 0; i < stateSize; i++) {
state.put(i, i * 2L);
assertFalse(state.isEmpty());
}

for (int i = 0; i < stateSize; i++) {
assertFalse(state.isEmpty());
state.remove(i);
}
assertTrue(state.isEmpty());

} finally {
backend.dispose();
}
}

/**
* Verify iterator of {@link MapState} supporting arbitrary access, see [FLINK-10267] to know more details.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public Iterator<Map.Entry<UK, UV>> iterator() {
return entries().iterator();
}

@Override
public boolean isEmpty() {
return getInternal().isEmpty();
}

@SuppressWarnings({"unchecked", "unused"})
static <N, T, S extends State, IS extends S> IS createState(
TypeSerializer<N> namespaceSerializer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ public Map.Entry<UK, UV> next() {
};
}

@Override
public boolean isEmpty() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {

iterator.seek(prefixBytes);

return !iterator.isValid() || !startWithKeyPrefix(prefixBytes, iterator.key());
}
}

@Override
public void clear() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,17 @@ public Iterator<Map.Entry<K, V>> iterator() throws Exception {
return map.entrySet().iterator();
}

/**
* Returns true if the map view contains no key-value mappings, otherwise false.
*
* @return True if the map view contains no key-value mappings, otherwise false.
*
* @throws Exception Thrown if the system cannot access the state.
*/
public boolean isEmpty() throws Exception {
return map.isEmpty();
}

/**
* Removes all entries of this map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,7 @@ class StateMapView[K, V](state: MapState[K, V]) extends MapView[K, V] {

override def iterator: util.Iterator[util.Map.Entry[K, V]] = state.iterator()

override def isEmpty(): Boolean = state.isEmpty

override def clear(): Unit = state.clear()
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class TemporalRowtimeJoin(

// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
if (lastUnprocessedTime < Long.MaxValue || rightState.iterator().hasNext) {
if (lastUnprocessedTime < Long.MaxValue || !rightState.isEmpty) {
registerProcessingCleanUpTimer()
} else {
cleanUpLastTimer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public Iterator<Map.Entry<MK, MV>> iterator() throws Exception {
return original != null ? original : emptyState.entrySet().iterator();
}

@Override
public boolean isEmpty() throws Exception {
return getMapState().isEmpty();
}

@Override
public void clear() {
getMapState().clear();
Expand Down Expand Up @@ -191,6 +196,11 @@ public Iterator<Map.Entry<MK, MV>> iterator() throws Exception {
return new NullAwareMapIterator<>(getMapState().iterator(), new NullMapEntryImpl());
}

@Override
public boolean isEmpty() throws Exception {
return getMapState().isEmpty() && getNullState().value() == null;
}

@Override
public void clear() {
getMapState().clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void onEventTime(InternalTimer<Object, VoidNamespace> timer) throws Excep

// if we have more state at any side, then update the timer, else clean it up.
if (stateCleaningEnabled) {
if (lastUnprocessedTime < Long.MAX_VALUE || rightState.iterator().hasNext()) {
if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {
registerProcessingCleanupTimer();
} else {
cleanupLastTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ public void onTimer(
if (stateCleaningEnabled) {

// we check whether there are still records which have not been processed yet
boolean noRecordsToProcess = !inputState.keys().iterator().hasNext();
if (noRecordsToProcess) {
if (inputState.isEmpty()) {
// we clean the state
cleanupState(inputState, accState);
function.cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ public Iterator<Map.Entry<K, V>> iterator() throws Exception {
return map.entrySet().iterator();
}

@Override
public boolean isEmpty() throws Exception {
return map.isEmpty();
}

@Override
public void clear() {
map.clear();
Expand Down Expand Up @@ -588,6 +593,11 @@ public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
return internalMap.entrySet().iterator();
}

@Override
public boolean isEmpty() {
return internalMap.isEmpty();
}

@Override
public void clear() {
internalMap.clear();
Expand Down

0 comments on commit 051692a

Please sign in to comment.