diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java index 2481fdcc2e41b..f72c50a9d88d9 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java @@ -167,18 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter= preEmptivePersistThresholdInBytes) { - LOG.debug( - "pre-emptively flush {}MB of appended changes to the common store", - activeChangeSetSize / 1024 / 1024); - persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey()); - } + activeChangeSet.add(StateChange.ofDataChange(keyGroup, value)); + preEmptiveFlushIfNeeded(value); } @Override @@ -208,6 +210,16 @@ public CompletableFuture> persist return persistInternal(from); } + private void preEmptiveFlushIfNeeded(byte[] value) throws IOException { + activeChangeSetSize += value.length; + if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) { + LOG.debug( + "pre-emptively flush {}MB of appended changes to the common store", + activeChangeSetSize / 1024 / 1024); + persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey()); + } + } + private CompletableFuture> persistInternal( SequenceNumber from) throws IOException { ensureCanPersist(from); diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java index 58d142305cd52..3c4d5cb783512 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java @@ -36,10 +36,10 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; -import java.util.TreeMap; import java.util.stream.Collectors; import static java.util.Comparator.comparing; +import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP; /** Serialization format for state changes. */ @Internal @@ -68,16 +68,27 @@ private void writeChangeSet(DataOutputViewStreamWrapper output, List> byKeyGroup = changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup)); - // sort groups to output metadata first (see StateChangeLoggerImpl.COMMON_KEY_GROUP) - Map> sorted = new TreeMap<>(byKeyGroup); - output.writeInt(sorted.size()); - for (Map.Entry> entry : sorted.entrySet()) { - output.writeInt(entry.getValue().size()); - output.writeInt(entry.getKey()); - for (StateChange stateChange : entry.getValue()) { - output.writeInt(stateChange.getChange().length); - output.write(stateChange.getChange()); - } + // write the number of key groups + output.writeInt(byKeyGroup.size()); + // output metadata first (see StateChange.META_KEY_GROUP) + List meta = byKeyGroup.remove(META_KEY_GROUP); + if (meta != null) { + writeChangeSetOfKG(output, META_KEY_GROUP, meta); + } + // output changeSets + for (Map.Entry> entry : byKeyGroup.entrySet()) { + writeChangeSetOfKG(output, entry.getKey(), entry.getValue()); + } + } + + private void writeChangeSetOfKG( + DataOutputViewStreamWrapper output, int keyGroup, List stateChanges) + throws IOException { + output.writeInt(stateChanges.size()); + output.writeInt(keyGroup); + for (StateChange stateChange : stateChanges) { + output.writeInt(stateChange.getChange().length); + output.write(stateChange.getChange()); } } @@ -124,7 +135,9 @@ private StateChange readChange() throws IOException { int size = input.readInt(); byte[] bytes = new byte[size]; IOUtils.readFully(input, bytes, 0, size); - return new StateChange(keyGroup, bytes); + return keyGroup == META_KEY_GROUP + ? StateChange.ofMetadataChange(bytes) + : StateChange.ofDataChange(keyGroup, bytes); } @Override diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java index d70c3f63082e5..c2252b196061a 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java @@ -364,7 +364,7 @@ private List getChanges(int size) { new StateChangeSet( UUID.randomUUID(), SequenceNumber.of(0), - singletonList(new StateChange(0, change)))); + singletonList(StateChange.ofDataChange(0, change)))); } private static void withStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 37f22ee95ad2c..8f5be0f46a7fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -74,6 +74,7 @@ import java.util.UUID; import static org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.UNKNOWN_CHECKPOINTED_SIZE; +import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP; /** * Base (De)serializer for checkpoint metadata format version 2 and 3. @@ -494,7 +495,11 @@ static KeyedStateHandle deserializeKeyedStateHandle( int bytesSize = dis.readInt(); byte[] bytes = new byte[bytesSize]; IOUtils.readFully(dis, bytes, 0, bytesSize); - changes.add(new StateChange(keyGroup, bytes)); + StateChange stateChange = + keyGroup == META_KEY_GROUP + ? StateChange.ofMetadataChange(bytes) + : StateChange.ofDataChange(keyGroup, bytes); + changes.add(stateChange); } StateHandleID stateHandleId = new StateHandleID(dis.readUTF()); return InMemoryChangelogStateHandle.restore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java index 8859844453850..d9386f7844ed1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChange.java @@ -26,18 +26,33 @@ @Internal public class StateChange implements Serializable { + /* For metadata, see FLINK-23035.*/ + public static final int META_KEY_GROUP = -1; + private static final long serialVersionUID = 1L; private final int keyGroup; private final byte[] change; - public StateChange(int keyGroup, byte[] change) { - // todo: enable check in FLINK-23035 - // Preconditions.checkArgument(keyGroup >= 0); + StateChange(byte[] change) { + this.keyGroup = META_KEY_GROUP; + this.change = Preconditions.checkNotNull(change); + } + + StateChange(int keyGroup, byte[] change) { + Preconditions.checkArgument(keyGroup >= 0); this.keyGroup = keyGroup; this.change = Preconditions.checkNotNull(change); } + public static StateChange ofMetadataChange(byte[] change) { + return new StateChange(change); + } + + public static StateChange ofDataChange(int keyGroup, byte[] change) { + return new StateChange(keyGroup, change); + } + @Override public String toString() { return String.format("keyGroup=%d, dataSize=%d", keyGroup, change.length); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java index 756528139d231..337047d6d055b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java @@ -36,6 +36,9 @@ public interface StateChangelogWriter exten */ SequenceNumber nextSequenceNumber(); + /** Appends the provided **metadata** to this log. No persistency guarantees. */ + void appendMeta(byte[] value) throws IOException; + /** Appends the provided data to this log. No persistency guarantees. */ void append(int keyGroup, byte[] value) throws IOException; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java index eb07ba477167a..7fdb33076400b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.NotThreadSafe; +import java.io.IOException; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -41,6 +42,7 @@ import java.util.stream.Stream; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP; @NotThreadSafe class InMemoryStateChangelogWriter implements StateChangelogWriter { @@ -57,6 +59,16 @@ public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) { this.keyGroupRange = keyGroupRange; } + @Override + public void appendMeta(byte[] value) throws IOException { + Preconditions.checkState(!closed, "LogWriter is closed"); + LOG.trace("append metadata: {} bytes", value.length); + changesByKeyGroup + .computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>()) + .put(sqn, value); + sqn = sqn.next(); + } + @Override public void append(int keyGroup, byte[] value) { Preconditions.checkState(!closed, "LogWriter is closed"); @@ -96,8 +108,16 @@ private List collectChanges(SequenceNumber after) { private Stream> toChangeStream( NavigableMap changeMap, SequenceNumber after, int keyGroup) { + if (keyGroup == META_KEY_GROUP) { + return changeMap.tailMap(after, true).entrySet().stream() + .map(e2 -> Tuple2.of(e2.getKey(), StateChange.ofMetadataChange(e2.getValue()))); + } return changeMap.tailMap(after, true).entrySet().stream() - .map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, e2.getValue()))); + .map( + e2 -> + Tuple2.of( + e2.getKey(), + StateChange.ofDataChange(keyGroup, e2.getValue()))); } @Override diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java index c3f4abe5eee0e..1f84a91228442 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java +++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java @@ -50,7 +50,6 @@ abstract class AbstractStateChangeLogger implements StateChangeLogger, Closeable { - static final int COMMON_KEY_GROUP = -1; protected final StateChangelogWriter stateChangelogWriter; protected final InternalKeyContext keyContext; protected RegisteredStateMetaInfoBase metaInfo; @@ -158,10 +157,7 @@ protected void log( private void logMetaIfNeeded() throws IOException { if (!metaDataWritten) { - // todo: add StateChangelogWriter.append() version without a keygroup - // when all callers and implementers are merged (FLINK-21356 or later) - stateChangelogWriter.append( - COMMON_KEY_GROUP, + stateChangelogWriter.appendMeta( serializeRaw( out -> { out.writeByte(METADATA.getCode()); diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java index 9813409475f98..e00b85c1ff858 100644 --- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java +++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java @@ -31,7 +31,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import static org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP; +import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP; import static org.apache.flink.state.changelog.StateChangeOperation.METADATA; import static org.junit.Assert.assertEquals; @@ -45,7 +45,7 @@ public void testMetadataOperationLogged() throws IOException { try (StateChangeLogger logger = getLogger(writer, keyContext)) { List> expectedAppends = new ArrayList<>(); - expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA)); + expectedAppends.add(Tuple2.of(META_KEY_GROUP, METADATA)); // log every applicable operations, several times each int numOpTypes = StateChangeOperation.values().length; @@ -104,6 +104,11 @@ protected Optional> log( protected static class TestingStateChangelogWriter implements StateChangelogWriter { private final List> appends = new ArrayList<>(); + @Override + public void appendMeta(byte[] value) { + appends.add(Tuple2.of(META_KEY_GROUP, StateChangeOperation.byCode(value[0]))); + } + @Override public void append(int keyGroup, byte[] value) { appends.add(Tuple2.of(keyGroup, StateChangeOperation.byCode(value[0])));