Skip to content

Commit

Permalink
[FLINK-9489] Checkpoint timers as part of managed keyed state instead…
Browse files Browse the repository at this point in the history
… of raw keyed state

Optimization for relaxed bulk polls

Deactivate optimization for now because it still contains a bug

This closes apache#6333.
  • Loading branch information
StefanRRichter authored and tillrohrmann committed Jul 16, 2018
1 parent 0bbc91e commit dbddf00
Show file tree
Hide file tree
Showing 57 changed files with 1,251 additions and 668 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,4 +315,9 @@ StreamCompressionDecorator getKeyGroupCompressionDecorator() {
@VisibleForTesting
public abstract int numStateEntries();

// TODO remove this once heap-based timers are working with RocksDB incremental snapshots!
public boolean requiresLegacySynchronousTimerSnapshots() {
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V

long write(FSDataOutputStream out) throws IOException;

void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo);
void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo);

RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo();
RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,

if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
new RegisteredBroadcastBackendStateMetaInfo<>(
new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
Expand All @@ -227,7 +227,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,
final StateMetaInfoSnapshot metaInfoSnapshot = restoredBroadcastStateMetaInfos.get(name);

@SuppressWarnings("unchecked")
RegisteredBroadcastBackendStateMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastBackendStateMetaInfo<K, V>(metaInfoSnapshot);
RegisteredBroadcastStateBackendMetaInfo<K, V> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<K, V>(metaInfoSnapshot);

// check compatibility to determine if state migration is required
CompatibilityResult<K> keyCompatibility = CompatibilityUtil.resolveCompatibilityResult(
Expand All @@ -247,7 +247,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,
if (!keyCompatibility.isRequiresMigration() && !valueCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
broadcastState.setStateMetaInfo(
new RegisteredBroadcastBackendStateMetaInfo<>(
new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
Expand Down Expand Up @@ -510,8 +510,8 @@ public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exc
// Recreate all PartitionableListStates from the meta info
for (StateMetaInfoSnapshot restoredSnapshot : restoredOperatorMetaInfoSnapshots) {

final RegisteredOperatorBackendStateMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
final RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

if (restoredMetaInfo.getPartitionStateSerializer() == null ||
restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
Expand Down Expand Up @@ -546,8 +546,8 @@ public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exc

for (StateMetaInfoSnapshot restoredSnapshot : restoredBroadcastMetaInfoSnapshots) {

final RegisteredBroadcastBackendStateMetaInfo<?, ?> restoredMetaInfo =
new RegisteredBroadcastBackendStateMetaInfo<>(restoredSnapshot);
final RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo =
new RegisteredBroadcastStateBackendMetaInfo<>(restoredSnapshot);

if (restoredMetaInfo.getKeySerializer() == null || restoredMetaInfo.getValueSerializer() == null ||
restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer ||
Expand Down Expand Up @@ -613,7 +613,7 @@ static final class PartitionableListState<S> implements ListState<S> {
/**
* Meta information of the state, including state name, assignment mode, and serializer
*/
private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

/**
* The internal list the holds the elements of the state
Expand All @@ -625,12 +625,12 @@ static final class PartitionableListState<S> implements ListState<S> {
*/
private final ArrayListSerializer<S> internalListCopySerializer;

PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this(stateMetaInfo, new ArrayList<S>());
}

private PartitionableListState(
RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo,
RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
ArrayList<S> internalList) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
Expand All @@ -643,11 +643,11 @@ private PartitionableListState(PartitionableListState<S> toCopy) {
this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
}

public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}

public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
return stateMetaInfo;
}

Expand Down Expand Up @@ -741,7 +741,7 @@ private <S> ListState<S> getListState(
// no restored state for the state name; simply create new state holder

partitionableListState = new PartitionableListState<>(
new RegisteredOperatorBackendStateMetaInfo<>(
new RegisteredOperatorStateBackendMetaInfo<>(
name,
partitionStateSerializer,
mode));
Expand All @@ -757,8 +757,8 @@ private <S> ListState<S> getListState(
mode);

StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
RegisteredOperatorBackendStateMetaInfo<S> metaInfo =
new RegisteredOperatorBackendStateMetaInfo<>(restoredSnapshot);
RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

// check compatibility to determine if state migration is required
TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();
Expand All @@ -772,7 +772,7 @@ private <S> ListState<S> getListState(
if (!stateCompatibility.isRequiresMigration()) {
// new serializer is compatible; use it to replace the old serializer
partitionableListState.setStateMetaInfo(
new RegisteredOperatorBackendStateMetaInfo<>(name, newPartitionStateSerializer, mode));
new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
} else {
// TODO state migration currently isn't possible.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
/**
* Meta information of the state, including state name, assignment mode, and serializer.
*/
private RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo;
private RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo;

/**
* The internal map the holds the elements of the state.
Expand All @@ -54,11 +54,11 @@ public class HeapBroadcastState<K, V> implements BackendWritableBroadcastState<K
*/
private final MapSerializer<K, V> internalMapCopySerializer;

HeapBroadcastState(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
HeapBroadcastState(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this(stateMetaInfo, new HashMap<>());
}

private HeapBroadcastState(final RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {
private HeapBroadcastState(final RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo, final Map<K, V> internalMap) {

this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
this.backingMap = Preconditions.checkNotNull(internalMap);
Expand All @@ -70,12 +70,12 @@ private HeapBroadcastState(HeapBroadcastState<K, V> toCopy) {
}

@Override
public void setStateMetaInfo(RegisteredBroadcastBackendStateMetaInfo<K, V> stateMetaInfo) {
public void setStateMetaInfo(RegisteredBroadcastStateBackendMetaInfo<K, V> stateMetaInfo) {
this.stateMetaInfo = stateMetaInfo;
}

@Override
public RegisteredBroadcastBackendStateMetaInfo<K, V> getStateMetaInfo() {
public RegisteredBroadcastStateBackendMetaInfo<K, V> getStateMetaInfo() {
return stateMetaInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,22 @@
@FunctionalInterface
public interface KeyExtractorFunction<T> {

KeyExtractorFunction<? extends Keyed<?>> FOR_KEYED_OBJECTS = new KeyExtractorFunction<Keyed<?>>() {
@Nonnull
@Override
public Object extractKeyFromElement(@Nonnull Keyed<?> element) {
return element.getKey();
}
};

/**
* Returns the key for the given element by which the key-group can be computed.
*/
@Nonnull
Object extractKeyFromElement(@Nonnull T element);

@SuppressWarnings("unchecked")
static <T extends Keyed<?>> KeyExtractorFunction<T> forKeyedObjects() {
return (KeyExtractorFunction<T>) FOR_KEYED_OBJECTS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.state;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

Expand All @@ -28,7 +29,7 @@
import java.io.IOException;

/**
* Abstract class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
* Class that contains the base algorithm for partitioning data into key-groups. This algorithm currently works
* with two array (input, output) for optimal algorithmic complexity. Notice that this could also be implemented over a
* single array, using some cuckoo-hashing-style element replacement. This would have worse algorithmic complexity but
* better space efficiency. We currently prefer the trade-off in favor of better algorithmic complexity.
Expand Down Expand Up @@ -89,7 +90,7 @@ public class KeyGroupPartitioner<T> {

/** Cached result. */
@Nullable
protected StateSnapshot.KeyGroupPartitionedSnapshot computedResult;
protected StateSnapshot.StateKeyGroupWriter computedResult;

/**
* Creates a new {@link KeyGroupPartitioner}.
Expand Down Expand Up @@ -131,7 +132,7 @@ public KeyGroupPartitioner(
/**
* Partitions the data into key-groups and returns the result via {@link PartitioningResult}.
*/
public StateSnapshot.KeyGroupPartitionedSnapshot partitionByKeyGroup() {
public StateSnapshot.StateKeyGroupWriter partitionByKeyGroup() {
if (computedResult == null) {
reportAllElementKeyGroups();
buildHistogramByAccumulatingCounts();
Expand Down Expand Up @@ -198,7 +199,7 @@ private void executePartitioning() {
* This represents the result of key-group partitioning. The data in {@link #partitionedElements} is partitioned
* w.r.t. {@link KeyGroupPartitioner#keyGroupRange}.
*/
public static class PartitioningResult<T> implements StateSnapshot.KeyGroupPartitionedSnapshot {
private static class PartitioningResult<T> implements StateSnapshot.StateKeyGroupWriter {

/**
* Function to write one element to a {@link DataOutputView}.
Expand Down Expand Up @@ -249,7 +250,7 @@ private int getKeyGroupEndOffsetExclusive(int keyGroup) {
}

@Override
public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {
public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) throws IOException {

int startOffset = getKeyGroupStartOffsetInclusive(keyGroupId);
int endOffset = getKeyGroupEndOffsetExclusive(keyGroupId);
Expand All @@ -264,6 +265,43 @@ public void writeMappingsInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId)
}
}

public static <T> StateSnapshotKeyGroupReader createKeyGroupPartitionReader(
@Nonnull ElementReaderFunction<T> readerFunction,
@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {
return new PartitioningResultKeyGroupReader<>(readerFunction, elementConsumer);
}

/**
* General algorithm to read key-grouped state that was written from a {@link PartitioningResult}
*
* @param <T> type of the elements to read.
*/
private static class PartitioningResultKeyGroupReader<T> implements StateSnapshotKeyGroupReader {

@Nonnull
private final ElementReaderFunction<T> readerFunction;

@Nonnull
private final KeyGroupElementsConsumer<T> elementConsumer;

public PartitioningResultKeyGroupReader(
@Nonnull ElementReaderFunction<T> readerFunction,
@Nonnull KeyGroupElementsConsumer<T> elementConsumer) {

this.readerFunction = readerFunction;
this.elementConsumer = elementConsumer;
}

@Override
public void readMappingsInKeyGroup(@Nonnull DataInputView in, @Nonnegative int keyGroupId) throws IOException {
int numElements = in.readInt();
for (int i = 0; i < numElements; i++) {
T element = readerFunction.readElement(in);
elementConsumer.consume(element, keyGroupId);
}
}
}

/**
* This functional interface defines how one element is written to a {@link DataOutputView}.
*
Expand All @@ -281,4 +319,28 @@ public interface ElementWriterFunction<T> {
*/
void writeElement(@Nonnull T element, @Nonnull DataOutputView dov) throws IOException;
}

/**
* This functional interface defines how one element is read from a {@link DataInputView}.
*
* @param <T> type of the read elements.
*/
@FunctionalInterface
public interface ElementReaderFunction<T> {

@Nonnull
T readElement(@Nonnull DataInputView div) throws IOException;
}

/**
* Functional interface to consume elements from a key group.
*
* @param <T> type of the consumed elements.
*/
@FunctionalInterface
public interface KeyGroupElementsConsumer<T> {


void consume(@Nonnull T element, @Nonnegative int keyGroupId) throws IOException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

/**
* Interface for objects that have a key attribute.
*
* @param <K> type of the key.
*/
public interface Keyed<K> {

/**
* Returns the key attribute.
*/
K getKey();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.state;

import javax.annotation.Nonnull;

/**
* Interface for objects that can be compared by priority.
* @param <T> type of the compared objects.
*/
public interface PriorityComparable<T> {

/**
* @see PriorityComparator#comparePriority(Object, Object).
*/
int comparePriorityTo(@Nonnull T other);
}
Loading

0 comments on commit dbddf00

Please sign in to comment.