[FLINK-10471][State TTL] State TTL cleanup using RocksDb compaction filter

This closes apache#7163.
azagrebin authored and StefanRRichter committed Feb 19, 2019
1 parent adc5eef commit 735b514
Showing 46 changed files with 1,232 additions and 370 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/rocks_db_configuration.html
@@ -22,5 +22,10 @@
<td style="word-wrap: break-word;">"HEAP"</td>
<td>This determines the factory for the timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.ttl.compaction.filter.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>This determines whether the compaction filter to clean up state with TTL is enabled for the backend. Note: users can still decide in the state TTL configuration in the state descriptor whether the filter is active for a particular state or not.</td>
</tr>
</tbody>
</table>
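
For reference, a minimal sketch of what enabling this option in `flink-conf.yaml` could look like (the key and default are taken from the table above):

{% highlight yaml %}
# Enable the Flink-specific RocksDB compaction filter for TTL state cleanup
# (disabled by default; per-state opt-in still happens in StateTtlConfig)
state.backend.rocksdb.ttl.compaction.filter.enabled: true
{% endhighlight %}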
67 changes: 65 additions & 2 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
@@ -355,7 +355,7 @@ If the serializer does not support null values, it can be wrapped with `Nullable

#### Cleanup of Expired State

Currently, expired values are only removed when they are read out explicitly,
By default, expired values are only removed when they are read out explicitly,
e.g. by calling `ValueState.value()`.

<span class="label label-danger">Attention</span> This means that by default if expired state is not read,
@@ -396,6 +396,10 @@ val ttlConfig = StateTtlConfig

This option is not applicable to incremental checkpointing in the RocksDB state backend.

**Notes:**
- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
e.g. after restart from savepoint.

##### Incremental cleanup

Another option is to trigger cleanup of some state entries incrementally.
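
A minimal sketch of activating this strategy (it uses the `cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord)` builder method added later in this commit; the parameter values are illustrative):

{% highlight java %}
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    // check up to 10 entries per state access and also clean up on every record
    .cleanupIncrementally(10, true)
    .build();
{% endhighlight %}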
@@ -439,8 +443,67 @@ The second parameter defines whether to trigger cleanup additionally per each re
- If the heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys
while iterating because its implementation does not support concurrent modifications.
Enabling this feature will then increase memory consumption. Asynchronous snapshotting does not have this problem.
- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
e.g. after restart from savepoint.

##### Cleanup during RocksDB compaction

If the RocksDB state backend is used, another cleanup strategy is to activate the Flink-specific compaction filter.
RocksDB periodically runs asynchronous compactions to merge state updates and reduce storage.
The Flink compaction filter checks the expiration timestamp of state entries with TTL
and excludes expired values.

This feature is disabled by default. It has to be activated first for the RocksDB backend,
either by setting the Flink configuration option `state.backend.rocksdb.ttl.compaction.filter.enabled`
or by calling `RocksDBStateBackend::enableTtlCompactionFilter` if a custom RocksDB state backend is created for a job.
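
For example, a sketch of enabling the filter on a programmatically created backend (assuming a `StreamExecutionEnvironment` named `env`; the checkpoint URI is a placeholder):

{% highlight java %}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

// constructor may throw IOException; the URI below is a placeholder
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///checkpoints");
// opt the backend in; individual states still enable the filter via StateTtlConfig
backend.enableTtlCompactionFilter();
env.setStateBackend(backend);
{% endhighlight %}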
Then any state with TTL can be configured to use the filter:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter()
.build();
{% endhighlight %}
</div>

More strategies will be added in the future for cleaning up expired state automatically in the background.
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.state.StateTtlConfig

val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter
.build
{% endhighlight %}
</div>
</div>

The RocksDB compaction filter queries the current timestamp, used to check expiration, from Flink each time
after processing a certain number of state entries. This number is 1000 by default.
You can optionally change it by passing a custom value to the
`StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries)` method.
Updating the timestamp more often can improve cleanup speed,
but it decreases compaction performance because it requires a JNI call from native code.
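
For example, a sketch of passing a custom query interval (the value is illustrative):

{% highlight java %}
import org.apache.flink.api.common.state.StateTtlConfig;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    // refresh the timestamp from Flink only after every 10000 processed entries
    .cleanupInRocksdbCompactFilter(10000L)
    .build();
{% endhighlight %}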

You can activate debug logs from the native code of the RocksDB filter
by enabling the debug level for `FlinkCompactionFilter`:

`log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG`

**Notes:**
- Calling the TTL filter during compaction slows it down.
The TTL filter has to parse the timestamp of the last access and check its expiration
for every stored state entry per key being compacted.
For collection state types (list or map), the check is also invoked per stored element.
- If this feature is used with list state whose elements have a non-fixed byte length,
the native TTL filter additionally has to call the element's Flink Java type serializer over JNI
for each state entry in which at least the first element has expired, to determine the offset of the next unexpired element.
- For existing jobs, this cleanup strategy can be activated or deactivated anytime in `StateTtlConfig`,
e.g. after restart from savepoint.

### State in the Scala DataStream API

@@ -219,9 +219,7 @@ public Builder useProcessingTime() {
/** Cleanup expired state in full snapshot on checkpoint. */
@Nonnull
public Builder cleanupFullSnapshot() {
cleanupStrategies.strategies.put(
CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
new CleanupStrategies.EmptyCleanupStrategy());
cleanupStrategies.activate(CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT);
return this;
}

@@ -256,12 +254,49 @@ public Builder cleanupFullSnapshot() {
public Builder cleanupIncrementally(
@Nonnegative int cleanupSize,
boolean runCleanupForEveryRecord) {
cleanupStrategies.strategies.put(
cleanupStrategies.activate(
CleanupStrategies.Strategies.INCREMENTAL_CLEANUP,
new IncrementalCleanupStrategy(cleanupSize, runCleanupForEveryRecord));
return this;
}

/**
* Cleanup expired state while RocksDB compaction is running.
*
* <p>RocksDB runs periodic compaction of state updates and merges them to free storage.
* During this process, the TTL filter checks the timestamp of state entries and drops expired ones.
* The feature has to be activated in the RocksDB backend first,
* using the following Flink configuration option:
* state.backend.rocksdb.ttl.compaction.filter.enabled.
*
* <p>Due to specifics of the RocksDB compaction filter,
* cleanup is not properly guaranteed if put and merge operations are used at the same time:
* https://github.com/facebook/rocksdb/blob/master/include/rocksdb/compaction_filter.h#L69
* This means that the TTL filter should be tested for list state taking this caveat into account.
*/
@Nonnull
public Builder cleanupInRocksdbCompactFilter() {
return cleanupInRocksdbCompactFilter(1000L);
}

/**
* Cleanup expired state while RocksDB compaction is running.
*
* <p>The RocksDB compaction filter queries the current timestamp, used to check expiration,
* from Flink each time after processing {@code queryTimeAfterNumEntries} state entries.
* Updating the timestamp more often can improve cleanup speed,
* but it decreases compaction performance because it requires a JNI call from native code.
*
* @param queryTimeAfterNumEntries number of state entries to process by the compaction filter before updating the current timestamp
*/
@Nonnull
public Builder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
cleanupStrategies.activate(CleanupStrategies.Strategies.ROCKSDB_COMPACTION_FILTER,
new RocksdbCompactFilterCleanupStrategy(queryTimeAfterNumEntries));
return this;
}

/**
* Sets the ttl time.
* @param ttl The ttl time.
@@ -293,10 +328,13 @@ public StateTtlConfig build() {
public static class CleanupStrategies implements Serializable {
private static final long serialVersionUID = -1617740467277313524L;

private static final CleanupStrategy EMPTY_STRATEGY = new EmptyCleanupStrategy();

/** Fixed strategies ordinals in {@code strategies} config field. */
enum Strategies {
FULL_STATE_SCAN_SNAPSHOT,
INCREMENTAL_CLEANUP
INCREMENTAL_CLEANUP,
ROCKSDB_COMPACTION_FILTER
}

/** Base interface for cleanup strategies configurations. */
@@ -310,6 +348,14 @@ static class EmptyCleanupStrategy implements CleanupStrategy {

final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

public void activate(Strategies strategy) {
activate(strategy, EMPTY_STRATEGY);
}

public void activate(Strategies strategy, CleanupStrategy config) {
strategies.put(strategy, config);
}

public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}
@@ -318,6 +364,15 @@ public boolean inFullSnapshot() {
public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
}

public boolean inRocksdbCompactFilter() {
return strategies.containsKey(Strategies.ROCKSDB_COMPACTION_FILTER);
}

@Nullable
public RocksdbCompactFilterCleanupStrategy getRocksdbCompactFilterCleanupStrategy() {
return (RocksdbCompactFilterCleanupStrategy) strategies.get(Strategies.ROCKSDB_COMPACTION_FILTER);
}
}

/** Configuration of cleanup strategy while taking the full snapshot. */
@@ -347,4 +402,20 @@ public boolean runCleanupForEveryRecord() {
return runCleanupForEveryRecord;
}
}

/** Configuration of cleanup strategy using custom compaction filter in RocksDB. */
public static class RocksdbCompactFilterCleanupStrategy implements CleanupStrategies.CleanupStrategy {
private static final long serialVersionUID = 3109278796506988980L;

/** Number of state entries to process by compaction filter before updating current timestamp. */
private final long queryTimeAfterNumEntries;

private RocksdbCompactFilterCleanupStrategy(long queryTimeAfterNumEntries) {
this.queryTimeAfterNumEntries = queryTimeAfterNumEntries;
}

public long getQueryTimeAfterNumEntries() {
return queryTimeAfterNumEntries;
}
}
}
5 changes: 5 additions & 0 deletions flink-end-to-end-tests/flink-stream-state-ttl-test/pom.xml
@@ -45,6 +45,11 @@ under the License.
<artifactId>flink-datastream-allround-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
@@ -52,19 +53,20 @@ public static void main(String[] args) throws Exception {

setupEnvironment(env, pt);

final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);
setBackendWithCustomTTLTimeProvider(env);

TtlTestConfig config = TtlTestConfig.fromArgs(pt);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
.cleanupIncrementally(5, true)
.cleanupFullSnapshot()
.cleanupIncrementally(5, true)
.cleanupInRocksdbCompactFilter()
.build();

env
.addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, config.reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
@@ -76,15 +78,15 @@ public static void main(String[] args) throws Exception {
* Sets the state backend to a new {@link StubStateBackend} which has a {@link MonotonicTTLTimeProvider}.
*
* @param env The {@link StreamExecutionEnvironment} of the job.
* @return The {@link MonotonicTTLTimeProvider}.
*/
private static MonotonicTTLTimeProvider setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
private static void setBackendWithCustomTTLTimeProvider(StreamExecutionEnvironment env) {
final MonotonicTTLTimeProvider ttlTimeProvider = new MonotonicTTLTimeProvider();

final StateBackend configuredBackend = env.getStateBackend();
if (configuredBackend instanceof RocksDBStateBackend) {
((RocksDBStateBackend) configuredBackend).enableTtlCompactionFilter();
}
final StateBackend stubBackend = new StubStateBackend(configuredBackend, ttlTimeProvider);
env.setStateBackend(stubBackend);

return ttlTimeProvider;
}
}
@@ -20,6 +20,7 @@

import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;

import java.io.Serializable;
@@ -42,21 +43,41 @@ final class MonotonicTTLTimeProvider implements TtlTimeProvider, Serializable {
* the time, but the backend would not be notified about it, resulting in inconsistent
* state.
*
If the number of task slots per TM changes, then we may also need to add synchronization.
* We have to add synchronization because the time provider is also accessed concurrently
* from RocksDB compaction filter threads.
*/

private static boolean timeIsFrozen = false;

private static long lastReturnedProcessingTime = Long.MIN_VALUE;

private static final Object lock = new Object();

@GuardedBy("lock")
static long freeze() {
synchronized (lock) {
if (!timeIsFrozen || lastReturnedProcessingTime == Long.MIN_VALUE) {
timeIsFrozen = true;
return getCurrentTimestamp();
} else {
return lastReturnedProcessingTime;
}
}
}

@Override
@GuardedBy("lock")
public long currentTimestamp() {
if (timeIsFrozen && lastReturnedProcessingTime != Long.MIN_VALUE) {
return lastReturnedProcessingTime;
synchronized (lock) {
if (timeIsFrozen && lastReturnedProcessingTime != Long.MIN_VALUE) {
return lastReturnedProcessingTime;
}
return getCurrentTimestamp();
}
}

timeIsFrozen = true;

@GuardedBy("lock")
private static long getCurrentTimestamp() {
final long currentProcessingTime = System.currentTimeMillis();
if (currentProcessingTime < lastReturnedProcessingTime) {
return lastReturnedProcessingTime;
@@ -66,8 +87,11 @@ public long currentTimestamp() {
return lastReturnedProcessingTime;
}

long unfreezeTime() {
timeIsFrozen = false;
return lastReturnedProcessingTime;
@GuardedBy("lock")
static long unfreezeTime() {
synchronized (lock) {
timeIsFrozen = false;
return lastReturnedProcessingTime;
}
}
}
@@ -70,18 +70,15 @@ class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String

@Nonnull
private final StateTtlConfig ttlConfig;
private final MonotonicTTLTimeProvider ttlTimeProvider;
private final UpdateStat stat;

private transient Map<String, State> states;
private transient Map<String, ListState<ValueWithTs<?>>> prevUpdatesByVerifierId;

TtlVerifyUpdateFunction(
@Nonnull StateTtlConfig ttlConfig,
MonotonicTTLTimeProvider ttlTimeProvider,
long reportStatAfterUpdatesNum) {
this.ttlConfig = ttlConfig;
this.ttlTimeProvider = checkNotNull(ttlTimeProvider);
this.stat = new UpdateStat(reportStatAfterUpdatesNum);
}

@@ -117,12 +114,12 @@ private List<ValueWithTs<?>> getPrevUpdates(String verifierId) throws Exception
TtlStateVerifier<?, ?> verifier,
Object update) throws Exception {

final long timestampBeforeUpdate = ttlTimeProvider.currentTimestamp();
final long timestampBeforeUpdate = MonotonicTTLTimeProvider.freeze();
State state = states.get(verifier.getId());
Object valueBeforeUpdate = verifier.get(state);
verifier.update(state, update);
Object updatedValue = verifier.get(state);
final long timestampAfterUpdate = ttlTimeProvider.unfreezeTime();
final long timestampAfterUpdate = MonotonicTTLTimeProvider.unfreezeTime();

checkState(
timestampAfterUpdate == timestampBeforeUpdate,