From c225a8c3757ed96417e721ae013165d51f463ec0 Mon Sep 17 00:00:00 2001 From: Yun Tang Date: Fri, 16 Jul 2021 12:32:07 +0800 Subject: [PATCH] [FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be closed for changelog state-backend --- .../ChangelogKeyGroupedPriorityQueue.java | 5 +-- .../state/changelog/ChangelogMapState.java | 37 ++++++++++--------- .../changelog/StateChangeLoggingIterator.java | 18 ++++++--- 3 files changed, 34 insertions(+), 26 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java index eff82db8b05a2..2379f6453d6d9 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyGroupedPriorityQueue.java @@ -129,9 +129,8 @@ private void logAddition(Collection toAdd) { @Override @Nonnull public CloseableIterator iterator() { - return CloseableIterator.adapterForIterator( - StateChangeLoggingIterator.create( - delegatedPriorityQueue.iterator(), logger, serializer::serialize, null)); + return StateChangeLoggingIterator.create( + delegatedPriorityQueue.iterator(), logger, serializer::serialize, null); } @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java index 3b35dcd764597..219839fbd39ae 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogMapState.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.state.changelog.restore.ChangelogApplierFactory; import org.apache.flink.state.changelog.restore.StateChangeApplier; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.ThrowingConsumer; @@ -122,22 +123,24 @@ public Iterable> entries() throws Exception { private Iterator> getEntryIterator(Iterator> iterator) { final N currentNamespace = getCurrentNamespace(); return StateChangeLoggingIterator.create( - new Iterator>() { - @Override - public Map.Entry next() { - return loggingMapEntry(iterator.next(), changeLogger, currentNamespace); - } - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - @Override - public void remove() { - iterator.remove(); - } - }, + CloseableIterator.adapterForIterator( + new Iterator>() { + @Override + public Map.Entry next() { + return loggingMapEntry( + iterator.next(), changeLogger, currentNamespace); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public void remove() { + iterator.remove(); + } + }), changeLogger, (entry, out) -> serializeKey(entry.getKey(), out), currentNamespace); @@ -148,7 +151,7 @@ public Iterable keys() throws Exception { Iterable iterable = delegatedState.keys(); return () -> StateChangeLoggingIterator.create( - iterable.iterator(), + CloseableIterator.adapterForIterator(iterable.iterator()), changeLogger, this::serializeKey, getCurrentNamespace()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java index 38ea9438b3e1c..3484e4af717e4 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLoggingIterator.java @@ -18,6 +18,7 @@ package org.apache.flink.state.changelog; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.function.BiConsumerWithException; @@ -25,11 +26,11 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.Iterator; -class StateChangeLoggingIterator implements Iterator { +class StateChangeLoggingIterator + implements CloseableIterator { - private final Iterator iterator; + private final CloseableIterator iterator; private final StateChangeLogger changeLogger; private final BiConsumerWithException removalWriter; @@ -37,7 +38,7 @@ class StateChangeLoggingIterator implements Iter @Nullable private StateElement lastReturned; private StateChangeLoggingIterator( - Iterator iterator, + CloseableIterator iterator, StateChangeLogger changeLogger, BiConsumerWithException removalWriter, @@ -69,12 +70,17 @@ public void remove() { } @Nonnull - public static Iterator create( - Iterator iterator, + public static CloseableIterator create( + CloseableIterator iterator, StateChangeLogger changeLogger, BiConsumerWithException removalWriter, Namespace ns) { return new StateChangeLoggingIterator<>(iterator, changeLogger, removalWriter, ns); } + + @Override + public void close() throws Exception { + iterator.close(); + } }