Skip to content

Commit

Permalink
[FLINK-23035][state/changelog] Add explicit append() to StateChangelo…
Browse files Browse the repository at this point in the history
…gWriter to write metadata
  • Loading branch information
fredia authored and curcur committed Dec 16, 2022
1 parent a6a3217 commit 6b357b4
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,18 +167,20 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
this.localChangelogRegistry = localChangelogRegistry;
}

@Override
public void appendMeta(byte[] value) throws IOException {
LOG.trace("append metadata to {}: {} bytes", logId, value.length);
checkState(!closed, "%s is closed", logId);
activeChangeSet.add(StateChange.ofMetadataChange(value));
preEmptiveFlushIfNeeded(value);
}

@Override
public void append(int keyGroup, byte[] value) throws IOException {
LOG.trace("append to {}: keyGroup={} {} bytes", logId, keyGroup, value.length);
checkState(!closed, "%s is closed", logId);
activeChangeSet.add(new StateChange(keyGroup, value));
activeChangeSetSize += value.length;
if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
LOG.debug(
"pre-emptively flush {}MB of appended changes to the common store",
activeChangeSetSize / 1024 / 1024);
persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
}
activeChangeSet.add(StateChange.ofDataChange(keyGroup, value));
preEmptiveFlushIfNeeded(value);
}

@Override
Expand Down Expand Up @@ -208,6 +210,16 @@ public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist
return persistInternal(from);
}

private void preEmptiveFlushIfNeeded(byte[] value) throws IOException {
activeChangeSetSize += value.length;
if (activeChangeSetSize >= preEmptivePersistThresholdInBytes) {
LOG.debug(
"pre-emptively flush {}MB of appended changes to the common store",
activeChangeSetSize / 1024 / 1024);
persistInternal(notUploaded.isEmpty() ? activeSequenceNumber : notUploaded.firstKey());
}
}

private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal(
SequenceNumber from) throws IOException {
ensureCanPersist(from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static java.util.Comparator.comparing;
import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;

/** Serialization format for state changes. */
@Internal
Expand Down Expand Up @@ -68,16 +68,27 @@ private void writeChangeSet(DataOutputViewStreamWrapper output, List<StateChange
// write in groups to output kg id only once
Map<Integer, List<StateChange>> byKeyGroup =
changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup));
// sort groups to output metadata first (see StateChangeLoggerImpl.COMMON_KEY_GROUP)
Map<Integer, List<StateChange>> sorted = new TreeMap<>(byKeyGroup);
output.writeInt(sorted.size());
for (Map.Entry<Integer, List<StateChange>> entry : sorted.entrySet()) {
output.writeInt(entry.getValue().size());
output.writeInt(entry.getKey());
for (StateChange stateChange : entry.getValue()) {
output.writeInt(stateChange.getChange().length);
output.write(stateChange.getChange());
}
// write the number of key groups
output.writeInt(byKeyGroup.size());
// output metadata first (see StateChange.META_KEY_GROUP)
List<StateChange> meta = byKeyGroup.remove(META_KEY_GROUP);
if (meta != null) {
writeChangeSetOfKG(output, META_KEY_GROUP, meta);
}
// output changeSets
for (Map.Entry<Integer, List<StateChange>> entry : byKeyGroup.entrySet()) {
writeChangeSetOfKG(output, entry.getKey(), entry.getValue());
}
}

private void writeChangeSetOfKG(
DataOutputViewStreamWrapper output, int keyGroup, List<StateChange> stateChanges)
throws IOException {
output.writeInt(stateChanges.size());
output.writeInt(keyGroup);
for (StateChange stateChange : stateChanges) {
output.writeInt(stateChange.getChange().length);
output.write(stateChange.getChange());
}
}

Expand Down Expand Up @@ -124,7 +135,9 @@ private StateChange readChange() throws IOException {
int size = input.readInt();
byte[] bytes = new byte[size];
IOUtils.readFully(input, bytes, 0, size);
return new StateChange(keyGroup, bytes);
return keyGroup == META_KEY_GROUP
? StateChange.ofMetadataChange(bytes)
: StateChange.ofDataChange(keyGroup, bytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ private List<StateChangeSet> getChanges(int size) {
new StateChangeSet(
UUID.randomUUID(),
SequenceNumber.of(0),
singletonList(new StateChange(0, change))));
singletonList(StateChange.ofDataChange(0, change))));
}

private static void withStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.UUID;

import static org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle.UNKNOWN_CHECKPOINTED_SIZE;
import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;

/**
* Base (De)serializer for checkpoint metadata format version 2 and 3.
Expand Down Expand Up @@ -494,7 +495,11 @@ static KeyedStateHandle deserializeKeyedStateHandle(
int bytesSize = dis.readInt();
byte[] bytes = new byte[bytesSize];
IOUtils.readFully(dis, bytes, 0, bytesSize);
changes.add(new StateChange(keyGroup, bytes));
StateChange stateChange =
keyGroup == META_KEY_GROUP
? StateChange.ofMetadataChange(bytes)
: StateChange.ofDataChange(keyGroup, bytes);
changes.add(stateChange);
}
StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
return InMemoryChangelogStateHandle.restore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,33 @@
@Internal
public class StateChange implements Serializable {

/* For metadata, see FLINK-23035.*/
public static final int META_KEY_GROUP = -1;

