Skip to content

Commit

Permalink
[FLINK-23367][state] Ensure InternalPriorityQueue#iterator could be c…
Browse files Browse the repository at this point in the history
…losed for changelog state-backend
  • Loading branch information
Myasuka committed Jul 16, 2021
1 parent 10db8b1 commit c225a8c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,8 @@ private void logAddition(Collection<? extends T> toAdd) {
@Override
@Nonnull
public CloseableIterator<T> iterator() {
return CloseableIterator.adapterForIterator(
StateChangeLoggingIterator.create(
delegatedPriorityQueue.iterator(), logger, serializer::serialize, null));
return StateChangeLoggingIterator.create(
delegatedPriorityQueue.iterator(), logger, serializer::serialize, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.state.changelog.restore.ChangelogApplierFactory;
import org.apache.flink.state.changelog.restore.StateChangeApplier;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingConsumer;

Expand Down Expand Up @@ -122,22 +123,24 @@ public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
private Iterator<Map.Entry<UK, UV>> getEntryIterator(Iterator<Map.Entry<UK, UV>> iterator) {
final N currentNamespace = getCurrentNamespace();
return StateChangeLoggingIterator.create(
new Iterator<Map.Entry<UK, UV>>() {
@Override
public Map.Entry<UK, UV> next() {
return loggingMapEntry(iterator.next(), changeLogger, currentNamespace);
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public void remove() {
iterator.remove();
}
},
CloseableIterator.adapterForIterator(
new Iterator<Map.Entry<UK, UV>>() {
@Override
public Map.Entry<UK, UV> next() {
return loggingMapEntry(
iterator.next(), changeLogger, currentNamespace);
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public void remove() {
iterator.remove();
}
}),
changeLogger,
(entry, out) -> serializeKey(entry.getKey(), out),
currentNamespace);
Expand All @@ -148,7 +151,7 @@ public Iterable<UK> keys() throws Exception {
Iterable<UK> iterable = delegatedState.keys();
return () ->
StateChangeLoggingIterator.create(
iterable.iterator(),
CloseableIterator.adapterForIterator(iterable.iterator()),
changeLogger,
this::serializeKey,
getCurrentNamespace());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@
package org.apache.flink.state.changelog;

import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Iterator;

class StateChangeLoggingIterator<State, StateElement, Namespace> implements Iterator<StateElement> {
class StateChangeLoggingIterator<State, StateElement, Namespace>
implements CloseableIterator<StateElement> {

private final Iterator<StateElement> iterator;
private final CloseableIterator<StateElement> iterator;
private final StateChangeLogger<State, Namespace> changeLogger;
private final BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException>
removalWriter;
private final Namespace ns;
@Nullable private StateElement lastReturned;

private StateChangeLoggingIterator(
Iterator<StateElement> iterator,
CloseableIterator<StateElement> iterator,
StateChangeLogger<State, Namespace> changeLogger,
BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException>
removalWriter,
Expand Down Expand Up @@ -69,12 +70,17 @@ public void remove() {
}

@Nonnull
public static <Namespace, State, StateElement> Iterator<StateElement> create(
Iterator<StateElement> iterator,
public static <Namespace, State, StateElement> CloseableIterator<StateElement> create(
CloseableIterator<StateElement> iterator,
StateChangeLogger<State, Namespace> changeLogger,
BiConsumerWithException<StateElement, DataOutputViewStreamWrapper, IOException>
removalWriter,
Namespace ns) {
return new StateChangeLoggingIterator<>(iterator, changeLogger, removalWriter, ns);
}

@Override
public void close() throws Exception {
iterator.close();
}
}

0 comments on commit c225a8c

Please sign in to comment.