diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java index 6c54e71261f37..956fd059d9794 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java @@ -18,6 +18,7 @@ package org.apache.flink.api.common.state; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -92,6 +93,10 @@ public enum Type { @Nullable private String queryableStateName; + /** Name for queries against state created from this StateDescriptor. */ + @Nullable + private StateTtlConfiguration ttlConfig; + /** The default value returned by the state when no other value is bound to a key. */ @Nullable protected transient T defaultValue; @@ -203,6 +208,8 @@ public TypeSerializer getSerializer() { * @throws IllegalStateException If queryable state name already set */ public void setQueryable(String queryableStateName) { + Preconditions.checkArgument(ttlConfig == null, + "Queryable state is currently not supported with TTL"); if (this.queryableStateName == null) { this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name"); } else { @@ -230,6 +237,27 @@ public boolean isQueryable() { return queryableStateName != null; } + /** + * Configures optional activation of state time-to-live (TTL). + * + *

State user value will expire, become unavailable and be cleaned up in storage + * depending on configured {@link StateTtlConfiguration}. + * + * @param ttlConfig configuration of state TTL + */ + public void enableTimeToLive(StateTtlConfiguration ttlConfig) { + Preconditions.checkNotNull(ttlConfig); + Preconditions.checkArgument(queryableStateName == null, + "Queryable state is currently not supported with TTL"); + this.ttlConfig = ttlConfig; + } + + @Nullable + @Internal + public StateTtlConfiguration getTtlConfig() { + return ttlConfig; + } + // ------------------------------------------------------------------------ /** diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java index 8ef2046a4153d..9bd8b15c02f91 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java @@ -27,15 +27,12 @@ /** * Configuration of state TTL logic. - * TODO: builder */ public class StateTtlConfiguration { /** * This option value configures when to update last access timestamp which prolongs state TTL. */ public enum TtlUpdateType { - /** TTL is disabled. State does not expire. */ - Disabled, /** Last access timestamp is initialised when state is created and updated on every write operation. */ OnCreateAndWrite, /** The same as OnCreateAndWrite but also updated on read. */ diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java index 43c55338f1a81..2db7a309ab647 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java @@ -50,7 +50,7 @@ public abstract class CompositeSerializer extends TypeSerializer { @SuppressWarnings("unchecked") protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) { this( - new PrecomputedParameters(immutableTargetType, (TypeSerializer[]) fieldSerializers), + PrecomputedParameters.precompute(immutableTargetType, (TypeSerializer[]) fieldSerializers), fieldSerializers); } @@ -187,6 +187,7 @@ public int hashCode() { return 31 * Boolean.hashCode(precomputed.immutableTargetType) + Arrays.hashCode(fieldSerializers); } + @SuppressWarnings("EqualsWhichDoesntCheckParameterClass") @Override public boolean equals(Object obj) { if (canEqual(obj)) { @@ -205,17 +206,12 @@ public boolean canEqual(Object obj) { @Override public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new CompositeTypeSerializerConfigSnapshot(fieldSerializers) { - @Override - public int getVersion() { - return 0; - } - }; + return new ConfigSnapshot(fieldSerializers); } @Override public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) { + if (configSnapshot instanceof ConfigSnapshot) { List, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = ((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); if (previousSerializersAndConfigs.size() == fieldSerializers.length) { @@ -242,11 +238,7 @@ private CompatibilityResult ensureFieldCompatibility( } } } - PrecomputedParameters precomputed = - new PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers); - return requiresMigration ? - CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)) : - CompatibilityResult.compatible(); + return requiresMigration ? createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible(); } private CompatibilityResult resolveFieldCompatibility( @@ -256,6 +248,12 @@ private CompatibilityResult resolveFieldCompatibility( previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); } + private CompatibilityResult createMigrationCompatResult(TypeSerializer[] convertSerializers) { + PrecomputedParameters precomputed = + PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers); + return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)); + } + /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */ protected static class PrecomputedParameters implements Serializable { private static final long serialVersionUID = 1L; @@ -272,7 +270,14 @@ protected static class PrecomputedParameters implements Serializable { /** Whether any field serializer is stateful. */ final boolean stateful; - PrecomputedParameters( + private PrecomputedParameters(boolean immutableTargetType, boolean immutable, int length, boolean stateful) { + this.immutableTargetType = immutableTargetType; + this.immutable = immutable; + this.length = length; + this.stateful = stateful; + } + + static PrecomputedParameters precompute( boolean immutableTargetType, TypeSerializer[] fieldSerializers) { Preconditions.checkNotNull(fieldSerializers); @@ -292,11 +297,26 @@ protected static class PrecomputedParameters implements Serializable { } totalLength = totalLength >= 0 ? totalLength + fieldSerializer.getLength() : totalLength; } + return new PrecomputedParameters(immutableTargetType, fieldsImmutable, totalLength, stateful); + } + } - this.immutableTargetType = immutableTargetType; - this.immutable = immutableTargetType && fieldsImmutable; - this.length = totalLength; - this.stateful = stateful; + /** Snapshot field serializers of composite type. */ + public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + private static final int VERSION = 0; + + /** This empty nullary constructor is required for deserializing the configuration. */ + @SuppressWarnings("unused") + public ConfigSnapshot() { + } + + ConfigSnapshot(@Nonnull TypeSerializer... nestedSerializers) { + super(nestedSerializers); + } + + @Override + public int getVersion() { + return VERSION; } } } diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java index 9ea3198a147f0..176e55a7d8c3d 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Rule; import org.junit.Test; @@ -80,12 +81,13 @@ public void testListSerialization() throws Exception { new ExecutionConfig(), false, TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP + RocksDBStateBackend.PriorityQueueStateType.HEAP, + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE, + final InternalListState listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); KvStateRequestSerializerTest.testListSerialization(key, listState); @@ -121,7 +123,8 @@ public void testMapSerialization() throws Exception { new ExecutionConfig(), false, TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP); + RocksDBStateBackend.PriorityQueueStateType.HEAP, + TtlTimeProvider.DEFAULT); longHeapKeyedStateBackend.restore(null); longHeapKeyedStateBackend.setCurrentKey(key); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java index 73f88319d402a..d5390662d01e1 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.Test; import org.junit.runner.RunWith; @@ -198,11 +199,12 @@ public void testListSerialization() throws Exception { async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend.createState( + final InternalListState listState = longHeapKeyedStateBackend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); @@ -306,7 +308,8 @@ public void testMapSerialization() throws Exception { async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128) + new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128), + TtlTimeProvider.DEFAULT ); longHeapKeyedStateBackend.setCurrentKey(key); diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java index 9947dac70a7a2..adcf3aefb7cb4 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; @@ -761,6 +762,7 @@ private AbstractKeyedStateBackend createKeyedStateBackend(KvStateRegist IntSerializer.INSTANCE, numKeyGroups, new KeyGroupRange(0, 0), - registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId())); + registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()), + TtlTimeProvider.DEFAULT); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java index 8ce25b6fa0f8d..c7f1bd9fc5127 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; @@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend implements private final ExecutionConfig executionConfig; + private final TtlTimeProvider ttlTimeProvider; + /** Decorates the input and output streams to write key-groups compressed. */ protected final StreamCompressionDecorator keyGroupCompressionDecorator; @@ -93,7 +97,8 @@ public AbstractKeyedStateBackend( ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig) { + ExecutionConfig executionConfig, + TtlTimeProvider ttlTimeProvider) { this.kvStateRegistry = kvStateRegistry; this.keySerializer = Preconditions.checkNotNull(keySerializer); @@ -104,6 +109,7 @@ public AbstractKeyedStateBackend( this.keyValueStatesByName = new HashMap<>(); this.executionConfig = executionConfig; this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig); + this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider); } private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) { @@ -220,40 +226,33 @@ public void applyToAllKeys( public S getOrCreateKeyedState( final TypeSerializer namespaceSerializer, StateDescriptor stateDescriptor) throws Exception { - checkNotNull(namespaceSerializer, "Namespace serializer"); + checkNotNull(keySerializer, "State key serializer has not been configured in the config. " + + "This operation cannot use partitioned state."); - if (keySerializer == null) { - throw new UnsupportedOperationException( - "State key serializer has not been configured in the config. " + - "This operation cannot use partitioned state."); - } - - if (!stateDescriptor.isSerializerInitialized()) { - stateDescriptor.initializeSerializerUnlessSet(executionConfig); - } - - InternalKvState existing = keyValueStatesByName.get(stateDescriptor.getName()); - if (existing != null) { - @SuppressWarnings("unchecked") - S typedState = (S) existing; - return typedState; + InternalKvState kvState = keyValueStatesByName.get(stateDescriptor.getName()); + if (kvState == null) { + if (!stateDescriptor.isSerializerInitialized()) { + stateDescriptor.initializeSerializerUnlessSet(executionConfig); + } + kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled( + namespaceSerializer, stateDescriptor, this, ttlTimeProvider); + keyValueStatesByName.put(stateDescriptor.getName(), kvState); + publishQueryableStateIfEnabled(stateDescriptor, kvState); } + return (S) kvState; + } - InternalKvState kvState = createState(namespaceSerializer, stateDescriptor); - keyValueStatesByName.put(stateDescriptor.getName(), kvState); - - // Publish queryable state + private void publishQueryableStateIfEnabled( + StateDescriptor stateDescriptor, + InternalKvState kvState) { if (stateDescriptor.isQueryable()) { if (kvStateRegistry == null) { throw new IllegalStateException("State backend has not been initialized for job."); } - String name = stateDescriptor.getQueryableStateName(); kvStateRegistry.registerKvState(keyGroupRange, name, kvState); } - - return (S) kvState; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java index 7e9c35786bf29..d397a88c21475 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import java.io.IOException; @@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri @Override public abstract AbstractKeyedStateBackend createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException; + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws IOException; @Override public abstract OperatorStateBackend createOperatorStateBackend( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java index dd251bd9fc99a..de359791d84e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java @@ -36,7 +36,7 @@ public interface KeyedStateFactory { * @param The type of the public API state. * @param The type of internal state. */ - IS createState( + IS createInternalState( TypeSerializer namespaceSerializer, StateDescriptor stateDesc) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java index f34cd9b63a58f..2775b71ac7205 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import java.io.IOException; @@ -117,7 +118,7 @@ public interface StateBackend extends java.io.Serializable { /** * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding keyed state - * and checkpointing it. + * and checkpointing it. Uses default TTL time provider. * *

Keyed State is state where each value is bound to a key. * @@ -127,14 +128,46 @@ public interface StateBackend extends java.io.Serializable { * * @throws Exception This method may forward all exceptions that occur while instantiating the backend. */ - AbstractKeyedStateBackend createKeyedStateBackend( + default AbstractKeyedStateBackend createKeyedStateBackend( Environment env, JobID jobID, String operatorIdentifier, TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception; + TaskKvStateRegistry kvStateRegistry) throws Exception { + return createKeyedStateBackend( + env, + jobID, + operatorIdentifier, + keySerializer, + numberOfKeyGroups, + keyGroupRange, + kvStateRegistry, + TtlTimeProvider.DEFAULT); + } + + /** + * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding keyed state + * and checkpointing it. + * + *

Keyed State is state where each value is bound to a key. + * + * @param The type of the keys by which the state is organized. + * + * @return The Keyed State Backend for the given job, operator, and key group range. + * + * @throws Exception This method may forward all exceptions that occur while instantiating the backend. + */ + AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws Exception; /** * Creates a new {@link OperatorStateBackend} that can be used for storing operator state. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index ad1581bf6ad1b..f5a86e1ac51a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TernaryBoolean; import org.slf4j.LoggerFactory; @@ -93,7 +94,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur private static final long serialVersionUID = -8191916350224044011L; /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */ - public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; // ------------------------------------------------------------------------ @@ -448,13 +449,14 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException @Override public AbstractKeyedStateBackend createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) { + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { TaskStateManager taskStateManager = env.getTaskStateManager(); LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig(); @@ -470,7 +472,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( isUsingAsynchronousSnapshots(), env.getExecutionConfig(), localRecoveryConfig, - priorityQueueSetFactory); + priorityQueueSetFactory, + ttlTimeProvider); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index 562c93de1114b..495dfe0d4af91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StreamCompressionDecorator; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -171,9 +172,11 @@ public HeapKeyedStateBackend( boolean asynchronousSnapshots, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, - PriorityQueueSetFactory priorityQueueSetFactory) { + PriorityQueueSetFactory priorityQueueSetFactory, + TtlTimeProvider ttlTimeProvider) { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + super(kvStateRegistry, keySerializer, userCodeClassLoader, + numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider); this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig); SnapshotStrategySynchronicityBehavior synchronicityTrait = asynchronousSnapshots ? @@ -241,7 +244,7 @@ private boolean hasRegisteredState() { } @Override - public IS createState( + public IS createInternalState( TypeSerializer namespaceSerializer, StateDescriptor stateDesc) throws Exception { StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java index d78944c78ec59..1c464d7fa9772 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TernaryBoolean; import javax.annotation.Nullable; @@ -307,7 +308,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { TaskStateManager taskStateManager = env.getTaskStateManager(); HeapPriorityQueueSetFactory priorityQueueSetFactory = @@ -321,7 +323,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( isUsingAsynchronousSnapshots(), env.getExecutionConfig(), taskStateManager.createLocalRecoveryConfig(), - priorityQueueSetFactory); + priorityQueueSetFactory, + ttlTimeProvider); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java index 29a575a2beb8a..1b72c5437240d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java @@ -55,8 +55,6 @@ abstract class AbstractTtlDecorator { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); - Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled, - "State does not need to be wrapped with TTL if it is configured as disabled."); this.original = original; this.config = config; this.timeProvider = timeProvider; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java index 98d7c52f0fd7c..21145e5b47c11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java @@ -23,13 +23,15 @@ import org.apache.flink.runtime.state.internal.InternalMapState; import org.apache.flink.util.FlinkRuntimeException; +import javax.annotation.Nonnull; + import java.util.AbstractMap; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import java.util.NoSuchElementException; +import java.util.function.Function; /** * This class wraps map state with TTL logic. @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception { @Override public Iterable> entries() throws Exception { - return entriesStream()::iterator; + return entries(e -> e); } - private Stream> entriesStream() throws Exception { + private Iterable entries( + Function, R> resultMapper) throws Exception { Iterable>> withTs = original.entries(); - withTs = withTs == null ? Collections.emptyList() : withTs; - return StreamSupport - .stream(withTs.spliterator(), false) - .filter(this::unexpiredAndUpdateOrCleanup) - .map(TtlMapState::unwrapWithoutTs); - } - - private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) { - UV unexpiredValue; - try { - unexpiredValue = getWithTtlCheckAndUpdate( - e::getValue, - v -> original.put(e.getKey(), v), - () -> original.remove(e.getKey())); - } catch (Exception ex) { - throw new FlinkRuntimeException(ex); - } - return unexpiredValue != null; - } - - private static Map.Entry unwrapWithoutTs(Map.Entry> e) { - return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue()); + return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper); } @Override public Iterable keys() throws Exception { - return entriesStream().map(Map.Entry::getKey)::iterator; + return entries(Map.Entry::getKey); } @Override public Iterable values() throws Exception { - return entriesStream().map(Map.Entry::getValue)::iterator; + return entries(Map.Entry::getValue); } @Override public Iterator> iterator() throws Exception { - return entriesStream().iterator(); + return entries().iterator(); } @Override public void clear() { original.clear(); } + + private class EntriesIterator implements Iterator { + private final Iterator>> originalIterator; + private final Function, R> resultMapper; + private Map.Entry nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable>> withTs, + @Nonnull Function, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { + originalIterator.remove(); + } else { + throw new IllegalStateException("next() has not been called or hasNext() has been called afterwards," + + " remove() is supported only right after calling next()"); + } + } + + private Map.Entry getUnexpiredAndUpdateOrCleanup(Map.Entry> e) { + UV unexpiredValue; + try { + unexpiredValue = getWithTtlCheckAndUpdate( + e::getValue, + v -> original.put(e.getKey(), v), + originalIterator::remove); + } catch (Exception ex) { + throw new FlinkRuntimeException(ex); + } + return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java index 01e4be9deff23..c0aa465319cba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java @@ -49,7 +49,7 @@ public T get() throws Exception { @Override public void add(T value) throws Exception { - original.add(wrapWithTs(value, Long.MAX_VALUE)); + original.add(wrapWithTs(value)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java index 82096a6e19787..5909ac7c41558 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java @@ -49,16 +49,14 @@ public static IS createStateAndWrapWithTt TypeSerializer namespaceSerializer, StateDescriptor stateDesc, KeyedStateFactory originalStateFactory, - StateTtlConfiguration ttlConfig, TtlTimeProvider timeProvider) throws Exception { Preconditions.checkNotNull(namespaceSerializer); Preconditions.checkNotNull(stateDesc); Preconditions.checkNotNull(originalStateFactory); - Preconditions.checkNotNull(ttlConfig); Preconditions.checkNotNull(timeProvider); - return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ? - originalStateFactory.createState(namespaceSerializer, stateDesc) : - new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider) + return stateDesc.getTtlConfig() == null ? + originalStateFactory.createInternalState(namespaceSerializer, stateDesc) : + new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider) .createState(namespaceSerializer, stateDesc); } @@ -96,7 +94,7 @@ private IS createState( stateDesc.getClass(), TtlStateFactory.class); throw new FlinkRuntimeException(message); } - return stateFactory.createState(namespaceSerializer, stateDesc); + return stateFactory.createInternalState(namespaceSerializer, stateDesc); } @SuppressWarnings("unchecked") @@ -106,7 +104,7 @@ private IS createValueState( ValueStateDescriptor> ttlDescriptor = new ValueStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlValueState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } @@ -118,7 +116,7 @@ private IS createListState( ListStateDescriptor> ttlDescriptor = new ListStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); return (IS) new TtlListState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, listStateDesc.getSerializer()); } @@ -132,7 +130,7 @@ private IS createMapState( mapStateDesc.getKeySerializer(), new TtlSerializer<>(mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, mapStateDesc.getSerializer()); } @@ -146,7 +144,7 @@ private IS createReducingState( new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlReducingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } @@ -161,7 +159,7 @@ private IS createAggregatingStat AggregatingStateDescriptor, OUT> ttlDescriptor = new AggregatingStateDescriptor<>( stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction); } @@ -178,7 +176,7 @@ private IS createFoldingState( new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlFoldingState<>( - originalStateFactory.createState(namespaceSerializer, ttlDescriptor), + originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor), ttlConfig, timeProvider, stateDesc.getSerializer()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java index bac9d3663f9bf..84809ccd22099 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java @@ -21,6 +21,8 @@ /** * Provides time to TTL logic to judge about state expiration. */ -interface TtlTimeProvider { +public interface TtlTimeProvider { + TtlTimeProvider DEFAULT = System::currentTimeMillis; + long currentTimestamp(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 9456f10b7fab9..ffebc52f8a006 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -168,7 +169,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws Exception { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java index 1325431f65670..6424a7a7b35df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.IOUtils; import org.junit.Rule; @@ -70,7 +71,8 @@ private void validateSupportForAsyncSnapshots(StateBackend backend) throws Excep IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - null + null, + TtlTimeProvider.DEFAULT ); assertTrue(keyedStateBackend.supportsAsynchronousSnapshots()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 3c5756bd22af4..bfdc05d2747d7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -73,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.runtime.state.internal.InternalReducingState; import org.apache.flink.runtime.state.internal.InternalValueState; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader; import org.apache.flink.types.IntValue; @@ -182,7 +183,8 @@ protected AbstractKeyedStateBackend createKeyedBackend( keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT); backend.restore(null); @@ -219,7 +221,8 @@ protected AbstractKeyedStateBackend restoreKeyedBackend( keySerializer, numberOfKeyGroups, keyGroupRange, - env.getTaskKvStateRegistry()); + env.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT); backend.restore(new StateObjectCollection<>(state)); @@ -3545,7 +3548,7 @@ public void testParallelAsyncSnapshots() throws Exception { } // insert some data to the backend. - InternalValueState valueState = backend.createState( + InternalValueState valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3602,7 +3605,7 @@ public void testAsyncSnapshot() throws Exception { try { backend = createKeyedBackend(IntSerializer.INSTANCE); - InternalValueState valueState = backend.createState( + InternalValueState valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3649,7 +3652,7 @@ public void testAsyncSnapshot() throws Exception { try { backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle); - InternalValueState valueState = backend.createState( + InternalValueState valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); @@ -3791,7 +3794,7 @@ public void testAsyncSnapshotCancellation() throws Exception { return; } - InternalValueState valueState = backend.createState( + InternalValueState valueState = backend.createInternalState( VoidNamespaceSerializer.INSTANCE, new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index dfcdffc786912..558f6295e2264 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.TestLogger; import org.apache.commons.io.IOUtils; @@ -54,7 +55,8 @@ public void testCompressionConfiguration() { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { Assert.assertTrue( @@ -77,7 +79,8 @@ public void testCompressionConfiguration() { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { Assert.assertTrue( @@ -118,12 +121,13 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { InternalValueState state = - stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor); + stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor); stateBackend.setCurrentKey("A"); state.setCurrentNamespace(VoidNamespace.INSTANCE); @@ -160,12 +164,13 @@ private void snapshotRestoreRoundtrip(boolean useCompression) throws Exception { true, executionConfig, TestLocalRecoveryConfig.disabled(), - mock(PriorityQueueSetFactory.class)); + mock(PriorityQueueSetFactory.class), + TtlTimeProvider.DEFAULT); try { stateBackend.restore(StateObjectCollection.singleton(stateHandle)); - InternalValueState state = stateBackend.createState( + InternalValueState state = stateBackend.createInternalState( new VoidNamespaceSerializer(), stateDescriptor); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java index 249d0c3a0e132..7b8d69f2cb61e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java @@ -73,7 +73,7 @@ public void testMapStateMigrationAfterHashMapSerRemoval() throws Exception { keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot())); - InternalMapState state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr); + InternalMapState state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr); keyedBackend.setCurrentKey("abc"); state.setCurrentNamespace(namespace1); @@ -233,7 +233,7 @@ public void testRestore1_2ToMaster() throws Exception { final ListStateDescriptor stateDescr = new ListStateDescriptor<>("my-state", Long.class); stateDescr.initializeSerializerUnlessSet(new ExecutionConfig()); - InternalListState state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr); + InternalListState state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr); assertEquals(7, keyedBackend.numStateEntries()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java index cf6aef463aa58..0eddf3c5958cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -61,6 +62,7 @@ public HeapKeyedStateBackend createKeyedBackend(TypeSerializer keySeri async, new ExecutionConfig(), TestLocalRecoveryConfig.disabled(), - new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128)); + new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128), + TtlTimeProvider.DEFAULT); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java new file mode 100644 index 0000000000000..06de4be42ce7f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; + +/** Test suite for heap state TTL. */ +public class HeapTtlStateTest extends TtlStateTestBase { + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return new MemoryStateBackend(false); + } + }; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java new file mode 100644 index 0000000000000..392bdba831e1d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.ttl.mock.MockStateBackend; + +/** Test suite for mock state TTL. */ +public class MockTtlStateTest extends TtlStateTestBase { + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return new MockStateBackend(); + } + }; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java similarity index 94% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java index e14c3f8679b64..f980043fa9bc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.state.ttl; -class MockTimeProvider implements TtlTimeProvider { +class MockTtlTimeProvider implements TtlTimeProvider { long time = 0; @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java new file mode 100644 index 0000000000000..eaec234e425e8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +/** Base class for state backend test context. */ +public abstract class StateBackendTestContext { + private final StateBackend stateBackend; + private final CheckpointStorageLocation checkpointStorageLocation; + private final TtlTimeProvider timeProvider; + + private AbstractKeyedStateBackend keyedStateBackend; + + protected StateBackendTestContext(TtlTimeProvider timeProvider) { + this.timeProvider = Preconditions.checkNotNull(timeProvider); + this.stateBackend = Preconditions.checkNotNull(createStateBackend()); + this.checkpointStorageLocation = createCheckpointStorageLocation(); + } + + protected abstract StateBackend createStateBackend(); + + private CheckpointStorageLocation createCheckpointStorageLocation() { + try { + return stateBackend + .createCheckpointStorage(new JobID()) + .initializeLocationForCheckpoint(2L); + } catch (IOException e) { + throw new RuntimeException("unexpected"); + } + } + + void createAndRestoreKeyedStateBackend() { + Environment env = new DummyEnvironment(); + try { + disposeKeyedStateBackend(); + keyedStateBackend = stateBackend.createKeyedStateBackend( + env, new JobID(), "test", StringSerializer.INSTANCE, 10, + new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider); + keyedStateBackend.setCurrentKey("defaultKey"); + } catch (Exception e) { + throw new RuntimeException("unexpected"); + } + } + + void disposeKeyedStateBackend() { + if (keyedStateBackend != null) { + keyedStateBackend.dispose(); + keyedStateBackend = null; + } + } + + @Nonnull + KeyedStateHandle takeSnapshot() throws Exception { + RunnableFuture> snapshotRunnableFuture = + keyedStateBackend.snapshot(682375462392L, 10L, + checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation()); + if (!snapshotRunnableFuture.isDone()) { + snapshotRunnableFuture.run(); + } + return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot(); + } + + void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws Exception { + Collection restoreState = + snapshot == null ? null : new StateObjectCollection<>(Collections.singleton(snapshot)); + keyedStateBackend.restore(restoreState); + if (snapshot != null) { + snapshot.discardState(); + } + } + + void setCurrentKey(String key) { + Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised"); + keyedStateBackend.setCurrentKey(key); + } + + @SuppressWarnings("unchecked") + S createState( + StateDescriptor stateDescriptor, + @SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception { + S state = keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor); + ((InternalKvState) state).setCurrentNamespace(defaultNamespace); + return state; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java similarity index 75% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java index 5d9c682601648..b19391a98e4db 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java @@ -20,6 +20,8 @@ import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -28,16 +30,12 @@ import java.util.Set; /** Test suite for {@link TtlAggregatingState}. */ -public class TtlAggregatingStateTest - extends TtlMergingStateBase.TtlIntegerMergingStateBase, Integer, String> { +class TtlAggregatingStateTestContext + extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext, Integer, String> { private static final long DEFAULT_ACCUMULATOR = 3L; @Override void initTestValues() { - updater = v -> ttlState.add(v); - getter = () -> ttlState.get(); - originalGetter = () -> ttlState.original.get(); - updateEmpty = 5; updateUnexpired = 7; updateExpired = 6; @@ -47,11 +45,26 @@ void initTestValues() { getUpdateExpired = "9"; } + @SuppressWarnings("unchecked") + @Override + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new AggregatingStateDescriptor<>( + "TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE); + } + + @Override + void update(Integer value) throws Exception { + ttlState.add(value); + } + + @Override + String get() throws Exception { + return ttlState.get(); + } + @Override - TtlAggregatingState createState() { - AggregatingStateDescriptor aggregatingStateDes = - new AggregatingStateDescriptor<>("TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE); - return (TtlAggregatingState) wrapMockState(aggregatingStateDes); + Object getOriginal() throws Exception { + return ttlState.original.get(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java similarity index 69% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java index 8dac8ca4328ef..2b072b9eadf41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java @@ -20,17 +20,15 @@ import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; /** Test suite for {@link TtlFoldingState}. */ @SuppressWarnings("deprecation") -public class TtlFoldingStateTest extends TtlStateTestBase, Long, String> { +class TtlFoldingStateTestContext extends TtlStateTestContextBase, Long, String> { @Override void initTestValues() { - updater = v -> ttlState.add(v); - getter = () -> ttlState.get(); - originalGetter = () -> ttlState.original.get(); - updateEmpty = 5L; updateUnexpired = 7L; updateExpired = 6L; @@ -41,10 +39,25 @@ void initTestValues() { } @Override - TtlFoldingState createState() { - FoldingStateDescriptor foldingStateDesc = - new FoldingStateDescriptor<>("TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE); - return (TtlFoldingState) wrapMockState(foldingStateDesc); + void update(Long value) throws Exception { + ttlState.add(value); + } + + @Override + String get() throws Exception { + return ttlState.get(); + } + + @Override + Object getOriginal() throws Exception { + return ttlState.original.get(); + } + + @SuppressWarnings("unchecked") + @Override + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new FoldingStateDescriptor<>( + "TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE); } private static final FoldFunction FOLD = (acc, val) -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java similarity index 71% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java index 893f9aeff062c..c113bf1c0501f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java @@ -19,6 +19,8 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; @@ -31,14 +33,10 @@ import java.util.stream.StreamSupport; /** Test suite for {@link TtlListState}. */ -public class TtlListStateTest - extends TtlMergingStateBase, List, Iterable> { +class TtlListStateTestContext + extends TtlMergingStateTestContext, List, Iterable> { @Override void initTestValues() { - updater = v -> ttlState.addAll(v); - getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); - originalGetter = () -> ttlState.original.get(); - emptyValue = Collections.emptyList(); updateEmpty = Arrays.asList(5, 7, 10); @@ -51,10 +49,24 @@ void initTestValues() { } @Override - TtlListState createState() { - ListStateDescriptor listStateDesc = - new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE); - return (TtlListState) wrapMockState(listStateDesc); + void update(List value) throws Exception { + ttlState.addAll(value); + } + + @Override + Iterable get() throws Exception { + return StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList()); + } + + @Override + Object getOriginal() throws Exception { + return ttlState.original.get() == null ? emptyValue : ttlState.original.get(); + } + + @SuppressWarnings("unchecked") + @Override + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java similarity index 75% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java index bac2f41ea55ec..7fd61aaeb2980 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java @@ -28,15 +28,11 @@ import java.util.stream.StreamSupport; /** Test suite for collection methods of {@link TtlMapState}. */ -public class TtlMapStateTest extends - TtlMapStateTestBase, Set>> { +class TtlMapStateAllEntriesTestContext extends + TtlMapStateTestContext, Set>> { @Override void initTestValues() { - updater = map -> ttlState.putAll(map); - getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet()); - originalGetter = () -> ttlState.original.entries(); - emptyValue = Collections.emptySet(); updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10")); @@ -52,4 +48,19 @@ void initTestValues() { private static Map mapOf(Tuple2 ... entries) { return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } + + @Override + void update(Map map) throws Exception { + ttlState.putAll(map); + } + + @Override + Set> get() throws Exception { + return StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet()); + } + + @Override + Object getOriginal() throws Exception { + return ttlState.original.entries() == null ? Collections.emptySet() : ttlState.original.entries(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java similarity index 78% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java index d6949e7c1c15a..fb025afd9ad5f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state.ttl; /** Test suite for per element methods of {@link TtlMapState}. */ -public class TtlMapStatePerElementTest extends TtlMapStateTestBase { +class TtlMapStatePerElementTestContext extends TtlMapStateTestContext { private static final int TEST_KEY = 1; private static final String TEST_VAL1 = "test value1"; private static final String TEST_VAL2 = "test value2"; @@ -27,10 +27,6 @@ public class TtlMapStatePerElementTest extends TtlMapStateTestBase ttlState.put(TEST_KEY, v); - getter = () -> ttlState.get(TEST_KEY); - originalGetter = () -> ttlState.original.get(TEST_KEY); - updateEmpty = TEST_VAL1; updateUnexpired = TEST_VAL2; updateExpired = TEST_VAL3; @@ -39,4 +35,19 @@ void initTestValues() { getUnexpired = TEST_VAL2; getUpdateExpired = TEST_VAL3; } + + @Override + void update(String value) throws Exception { + ttlState.put(TEST_KEY, value); + } + + @Override + String get() throws Exception { + return ttlState.get(TEST_KEY); + } + + @Override + Object getOriginal() throws Exception { + return ttlState.original.get(TEST_KEY); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java similarity index 69% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java index dab319470f45b..bb7b3272b62e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java @@ -19,15 +19,17 @@ package org.apache.flink.runtime.state.ttl; import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -abstract class TtlMapStateTestBase - extends TtlStateTestBase, UV, GV> { +abstract class TtlMapStateTestContext + extends TtlStateTestContextBase, UV, GV> { + @SuppressWarnings("unchecked") @Override - TtlMapState createState() { - MapStateDescriptor mapStateDesc = - new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); - return (TtlMapState) wrapMockState(mapStateDesc); + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new MapStateDescriptor<>( + "TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java similarity index 66% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java index 6a7aebe280026..85c6134f7b2bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java @@ -21,19 +21,15 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.internal.InternalMergingState; -import org.junit.Test; - import java.util.Arrays; import java.util.List; import java.util.Random; -import static org.junit.Assert.assertEquals; - -abstract class TtlMergingStateBase, UV, GV> - extends TtlStateTestBase { +abstract class TtlMergingStateTestContext, UV, GV> + extends TtlStateTestContextBase { static final Random RANDOM = new Random(); - private static final List NAMESPACES = Arrays.asList( + static final List NAMESPACES = Arrays.asList( "unsetNamespace1", "unsetNamespace2", "expiredNamespace", @@ -41,30 +37,7 @@ abstract class TtlMergingStateBase> expiredUpdatesToMerge = generateExpiredUpdatesToMerge(); - applyStateUpdates(expiredUpdatesToMerge); - - timeProvider.time = 120; - List> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge(); - applyStateUpdates(unexpiredUpdatesToMerge); - - timeProvider.time = 150; - List> finalUpdatesToMerge = generateFinalUpdatesToMerge(); - applyStateUpdates(finalUpdatesToMerge); - - timeProvider.time = 230; - ttlState.mergeNamespaces("targetNamespace", NAMESPACES); - ttlState.setCurrentNamespace("targetNamespace"); - assertEquals("Unexpected result of merge operation", - getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get()); - } - - private List> generateExpiredUpdatesToMerge() { + List> generateExpiredUpdatesToMerge() { return Arrays.asList( Tuple2.of("expiredNamespace", generateRandomUpdate()), Tuple2.of("expiredNamespace", generateRandomUpdate()), @@ -73,7 +46,7 @@ private List> generateExpiredUpdatesToMerge() { ); } - private List> generateUnexpiredUpdatesToMerge() { + List> generateUnexpiredUpdatesToMerge() { return Arrays.asList( Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()), Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()), @@ -82,7 +55,7 @@ private List> generateUnexpiredUpdatesToMerge() { ); } - private List> generateFinalUpdatesToMerge() { + List> generateFinalUpdatesToMerge() { return Arrays.asList( Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()), Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()), @@ -95,10 +68,10 @@ private List> generateFinalUpdatesToMerge() { abstract UV generateRandomUpdate(); - private void applyStateUpdates(List> updates) throws Exception { + void applyStateUpdates(List> updates) throws Exception { for (Tuple2 t : updates) { ttlState.setCurrentNamespace(t.f0); - updater.accept(t.f1); + update(t.f1); } } @@ -107,10 +80,10 @@ abstract GV getMergeResult( List> finalUpdatesToMerge); @SuppressWarnings("unchecked") - abstract static class TtlIntegerMergingStateBase< + abstract static class TtlIntegerMergingStateTestContext< S extends InternalMergingState, UV extends Number, GV> - extends TtlMergingStateBase { + extends TtlMergingStateTestContext { @Override UV generateRandomUpdate() { return (UV) (Integer) RANDOM.nextInt(1000); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java similarity index 70% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java index bc5f67fc69997..ea5e61be928ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java @@ -20,20 +20,18 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.tuple.Tuple2; import java.util.List; /** Test suite for {@link TtlReducingState}. */ -public class TtlReducingStateTest - extends TtlMergingStateBase.TtlIntegerMergingStateBase, Integer, Integer> { +class TtlReducingStateTestContext + extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext, Integer, Integer> { @Override void initTestValues() { - updater = v -> ttlState.add(v); - getter = () -> ttlState.get(); - originalGetter = () -> ttlState.original.get(); - updateEmpty = 5; updateUnexpired = 7; updateExpired = 6; @@ -43,11 +41,26 @@ void initTestValues() { getUpdateExpired = 6; } + @SuppressWarnings("unchecked") + @Override + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new ReducingStateDescriptor<>( + "TtlTestReducingState", REDUCE, IntSerializer.INSTANCE); + } + + @Override + void update(Integer value) throws Exception { + ttlState.add(value); + } + + @Override + Integer get() throws Exception { + return ttlState.get(); + } + @Override - TtlReducingState createState() { - ReducingStateDescriptor aggregatingStateDes = - new ReducingStateDescriptor<>("TtlTestReducingState", REDUCE, IntSerializer.INSTANCE); - return (TtlReducingState) wrapMockState(aggregatingStateDes); + Object getOriginal() throws Exception { + return ttlState.original.get(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index bc3d6e737cda4..5820f13f78489 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -22,105 +22,153 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.StateTtlConfiguration; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.internal.InternalKvState; -import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateFactory; -import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.function.SupplierWithException; -import org.apache.flink.util.function.ThrowingConsumer; +import org.apache.flink.util.StateMigrationException; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeThat; -abstract class TtlStateTestBase, UV, GV> { +/** State TTL base test suite. */ +@RunWith(Parameterized.class) +public abstract class TtlStateTestBase { private static final long TTL = 100; - private static final KeyedStateFactory MOCK_ORIGINAL_STATE_FACTORY = new MockKeyedStateFactory(); - S ttlState; - MockTimeProvider timeProvider; - StateTtlConfiguration ttlConfig; + private MockTtlTimeProvider timeProvider; + private StateBackendTestContext sbetc; + private StateTtlConfiguration ttlConfig; - ThrowingConsumer updater; - SupplierWithException getter; - SupplierWithException originalGetter; + @Before + public void setup() { + timeProvider = new MockTtlTimeProvider(); + sbetc = createStateBackendTestContext(timeProvider); + } - UV updateEmpty; - UV updateUnexpired; - UV updateExpired; + protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider); + + @Parameterized.Parameter + public TtlStateTestContextBase ctx; + + @Parameterized.Parameters(name = "{0}") + public static List> testContexts() { + return Arrays.asList( + new TtlValueStateTestContext(), + new TtlListStateTestContext(), + new TtlMapStateAllEntriesTestContext(), + new TtlMapStatePerElementTestContext(), + new TtlAggregatingStateTestContext(), + new TtlReducingStateTestContext(), + new TtlFoldingStateTestContext()); + } - GV getUpdateEmpty; - GV getUnexpired; - GV getUpdateExpired; + @SuppressWarnings("unchecked") + private , UV> TtlStateTestContextBase ctx() { + return (TtlStateTestContextBase) ctx; + } - GV emptyValue = null; + @SuppressWarnings("unchecked") + private TtlMergingStateTestContext mctx() { + return (TtlMergingStateTestContext) ctx; + } - void initTest() { + private void initTest() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired); } - private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) { + private void initTest( + StateTtlConfiguration.TtlUpdateType updateType, + StateTtlConfiguration.TtlStateVisibility visibility) throws Exception { initTest(updateType, visibility, TTL); } - private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) { - timeProvider = new MockTimeProvider(); - StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5)); - ttlConfigBuilder.setTtlUpdateType(updateType) - .setStateVisibility(visibility) - .setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime) - .setTtl(Time.milliseconds(ttl)); - ttlConfig = ttlConfigBuilder.build(); - ttlState = createState(); - initTestValues(); + private void initTest( + StateTtlConfiguration.TtlUpdateType updateType, + StateTtlConfiguration.TtlStateVisibility visibility, + long ttl) throws Exception { + ttlConfig = StateTtlConfiguration + .newBuilder(Time.milliseconds(ttl)) + .setTtlUpdateType(updateType) + .setStateVisibility(visibility) + .build(); + sbetc.createAndRestoreKeyedStateBackend(); + sbetc.restoreSnapshot(null); + createState(); + ctx().initTestValues(); } - abstract S createState(); - - IS wrapMockState(StateDescriptor stateDesc) { - try { - return TtlStateFactory.createStateAndWrapWithTtlIfEnabled( - StringSerializer.INSTANCE, stateDesc, - MOCK_ORIGINAL_STATE_FACTORY, ttlConfig, timeProvider); - } catch (Exception e) { - throw new FlinkRuntimeException("Unexpected exception wrapping mock state", e); - } + @SuppressWarnings("unchecked") + private void createState() throws Exception { + StateDescriptor stateDescriptor = ctx().createStateDescriptor(); + stateDescriptor.enableTimeToLive(ttlConfig); + ctx().ttlState = + (InternalKvState) sbetc.createState(stateDescriptor, "defaultNamespace"); } - abstract void initTestValues(); + private void takeAndRestoreSnapshot() throws Exception { + KeyedStateHandle snapshot = sbetc.takeSnapshot(); + sbetc.createAndRestoreKeyedStateBackend(); + sbetc.restoreSnapshot(snapshot); + createState(); + } @Test public void testNonExistentValue() throws Exception { initTest(); - assertEquals("Non-existing state should be empty", emptyValue, getter.get()); + assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get()); } @Test public void testExactExpirationOnWrite() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired); + takeAndRestoreSnapshot(); + timeProvider.time = 0; - updater.accept(updateEmpty); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); timeProvider.time = 20; - assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get()); + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 50; - updater.accept(updateUnexpired); + ctx().update(ctx().updateUnexpired); + + takeAndRestoreSnapshot(); timeProvider.time = 120; - assertEquals("Unexpired state should be available after update", getUnexpired, getter.get()); + assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 170; - updater.accept(updateExpired); + ctx().update(ctx().updateExpired); + + takeAndRestoreSnapshot(); timeProvider.time = 220; - assertEquals("Unexpired state should be available after update", getUpdateExpired, getter.get()); + assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 300; - assertEquals("Expired state should be unavailable", emptyValue, getter.get()); - assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get()); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal()); } @Test @@ -128,11 +176,14 @@ public void testRelaxedExpirationOnWrite() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp); timeProvider.time = 0; - updater.accept(updateEmpty); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); timeProvider.time = 120; - assertEquals("Expired state should be available", getUpdateEmpty, getter.get()); - assertEquals("Expired state should be cleared on access", emptyValue, getter.get()); + assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get()); + assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal()); + assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get()); } @Test @@ -140,17 +191,23 @@ public void testExactExpirationOnRead() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired); timeProvider.time = 0; - updater.accept(updateEmpty); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); timeProvider.time = 50; - assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get()); + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 120; - assertEquals("Unexpired state should be available after read", getUpdateEmpty, getter.get()); + assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 250; - assertEquals("Expired state should be unavailable", emptyValue, getter.get()); - assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get()); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal()); } @Test @@ -158,14 +215,18 @@ public void testRelaxedExpirationOnRead() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp); timeProvider.time = 0; - updater.accept(updateEmpty); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); timeProvider.time = 50; - assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get()); + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + + takeAndRestoreSnapshot(); timeProvider.time = 170; - assertEquals("Expired state should be available", getUpdateEmpty, getter.get()); - assertEquals("Expired state should be cleared on access", emptyValue, getter.get()); + assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get()); + assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get()); } @Test @@ -173,9 +234,154 @@ public void testExpirationTimestampOverflow() throws Exception { initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE); timeProvider.time = 10; - updater.accept(updateEmpty); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); + + timeProvider.time = 50; + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + } + + @Test + public void testMergeNamespaces() throws Exception { + assumeThat(ctx, instanceOf(TtlMergingStateTestContext.class)); + + initTest(); + + timeProvider.time = 0; + List> expiredUpdatesToMerge = mctx().generateExpiredUpdatesToMerge(); + mctx().applyStateUpdates(expiredUpdatesToMerge); + + takeAndRestoreSnapshot(); + + timeProvider.time = 120; + List> unexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge(); + mctx().applyStateUpdates(unexpiredUpdatesToMerge); + + takeAndRestoreSnapshot(); + + timeProvider.time = 150; + List> finalUpdatesToMerge = mctx().generateFinalUpdatesToMerge(); + mctx().applyStateUpdates(finalUpdatesToMerge); + + takeAndRestoreSnapshot(); + + timeProvider.time = 230; + mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES); + mctx().ttlState.setCurrentNamespace("targetNamespace"); + assertEquals("Unexpected result of merge operation", + mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), mctx().get()); + } + + @Test + public void testMultipleKeys() throws Exception { + testMultipleStateIds(id -> sbetc.setCurrentKey(id)); + } + + @Test + public void testMultipleNamespaces() throws Exception { + testMultipleStateIds(id -> ctx().ttlState.setCurrentNamespace(id)); + } + + private void testMultipleStateIds(Consumer idChanger) throws Exception { + initTest(); + + timeProvider.time = 0; + idChanger.accept("id2"); + ctx().update(ctx().updateEmpty); + + takeAndRestoreSnapshot(); timeProvider.time = 50; - assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get()); + idChanger.accept("id1"); + ctx().update(ctx().updateEmpty); + idChanger.accept("id2"); + ctx().update(ctx().updateUnexpired); + + takeAndRestoreSnapshot(); + + timeProvider.time = 120; + idChanger.accept("id1"); + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + idChanger.accept("id2"); + assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get()); + + takeAndRestoreSnapshot(); + + timeProvider.time = 170; + idChanger.accept("id2"); + ctx().update(ctx().updateExpired); + + takeAndRestoreSnapshot(); + + timeProvider.time = 230; + idChanger.accept("id1"); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + idChanger.accept("id2"); + assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get()); + + takeAndRestoreSnapshot(); + + timeProvider.time = 300; + idChanger.accept("id1"); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + idChanger.accept("id2"); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + } + + @Test + public void testSnapshotChangeRestore() throws Exception { + initTest(); + + timeProvider.time = 0; + sbetc.setCurrentKey("k1"); + ctx().update(ctx().updateEmpty); + + timeProvider.time = 50; + sbetc.setCurrentKey("k1"); + ctx().update(ctx().updateUnexpired); + + timeProvider.time = 100; + sbetc.setCurrentKey("k2"); + ctx().update(ctx().updateEmpty); + + KeyedStateHandle snapshot = sbetc.takeSnapshot(); + + timeProvider.time = 170; + sbetc.setCurrentKey("k1"); + ctx().update(ctx().updateExpired); + sbetc.setCurrentKey("k2"); + ctx().update(ctx().updateUnexpired); + + sbetc.createAndRestoreKeyedStateBackend(); + sbetc.restoreSnapshot(snapshot); + createState(); + + timeProvider.time = 180; + sbetc.setCurrentKey("k1"); + assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get()); + sbetc.setCurrentKey("k2"); + assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get()); + } + + @Test(expected = StateMigrationException.class) + public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception { + assumeThat(this, not(instanceOf(MockTtlStateTest.class))); + + initTest(); + + timeProvider.time = 0; + ctx().update(ctx().updateEmpty); + + KeyedStateHandle snapshot = sbetc.takeSnapshot(); + sbetc.createAndRestoreKeyedStateBackend(); + + sbetc.restoreSnapshot(snapshot); + sbetc.createState(ctx().createStateDescriptor(), ""); + } + + @After + public void tearDown() { + sbetc.disposeKeyedStateBackend(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java new file mode 100644 index 0000000000000..d40bfb00c20c9 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.runtime.state.internal.InternalKvState; + +abstract class TtlStateTestContextBase, UV, GV> { + S ttlState; + + UV updateEmpty; + UV updateUnexpired; + UV updateExpired; + + GV getUpdateEmpty; + GV getUnexpired; + GV getUpdateExpired; + + GV emptyValue = null; + + abstract void initTestValues(); + + abstract StateDescriptor createStateDescriptor(); + + abstract void update(UV value) throws Exception; + + abstract GV get() throws Exception; + + abstract Object getOriginal() throws Exception; + + @Override + public String toString() { + return this.getClass().getSimpleName(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java similarity index 67% rename from flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java index 8d9a4b4140207..976d89177a734 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java @@ -18,21 +18,19 @@ package org.apache.flink.runtime.state.ttl; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; /** Test suite for {@link TtlValueState}. */ -public class TtlValueStateTest extends TtlStateTestBase, String, String> { +class TtlValueStateTestContext extends TtlStateTestContextBase, String, String> { private static final String TEST_VAL1 = "test value1"; private static final String TEST_VAL2 = "test value2"; private static final String TEST_VAL3 = "test value3"; @Override void initTestValues() { - updater = v -> ttlState.update(v); - getter = () -> ttlState.value(); - originalGetter = () -> ttlState.original.value(); - updateEmpty = TEST_VAL1; updateUnexpired = TEST_VAL2; updateExpired = TEST_VAL3; @@ -42,10 +40,25 @@ void initTestValues() { getUpdateExpired = TEST_VAL3; } + @SuppressWarnings("unchecked") + @Override + StateDescriptor createStateDescriptor() { + return (StateDescriptor) new ValueStateDescriptor<>( + "TtlValueTestState", StringSerializer.INSTANCE); + } + + @Override + void update(String value) throws Exception { + ttlState.update(value); + } + + @Override + String get() throws Exception { + return ttlState.value(); + } + @Override - TtlValueState createState() { - ValueStateDescriptor valueStateDesc = - new ValueStateDescriptor<>("TtlValueTestState", StringSerializer.INSTANCE); - return (TtlValueState) wrapMockState(valueStateDesc); + Object getOriginal() throws Exception { + return ttlState.original.value(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java index 439ca7f2747fb..3f94669be714d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java @@ -21,14 +21,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.internal.InternalKvState; -import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; /** In memory mock internal state base class. */ -class MockInternalKvState implements InternalKvState { - private Map namespacedValues = new HashMap<>(); - private T defaultNamespaceValue; +abstract class MockInternalKvState implements InternalKvState { + Supplier> values; private N currentNamespace; private final Supplier emptyValue; @@ -38,7 +36,6 @@ class MockInternalKvState implements InternalKvState { MockInternalKvState(Supplier emptyValue) { this.emptyValue = emptyValue; - defaultNamespaceValue = emptyValue.get(); } @Override @@ -72,26 +69,20 @@ public byte[] getSerializedValue( @Override public void clear() { - if (currentNamespace == null) { - defaultNamespaceValue = emptyValue.get(); - } else { - namespacedValues.remove(currentNamespace); - } + getCurrentKeyValues().remove(currentNamespace); } + @SuppressWarnings("unchecked") public T getInternal() { - T value = currentNamespace == null ? defaultNamespaceValue : - namespacedValues.getOrDefault(currentNamespace, emptyValue.get()); - updateInternal(value); - return value; + return (T) getCurrentKeyValues().computeIfAbsent(currentNamespace, n -> emptyValue.get()); } @SuppressWarnings("WeakerAccess") public void updateInternal(T valueToStore) { - if (currentNamespace == null) { - defaultNamespaceValue = valueToStore; - } else { - namespacedValues.put(currentNamespace, valueToStore); - } + getCurrentKeyValues().put(currentNamespace, valueToStore); + } + + private Map getCurrentKeyValues() { + return values.get(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java index 386ef9772e530..9b5ac10c92d04 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java @@ -68,21 +68,17 @@ public boolean contains(UK key) { @Override public Iterable> entries() { - return copy().entrySet(); - } - - private Map copy() { - return new HashMap<>(getInternal()); + return getInternal().entrySet(); } @Override public Iterable keys() { - return copy().keySet(); + return getInternal().keySet(); } @Override public Iterable values() { - return copy().values(); + return getInternal().values(); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java new file mode 100644 index 0000000000000..363ecf8ef7932 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl.mock; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PriorityComparator; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet; +import org.apache.flink.runtime.state.ttl.TtlStateFactory; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** State backend which produces in memory mock state objects. */ +public class MockKeyedStateBackend extends AbstractKeyedStateBackend { + + @SuppressWarnings("deprecation") + private static final Map, KeyedStateFactory> STATE_FACTORIES = + Stream.of( + Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState), + Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState), + Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState), + Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState), + Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState), + Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState) + ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + + private final Map>> stateValues = new HashMap<>(); + + MockKeyedStateBackend( + TaskKvStateRegistry kvStateRegistry, + TypeSerializer keySerializer, + ClassLoader userCodeClassLoader, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + ExecutionConfig executionConfig, + TtlTimeProvider ttlTimeProvider) { + super(kvStateRegistry, keySerializer, userCodeClassLoader, + numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider); + } + + @Override + @SuppressWarnings("unchecked") + public IS createInternalState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDesc) throws Exception { + KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), TtlStateFactory.class); + throw new FlinkRuntimeException(message); + } + IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc); + ((MockInternalKvState) state).values = () -> stateValues + .computeIfAbsent(stateDesc.getName(), n -> new HashMap<>()) + .computeIfAbsent(getCurrentKey(), k -> new HashMap<>()); + return state; + } + + @Override + public int numStateEntries() { + int count = 0; + for (String state : stateValues.keySet()) { + for (K key : stateValues.get(state).keySet()) { + count += stateValues.get(state).get(key).size(); + } + } + return count; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + // noop + } + + @Override + public Stream getKeys(String state, N namespace) { + return stateValues.get(state).entrySet().stream() + .filter(e -> e.getValue().containsKey(namespace)) + .map(Map.Entry::getKey); + } + + @Override + public RunnableFuture> snapshot( + long checkpointId, + long timestamp, + CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) { + return new FutureTask<>(() -> SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues)))); + } + + @SuppressWarnings("unchecked") + @Override + public void restore(Collection state) { + stateValues.clear(); + state = state == null ? Collections.emptyList() : state; + state.forEach(ksh -> stateValues.putAll(copy(((MockKeyedStateHandle) ksh).snapshotStates))); + } + + @SuppressWarnings("unchecked") + private static Map>> copy( + Map>> stateValues) { + Map>> snapshotStates = new HashMap<>(); + for (String stateName : stateValues.keySet()) { + Map> keyedValues = snapshotStates.computeIfAbsent(stateName, s -> new HashMap<>()); + for (K key : stateValues.get(stateName).keySet()) { + Map values = keyedValues.computeIfAbsent(key, s -> new HashMap<>()); + for (Object namespace : stateValues.get(stateName).get(key).keySet()) { + Object value = stateValues.get(stateName).get(key).get(namespace); + value = value instanceof List ? new ArrayList<>((List) value) : value; + value = value instanceof Map ? new HashMap<>((Map) value) : value; + values.put(namespace, value); + } + } + } + return snapshotStates; + } + + @Nonnull + @Override + public KeyGroupedInternalPriorityQueue + create( + @Nonnull String stateName, + @Nonnull TypeSerializer byteOrderedElementSerializer, + @Nonnull PriorityComparator elementPriorityComparator, + @Nonnull KeyExtractorFunction keyExtractor) { + return new HeapPriorityQueueSet<>( + elementPriorityComparator, + keyExtractor, + 0, + keyGroupRange, + 0); + } + + private static class MockKeyedStateHandle implements KeyedStateHandle { + private static final long serialVersionUID = 1L; + + final Map>> snapshotStates; + + MockKeyedStateHandle(Map>> snapshotStates) { + this.snapshotStates = snapshotStates; + } + + @Override + public void discardState() { + snapshotStates.clear(); + } + + @Override + public long getStateSize() { + throw new UnsupportedOperationException(); + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry) { + throw new UnsupportedOperationException(); + } + + @Override + public KeyGroupRange getKeyGroupRange() { + throw new UnsupportedOperationException(); + } + + @Override + public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java deleted file mode 100644 index d8433529e0715..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state.ttl.mock; - -import org.apache.flink.api.common.state.AggregatingStateDescriptor; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.State; -import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.KeyedStateFactory; -import org.apache.flink.runtime.state.ttl.TtlStateFactory; -import org.apache.flink.util.FlinkRuntimeException; - -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/** State factory which produces in memory mock state objects. */ -public class MockKeyedStateFactory implements KeyedStateFactory { - @SuppressWarnings("deprecation") - private static final Map, KeyedStateFactory> STATE_FACTORIES = - Stream.of( - Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState), - Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState), - Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState), - Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState), - Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState), - Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState) - ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); - - @Override - public IS createState( - TypeSerializer namespaceSerializer, - StateDescriptor stateDesc) throws Exception { - KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); - if (stateFactory == null) { - String message = String.format("State %s is not supported by %s", - stateDesc.getClass(), TtlStateFactory.class); - throw new FlinkRuntimeException(message); - } - return stateFactory.createState(namespaceSerializer, stateDesc); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java new file mode 100644 index 0000000000000..a8d49dde3ff59 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl.mock; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; + +import javax.annotation.Nullable; + +/** mack state backend. */ +public class MockStateBackend extends AbstractStateBackend { + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) { + throw new UnsupportedOperationException(); + } + + @Override + public CheckpointStorage createCheckpointStorage(JobID jobId) { + return new CheckpointStorage() { + @Override + public boolean supportsHighlyAvailableStorage() { + return false; + } + + @Override + public boolean hasDefaultSavepointLocation() { + return false; + } + + @Override + public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) { + return null; + } + + @Override + public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) { + return null; + } + + @Override + public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) { + return null; + } + + @Override + public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) { + return null; + } + + @Override + public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() { + return null; + } + }; + } + + @Override + public AbstractKeyedStateBackend createKeyedStateBackend( + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { + return new MockKeyedStateBackend<>( + new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()), + keySerializer, + env.getUserClassLoader(), + numberOfKeyGroups, + keyGroupRange, + env.getExecutionConfig(), + ttlTimeProvider); + } + + @Override + public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 6b4e3e99db476..3bf0aea036078 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue; import org.apache.flink.runtime.state.heap.TreeOrderedSetCache; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; @@ -270,10 +271,12 @@ public RocksDBKeyedStateBackend( ExecutionConfig executionConfig, boolean enableIncrementalCheckpointing, LocalRecoveryConfig localRecoveryConfig, - RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType + RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType, + TtlTimeProvider ttlTimeProvider ) throws IOException { - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); + super(kvStateRegistry, keySerializer, userCodeClassLoader, + numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider); this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); @@ -1347,7 +1350,7 @@ private ColumnFamilyHandle createColumnFamily(String stateName, RocksDB db) { } @Override - public IS createState( + public IS createInternalState( TypeSerializer namespaceSerializer, StateDescriptor stateDesc) throws Exception { StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 998521b810af9..1794e17548c15 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.AbstractID; import org.apache.flink.util.TernaryBoolean; @@ -410,7 +411,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws IOException { // first, make sure that the RocksDB JNI library is loaded // we do this explicitly here to have better error handling @@ -442,7 +444,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( env.getExecutionConfig(), isIncrementalCheckpointsEnabled(), localRecoveryConfig, - priorityQueueStateType); + priorityQueueStateType, + ttlTimeProvider); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 69069d6e784b0..6b254ce50d302 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.runtime.util.BlockingCheckpointOutputStream; @@ -241,7 +242,8 @@ public void testCorrectMergeOperatorSet() throws IOException { new ExecutionConfig(), enableIncrementalCheckpointing, TestLocalRecoveryConfig.disabled(), - RocksDBStateBackend.PriorityQueueStateType.HEAP); + RocksDBStateBackend.PriorityQueueStateType.HEAP, + TtlTimeProvider.DEFAULT); verify(columnFamilyOptions, Mockito.times(1)) .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java new file mode 100644 index 0000000000000..a59082864bc76 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.ttl.StateBackendTestContext; +import org.apache.flink.runtime.state.ttl.TtlStateTestBase; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.TernaryBoolean; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; + +/** Test suite for rocksdb state TTL. */ +public class RocksDBTtlStateTest extends TtlStateTestBase { + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + @Override + protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) { + return new StateBackendTestContext(timeProvider) { + @Override + protected StateBackend createStateBackend() { + return RocksDBTtlStateTest.this.createStateBackend(); + } + }; + } + + private StateBackend createStateBackend() { + String dbPath; + String checkpointPath; + try { + dbPath = tempFolder.newFolder().getAbsolutePath(); + checkpointPath = tempFolder.newFolder().toURI().toString(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to init rocksdb test state backend"); + } + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.FALSE); + backend.setDbStoragePath(dbPath); + return backend; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 594f337f41693..ca9cb0b606762 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.StatePartitionStreamProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.CloseableIterable; @@ -270,7 +271,8 @@ protected AbstractKeyedStateBackend keyedStatedBackend( keySerializer, taskInfo.getMaxNumberOfParallelSubtasks(), keyGroupRange, - environment.getTaskKvStateRegistry()), + environment.getTaskKvStateRegistry(), + TtlTimeProvider.DEFAULT), backendCloseableRegistry, logDescription); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index 000cf31c0861d..6233c4c12c475 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.TaskStateManagerImplTest; import org.apache.flink.runtime.state.TestTaskLocalStateStore; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -138,7 +139,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( String operatorIdentifier, TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws Exception { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws Exception { return mock(AbstractKeyedStateBackend.class); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index e5558f6d24620..aebad543d069f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; @@ -261,7 +262,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) { return null; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java index 914326bdbeb03..214fab5b5a69c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -53,7 +54,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( TypeSerializer keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws IOException { return spy(delegate.createKeyedStateBackend( env, jobID, @@ -61,7 +63,8 @@ public AbstractKeyedStateBackend createKeyedStateBackend( keySerializer, numberOfKeyGroups, keyGroupRange, - kvStateRegistry)); + kvStateRegistry, + ttlTimeProvider)); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java index 46c408e19575a..f3bae166a03ea 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.ExceptionUtils; @@ -105,13 +106,14 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException @Override public AbstractKeyedStateBackend createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { + Environment env, + JobID jobID, + String operatorIdentifier, + TypeSerializer keySerializer, + int numberOfKeyGroups, + KeyGroupRange keyGroupRange, + TaskKvStateRegistry kvStateRegistry, + TtlTimeProvider ttlTimeProvider) throws IOException { throw new SuccessException(); }