private static final long serialVersionUID = 1L;

private final int keyGroup;
private final byte[] change;

public StateChange(int keyGroup, byte[] change) {
// todo: enable check in FLINK-23035
// Preconditions.checkArgument(keyGroup >= 0);
StateChange(byte[] change) {
this.keyGroup = META_KEY_GROUP;
this.change = Preconditions.checkNotNull(change);
}

StateChange(int keyGroup, byte[] change) {
Preconditions.checkArgument(keyGroup >= 0);
this.keyGroup = keyGroup;
this.change = Preconditions.checkNotNull(change);
}

public static StateChange ofMetadataChange(byte[] change) {
return new StateChange(change);
}

public static StateChange ofDataChange(int keyGroup, byte[] change) {
return new StateChange(keyGroup, change);
}

@Override
public String toString() {
return String.format("keyGroup=%d, dataSize=%d", keyGroup, change.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
*/
SequenceNumber nextSequenceNumber();

/** Appends the provided **metadata** to this log. No persistency guarantees. */
void appendMeta(byte[] value) throws IOException;

/** Appends the provided data to this log. No persistency guarantees. */
void append(int keyGroup, byte[] value) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
Expand All @@ -41,6 +42,7 @@
import java.util.stream.Stream;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;

@NotThreadSafe
class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChangelogStateHandle> {
Expand All @@ -57,6 +59,16 @@ public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
this.keyGroupRange = keyGroupRange;
}

@Override
public void appendMeta(byte[] value) throws IOException {
Preconditions.checkState(!closed, "LogWriter is closed");
LOG.trace("append metadata: {} bytes", value.length);
changesByKeyGroup
.computeIfAbsent(META_KEY_GROUP, unused -> new TreeMap<>())
.put(sqn, value);
sqn = sqn.next();
}

@Override
public void append(int keyGroup, byte[] value) {
Preconditions.checkState(!closed, "LogWriter is closed");
Expand Down Expand Up @@ -96,8 +108,16 @@ private List<StateChange> collectChanges(SequenceNumber after) {

private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(
NavigableMap<SequenceNumber, byte[]> changeMap, SequenceNumber after, int keyGroup) {
if (keyGroup == META_KEY_GROUP) {
return changeMap.tailMap(after, true).entrySet().stream()
.map(e2 -> Tuple2.of(e2.getKey(), StateChange.ofMetadataChange(e2.getValue())));
}
return changeMap.tailMap(after, true).entrySet().stream()
.map(e2 -> Tuple2.of(e2.getKey(), new StateChange(keyGroup, e2.getValue())));
.map(
e2 ->
Tuple2.of(
e2.getKey(),
StateChange.ofDataChange(keyGroup, e2.getValue())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeLogger<Value, Ns>, Closeable {
static final int COMMON_KEY_GROUP = -1;
protected final StateChangelogWriter<?> stateChangelogWriter;
protected final InternalKeyContext<Key> keyContext;
protected RegisteredStateMetaInfoBase metaInfo;
Expand Down Expand Up @@ -158,10 +157,7 @@ protected void log(

private void logMetaIfNeeded() throws IOException {
if (!metaDataWritten) {
// todo: add StateChangelogWriter.append() version without a keygroup
// when all callers and implementers are merged (FLINK-21356 or later)
stateChangelogWriter.append(
COMMON_KEY_GROUP,
stateChangelogWriter.appendMeta(
serializeRaw(
out -> {
out.writeByte(METADATA.getCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
import static org.apache.flink.runtime.state.changelog.StateChange.META_KEY_GROUP;
import static org.apache.flink.state.changelog.StateChangeOperation.METADATA;
import static org.junit.Assert.assertEquals;

Expand All @@ -45,7 +45,7 @@ public void testMetadataOperationLogged() throws IOException {

try (StateChangeLogger<String, Namespace> logger = getLogger(writer, keyContext)) {
List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new ArrayList<>();
expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));
expectedAppends.add(Tuple2.of(META_KEY_GROUP, METADATA));

// log every applicable operations, several times each
int numOpTypes = StateChangeOperation.values().length;
Expand Down Expand Up @@ -104,6 +104,11 @@ protected Optional<Tuple2<Integer, StateChangeOperation>> log(
protected static class TestingStateChangelogWriter implements StateChangelogWriter {
private final List<Tuple2<Integer, StateChangeOperation>> appends = new ArrayList<>();

@Override
public void appendMeta(byte[] value) {
appends.add(Tuple2.of(META_KEY_GROUP, StateChangeOperation.byCode(value[0])));
}

@Override
public void append(int keyGroup, byte[] value) {
appends.add(Tuple2.of(keyGroup, StateChangeOperation.byCode(value[0])));
Expand Down

0 comments on commit 6b357b4

Please sign in to comment.