From be1da48d084941733dfd0121bdb8bc1812d0d38f Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Wed, 8 Sep 2021 17:50:08 +0200 Subject: [PATCH] [FLINK-22944][state] Re-use output in StateChangeLogger --- .../changelog/AbstractStateChangeLogger.java | 25 ++++++++++++++----- .../changelog/ChangelogKeyedStateBackend.java | 8 +++++- .../state/changelog/StateChangeLogger.java | 3 ++- .../state/changelog/ChangelogPqStateTest.java | 3 +++ .../changelog/StateChangeLoggerTestBase.java | 24 ++++++++++-------- .../state/changelog/TestChangeLoggerKv.java | 3 +++ 6 files changed, 47 insertions(+), 19 deletions(-) diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java index b6d9408aed61b..a27c4800511f5 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java @@ -27,9 +27,12 @@ import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters; import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.IOException; import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE; @@ -45,12 +48,15 @@ import static org.apache.flink.state.changelog.StateChangeOperation.SET_INTERNAL; import static org.apache.flink.util.Preconditions.checkNotNull; -abstract class AbstractStateChangeLogger implements StateChangeLogger { +abstract class AbstractStateChangeLogger + implements StateChangeLogger, Closeable { static final int COMMON_KEY_GROUP = -1; protected final StateChangelogWriter stateChangelogWriter; protected final InternalKeyContext keyContext; protected final RegisteredStateMetaInfoBase metaInfo; private final StateMetaInfoSnapshot.BackendStateType stateType; + private final ByteArrayOutputStream out = new ByteArrayOutputStream(); + private final DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out); private boolean metaDataWritten = false; private final short stateShortId; @@ -183,11 +189,18 @@ protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper out) private byte[] serializeRaw( ThrowingConsumer dataWriter) throws IOException { - // todo: optimize performance - try (ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out)) { - dataWriter.accept(wrapper); - return out.toByteArray(); + dataWriter.accept(wrapper); + wrapper.flush(); + byte[] bytes = out.toByteArray(); + out.reset(); + return bytes; + } + + @Override + public void close() throws IOException { + try (Closer closer = Closer.create()) { + closer.register(wrapper); + closer.register(out); } } } diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java index 6f8de49a6cf92..cc786fe3ed034 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java @@ -60,6 +60,8 @@ import org.apache.flink.state.changelog.restore.FunctionDelegationHelper; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.shaded.guava30.com.google.common.io.Closer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,7 @@ public class ChangelogKeyedStateBackend private final TtlTimeProvider ttlTimeProvider; private final StateChangelogWriter stateChangelogWriter; + private final Closer closer = Closer.create(); private long lastCheckpointId = -1L; @@ -216,6 +219,7 @@ public ChangelogKeyedStateBackend( this.mainMailboxExecutor = checkNotNull(mainMailboxExecutor); this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool); this.completeRestore(initialState); + this.closer.register(keyedStateBackend); } // -------------------- CheckpointableKeyedStateBackend -------------------------------- @@ -226,7 +230,7 @@ public KeyGroupRange getKeyGroupRange() { @Override public void close() throws IOException { - keyedStateBackend.close(); + closer.close(); } @Override @@ -388,6 +392,7 @@ KeyGroupedInternalPriorityQueue create( new RegisteredPriorityQueueStateBackendMetaInfo<>( stateName, byteOrderedElementSerializer), ++lastCreatedStateId); + closer.register(priorityQueueStateChangeLogger); queue = new ChangelogKeyGroupedPriorityQueue<>( keyedStateBackend.create(stateName, byteOrderedElementSerializer), @@ -516,6 +521,7 @@ public IS createInternalState( stateDesc.getTtlConfig(), stateDesc.getDefaultValue(), ++lastCreatedStateId); + closer.register(kvStateChangeLogger); IS is = stateFactory.create( state, diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java index 0c1e8613c52dd..2e129a94a0a88 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/StateChangeLogger.java @@ -21,6 +21,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.function.ThrowingConsumer; +import java.io.Closeable; import java.io.IOException; /** @@ -41,7 +42,7 @@ * @param type of state (value) * @param type of namespace */ -interface StateChangeLogger { +interface StateChangeLogger extends Closeable { /** State updated, such as by {@link ListState#update}. */ void valueUpdated(Value newValue, Namespace ns) throws IOException; diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java index 6157948e52a90..1b46c280a2526 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogPqStateTest.java @@ -169,6 +169,9 @@ public void valueElementRemoved( public boolean anythingChanged() { return stateElementChanged || stateElementRemoved || stateCleared; } + + @Override + public void close() {} } private static class TestingInternalQueueState diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java index ae877452aeb13..30004ae06239c 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java @@ -42,19 +42,21 @@ public void testMetadataOperationLogged() throws IOException { TestingStateChangelogWriter writer = new TestingStateChangelogWriter(); InternalKeyContextImpl keyContext = new InternalKeyContextImpl<>(KeyGroupRange.of(1, 1000), 1000); - StateChangeLogger logger = getLogger(writer, keyContext); - List> expectedAppends = new ArrayList<>(); - expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA)); - - // log every applicable operations, several times each - int numOpTypes = StateChangeOperation.values().length; - for (int i = 0; i < numOpTypes * 7; i++) { - String element = Integer.toString(i); - StateChangeOperation operation = StateChangeOperation.byCode((byte) (i % numOpTypes)); - log(operation, element, logger, keyContext).ifPresent(expectedAppends::add); + try (StateChangeLogger logger = getLogger(writer, keyContext)) { + List> expectedAppends = new ArrayList<>(); + expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA)); + + // log every applicable operations, several times each + int numOpTypes = StateChangeOperation.values().length; + for (int i = 0; i < numOpTypes * 7; i++) { + String element = Integer.toString(i); + StateChangeOperation operation = + StateChangeOperation.byCode((byte) (i % numOpTypes)); + log(operation, element, logger, keyContext).ifPresent(expectedAppends::add); + } + assertEquals(expectedAppends, writer.appends); } - assertEquals(expectedAppends, writer.appends); } protected abstract StateChangeLogger getLogger( diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java index 6683609ffa402..ed5b5da0efe81 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/TestChangeLoggerKv.java @@ -131,4 +131,7 @@ public boolean anythingChanged() { || stateElementRemoved || stateMerged; } + + @Override + public void close() {} }