Skip to content

Commit

Permalink
[FLINK-9701] [state] (follow up) Use StateTtlConfiguration.DISABLED i…
Browse files Browse the repository at this point in the history
…nstead of null, make it Serializable and add

convenience methods to its builder

This closes apache#6331.
  • Loading branch information
azagrebin authored and tillrohrmann committed Jul 13, 2018
1 parent 57872d5 commit 1632681
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -94,8 +95,8 @@ public enum Type {
private String queryableStateName;

/** Name for queries against state created from this StateDescriptor. */
@Nullable
private StateTtlConfiguration ttlConfig;
@Nonnull
private StateTtlConfiguration ttlConfig = StateTtlConfiguration.DISABLED;

/** The default value returned by the state when no other value is bound to a key. */
@Nullable
Expand Down Expand Up @@ -208,7 +209,8 @@ public TypeSerializer<T> getSerializer() {
* @throws IllegalStateException If queryable state name already set
*/
public void setQueryable(String queryableStateName) {
Preconditions.checkArgument(ttlConfig == null,
Preconditions.checkArgument(
ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled,
"Queryable state is currently not supported with TTL");
if (this.queryableStateName == null) {
this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
Expand Down Expand Up @@ -247,12 +249,14 @@ public boolean isQueryable() {
*/
public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
Preconditions.checkNotNull(ttlConfig);
Preconditions.checkArgument(queryableStateName == null,
Preconditions.checkArgument(
ttlConfig.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled &&
queryableStateName == null,
"Queryable state is currently not supported with TTL");
this.ttlConfig = ttlConfig;
}

@Nullable
@Nonnull
@Internal
public StateTtlConfiguration getTtlConfig() {
return ttlConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,28 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime;
import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite;

/**
* Configuration of state TTL logic.
*/
public class StateTtlConfiguration {
public class StateTtlConfiguration implements Serializable {

private static final long serialVersionUID = -7592693245044289793L;

public static final StateTtlConfiguration DISABLED =
newBuilder(Time.milliseconds(Long.MAX_VALUE)).setTtlUpdateType(TtlUpdateType.Disabled).build();

/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum TtlUpdateType {
/** TTL is disabled. State does not expire. */
Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
Expand Down Expand Up @@ -91,6 +101,10 @@ public TtlTimeCharacteristic getTimeCharacteristic() {
return timeCharacteristic;
}

public boolean isEnabled() {
return ttlUpdateType != TtlUpdateType.Disabled;
}

@Override
public String toString() {
return "StateTtlConfiguration{" +
Expand Down Expand Up @@ -129,6 +143,14 @@ public Builder setTtlUpdateType(TtlUpdateType ttlUpdateType) {
return this;
}

public Builder updateTtlOnCreateAndWrite() {
return setTtlUpdateType(TtlUpdateType.OnCreateAndWrite);
}

public Builder updateTtlOnReadAndWrite() {
return setTtlUpdateType(TtlUpdateType.OnReadAndWrite);
}

/**
* Sets the state visibility.
*
Expand All @@ -139,6 +161,14 @@ public Builder setStateVisibility(TtlStateVisibility stateVisibility) {
return this;
}

public Builder returnExpiredIfNotCleanedUp() {
return setStateVisibility(TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
}

public Builder neverReturnExpired() {
return setStateVisibility(TtlStateVisibility.NeverReturnExpired);
}

/**
* Sets the time characteristic.
*
Expand All @@ -149,6 +179,10 @@ public Builder setTimeCharacteristic(TtlTimeCharacteristic timeCharacteristic) {
return this;
}

public Builder useProcessingTime() {
return setTimeCharacteristic(TtlTimeCharacteristic.ProcessingTime);
}

/**
* Sets the ttl time.
* @param ttl The ttl time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTt
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
Preconditions.checkNotNull(timeProvider);
return stateDesc.getTtlConfig() == null ?
originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
return stateDesc.getTtlConfig().isEnabled() ?
new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider)
.createState(namespaceSerializer, stateDesc);
.createState(namespaceSerializer, stateDesc) :
originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
}

private final Map<Class<? extends StateDescriptor>, KeyedStateFactory> stateFactories;
Expand Down

0 comments on commit 1632681

Please sign in to comment.