Skip to content

Commit

Permalink
[FLINK-22944][state] Re-use output in StateChangeLogger
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Sep 18, 2021
1 parent 032f467 commit be1da48
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.function.ThrowingConsumer;

import org.apache.flink.shaded.guava30.com.google.common.io.Closer;

import javax.annotation.Nullable;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;

import static org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
Expand All @@ -45,12 +48,15 @@
import static org.apache.flink.state.changelog.StateChangeOperation.SET_INTERNAL;
import static org.apache.flink.util.Preconditions.checkNotNull;

abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeLogger<Value, Ns> {
abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeLogger<Value, Ns>, Closeable {
static final int COMMON_KEY_GROUP = -1;
protected final StateChangelogWriter<?> stateChangelogWriter;
protected final InternalKeyContext<Key> keyContext;
protected final RegisteredStateMetaInfoBase metaInfo;
private final StateMetaInfoSnapshot.BackendStateType stateType;
private final ByteArrayOutputStream out = new ByteArrayOutputStream();
private final DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out);
private boolean metaDataWritten = false;
private final short stateShortId;

Expand Down Expand Up @@ -183,11 +189,18 @@ protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper out)
private byte[] serializeRaw(
ThrowingConsumer<DataOutputViewStreamWrapper, IOException> dataWriter)
throws IOException {
// todo: optimize performance
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputViewStreamWrapper wrapper = new DataOutputViewStreamWrapper(out)) {
dataWriter.accept(wrapper);
return out.toByteArray();
dataWriter.accept(wrapper);
wrapper.flush();
byte[] bytes = out.toByteArray();
out.reset();
return bytes;
}

@Override
public void close() throws IOException {
try (Closer closer = Closer.create()) {
closer.register(wrapper);
closer.register(out);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.flink.state.changelog.restore.FunctionDelegationHelper;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.io.Closer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -140,6 +142,7 @@ public class ChangelogKeyedStateBackend<K>
private final TtlTimeProvider ttlTimeProvider;

private final StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter;
private final Closer closer = Closer.create();

private long lastCheckpointId = -1L;

Expand Down Expand Up @@ -216,6 +219,7 @@ public ChangelogKeyedStateBackend(
this.mainMailboxExecutor = checkNotNull(mainMailboxExecutor);
this.asyncOperationsThreadPool = checkNotNull(asyncOperationsThreadPool);
this.completeRestore(initialState);
this.closer.register(keyedStateBackend);
}

// -------------------- CheckpointableKeyedStateBackend --------------------------------
Expand All @@ -226,7 +230,7 @@ public KeyGroupRange getKeyGroupRange() {

@Override
public void close() throws IOException {
keyedStateBackend.close();
closer.close();
}

@Override
Expand Down Expand Up @@ -388,6 +392,7 @@ KeyGroupedInternalPriorityQueue<T> create(
new RegisteredPriorityQueueStateBackendMetaInfo<>(
stateName, byteOrderedElementSerializer),
++lastCreatedStateId);
closer.register(priorityQueueStateChangeLogger);
queue =
new ChangelogKeyGroupedPriorityQueue<>(
keyedStateBackend.create(stateName, byteOrderedElementSerializer),
Expand Down Expand Up @@ -516,6 +521,7 @@ public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
stateDesc.getTtlConfig(),
stateDesc.getDefaultValue(),
++lastCreatedStateId);
closer.register(kvStateChangeLogger);
IS is =
stateFactory.create(
state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.function.ThrowingConsumer;

import java.io.Closeable;
import java.io.IOException;

/**
Expand All @@ -41,7 +42,7 @@
* @param <Value> type of state (value)
* @param <Namespace> type of namespace
*/
interface StateChangeLogger<Value, Namespace> {
interface StateChangeLogger<Value, Namespace> extends Closeable {

/** State updated, such as by {@link ListState#update}. */
void valueUpdated(Value newValue, Namespace ns) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void valueElementRemoved(
public boolean anythingChanged() {
return stateElementChanged || stateElementRemoved || stateCleared;
}

@Override
public void close() {}
}

private static class TestingInternalQueueState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ public void testMetadataOperationLogged() throws IOException {
TestingStateChangelogWriter writer = new TestingStateChangelogWriter();
InternalKeyContextImpl<String> keyContext =
new InternalKeyContextImpl<>(KeyGroupRange.of(1, 1000), 1000);
StateChangeLogger<String, Namespace> logger = getLogger(writer, keyContext);

List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new ArrayList<>();
expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));

// log every applicable operations, several times each
int numOpTypes = StateChangeOperation.values().length;
for (int i = 0; i < numOpTypes * 7; i++) {
String element = Integer.toString(i);
StateChangeOperation operation = StateChangeOperation.byCode((byte) (i % numOpTypes));
log(operation, element, logger, keyContext).ifPresent(expectedAppends::add);
try (StateChangeLogger<String, Namespace> logger = getLogger(writer, keyContext)) {
List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new ArrayList<>();
expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));

// log every applicable operations, several times each
int numOpTypes = StateChangeOperation.values().length;
for (int i = 0; i < numOpTypes * 7; i++) {
String element = Integer.toString(i);
StateChangeOperation operation =
StateChangeOperation.byCode((byte) (i % numOpTypes));
log(operation, element, logger, keyContext).ifPresent(expectedAppends::add);
}
assertEquals(expectedAppends, writer.appends);
}
assertEquals(expectedAppends, writer.appends);
}

protected abstract StateChangeLogger<String, Namespace> getLogger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,7 @@ public boolean anythingChanged() {
|| stateElementRemoved
|| stateMerged;
}

@Override
public void close() {}
}

0 comments on commit be1da48

Please sign in to comment.