Skip to content

Commit

Permalink
[FLINK-9701] Introduce TTL configuration in state descriptors
Browse files Browse the repository at this point in the history
This closes apache#6313.
  • Loading branch information
azagrebin authored and StefanRRichter committed Jul 12, 2018
1 parent b407ba7 commit f45b7f7
Show file tree
Hide file tree
Showing 53 changed files with 1,411 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.state;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -92,6 +93,10 @@ public enum Type {
@Nullable
private String queryableStateName;

/** Name for queries against state created from this StateDescriptor. */
@Nullable
private StateTtlConfiguration ttlConfig;

/** The default value returned by the state when no other value is bound to a key. */
@Nullable
protected transient T defaultValue;
Expand Down Expand Up @@ -203,6 +208,8 @@ public TypeSerializer<T> getSerializer() {
* @throws IllegalStateException If queryable state name already set
*/
public void setQueryable(String queryableStateName) {
Preconditions.checkArgument(ttlConfig == null,
"Queryable state is currently not supported with TTL");
if (this.queryableStateName == null) {
this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
} else {
Expand Down Expand Up @@ -230,6 +237,27 @@ public boolean isQueryable() {
return queryableStateName != null;
}

/**
* Configures optional activation of state time-to-live (TTL).
*
* <p>State user value will expire, become unavailable and be cleaned up in storage
* depending on configured {@link StateTtlConfiguration}.
*
* @param ttlConfig configuration of state TTL
*/
public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
Preconditions.checkNotNull(ttlConfig);
Preconditions.checkArgument(queryableStateName == null,
"Queryable state is currently not supported with TTL");
this.ttlConfig = ttlConfig;
}

@Nullable
@Internal
public StateTtlConfiguration getTtlConfig() {
return ttlConfig;
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,12 @@

/**
* Configuration of state TTL logic.
* TODO: builder
*/
public class StateTtlConfiguration {
/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum TtlUpdateType {
/** TTL is disabled. State does not expire. */
Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
@SuppressWarnings("unchecked")
protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
this(
new PrecomputedParameters(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
PrecomputedParameters.precompute(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
fieldSerializers);
}

Expand Down Expand Up @@ -187,6 +187,7 @@ public int hashCode() {
return 31 * Boolean.hashCode(precomputed.immutableTargetType) + Arrays.hashCode(fieldSerializers);
}

@SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object obj) {
if (canEqual(obj)) {
Expand All @@ -205,17 +206,12 @@ public boolean canEqual(Object obj) {

@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return new CompositeTypeSerializerConfigSnapshot(fieldSerializers) {
@Override
public int getVersion() {
return 0;
}
};
return new ConfigSnapshot(fieldSerializers);
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) {
if (configSnapshot instanceof ConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
if (previousSerializersAndConfigs.size() == fieldSerializers.length) {
Expand All @@ -242,11 +238,7 @@ private CompatibilityResult<T> ensureFieldCompatibility(
}
}
}
PrecomputedParameters precomputed =
new PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers);
return requiresMigration ?
CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)) :
CompatibilityResult.compatible();
return requiresMigration ? createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible();
}

private CompatibilityResult<Object> resolveFieldCompatibility(
Expand All @@ -256,6 +248,12 @@ private CompatibilityResult<Object> resolveFieldCompatibility(
previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
}

private CompatibilityResult<T> createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) {
PrecomputedParameters precomputed =
PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers);
return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers));
}

/** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
protected static class PrecomputedParameters implements Serializable {
private static final long serialVersionUID = 1L;
Expand All @@ -272,7 +270,14 @@ protected static class PrecomputedParameters implements Serializable {
/** Whether any field serializer is stateful. */
final boolean stateful;

PrecomputedParameters(
private PrecomputedParameters(boolean immutableTargetType, boolean immutable, int length, boolean stateful) {
this.immutableTargetType = immutableTargetType;
this.immutable = immutable;
this.length = length;
this.stateful = stateful;
}

static PrecomputedParameters precompute(
boolean immutableTargetType,
TypeSerializer<Object>[] fieldSerializers) {
Preconditions.checkNotNull(fieldSerializers);
Expand All @@ -292,11 +297,26 @@ protected static class PrecomputedParameters implements Serializable {
}
totalLength = totalLength >= 0 ? totalLength + fieldSerializer.getLength() : totalLength;
}
return new PrecomputedParameters(immutableTargetType, fieldsImmutable, totalLength, stateful);
}
}

this.immutableTargetType = immutableTargetType;
this.immutable = immutableTargetType && fieldsImmutable;
this.length = totalLength;
this.stateful = stateful;
/** Snapshot field serializers of composite type. */
public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
private static final int VERSION = 0;

/** This empty nullary constructor is required for deserializing the configuration. */
@SuppressWarnings("unused")
public ConfigSnapshot() {
}

ConfigSnapshot(@Nonnull TypeSerializer<?>... nestedSerializers) {
super(nestedSerializers);
}

@Override
public int getVersion() {
return VERSION;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -80,12 +81,13 @@ public void testListSerialization() throws Exception {
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

KvStateRequestSerializerTest.testListSerialization(key, listState);
Expand Down Expand Up @@ -121,7 +123,8 @@ public void testMapSerialization() throws Exception {
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP);
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -198,11 +199,12 @@ public void testListSerialization() throws Exception {
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

Expand Down Expand Up @@ -306,7 +308,8 @@ public void testMapSerialization() throws Exception {
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.setCurrentKey(key);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -761,6 +762,7 @@ private AbstractKeyedStateBackend<Integer> createKeyedStateBackend(KvStateRegist
IntSerializer.INSTANCE,
numKeyGroups,
new KeyGroupRange(0, 0),
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()),
TtlTimeProvider.DEFAULT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend<K> implements

private final ExecutionConfig executionConfig;

private final TtlTimeProvider ttlTimeProvider;

/** Decorates the input and output streams to write key-groups compressed. */
protected final StreamCompressionDecorator keyGroupCompressionDecorator;

Expand All @@ -93,7 +97,8 @@ public AbstractKeyedStateBackend(
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig) {
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider) {

this.kvStateRegistry = kvStateRegistry;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
Expand All @@ -104,6 +109,7 @@ public AbstractKeyedStateBackend(
this.keyValueStatesByName = new HashMap<>();
this.executionConfig = executionConfig;
this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
}

private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
Expand Down Expand Up @@ -220,40 +226,33 @@ public <N, S extends State, T> void applyToAllKeys(
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {

checkNotNull(namespaceSerializer, "Namespace serializer");
checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");

if (keySerializer == null) {
throw new UnsupportedOperationException(
"State key serializer has not been configured in the config. " +
"This operation cannot use partitioned state.");
}

if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}

InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
if (existing != null) {
@SuppressWarnings("unchecked")
S typedState = (S) existing;
return typedState;
InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
if (kvState == null) {
if (!stateDescriptor.isSerializerInitialized()) {
stateDescriptor.initializeSerializerUnlessSet(executionConfig);
}
kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);
publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
return (S) kvState;
}

InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
keyValueStatesByName.put(stateDescriptor.getName(), kvState);

// Publish queryable state
private void publishQueryableStateIfEnabled(
StateDescriptor<?, ?> stateDescriptor,
InternalKvState<?, ?, ?> kvState) {
if (stateDescriptor.isQueryable()) {
if (kvStateRegistry == null) {
throw new IllegalStateException("State backend has not been initialized for job.");
}

String name = stateDescriptor.getQueryableStateName();
kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}

return (S) kvState;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import java.io.IOException;

Expand All @@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri

@Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws IOException;
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry,
TtlTimeProvider ttlTimeProvider) throws IOException;

@Override
public abstract OperatorStateBackend createOperatorStateBackend(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface KeyedStateFactory {
* @param <S> The type of the public API state.
* @param <IS> The type of internal state.
*/
<N, SV, S extends State, IS extends S> IS createState(
<N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc) throws Exception;
}
Loading

0 comments on commit f45b7f7

Please sign in to comment.