Skip to content

Commit

Permalink
[FLINK-10473][State TTL] TTL state incremental cleanup for heap backend
Browse files Browse the repository at this point in the history
This closes apache#7188.
  • Loading branch information
azagrebin authored and StefanRRichter committed Jan 29, 2019
1 parent f80ab20 commit 592f3a9
Show file tree
Hide file tree
Showing 42 changed files with 1,140 additions and 181 deletions.
46 changes: 46 additions & 0 deletions docs/dev/stream/state/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ e.g. by calling `ValueState.value()`.
<span class="label label-danger">Attention</span> This means that by default if expired state is not read,
it won't be removed, possibly leading to ever growing state. This might change in future releases.

##### Cleanup in full snapshot

Additionally, you can activate the cleanup at the moment of taking the full state snapshot which
will reduce its size. The local state is not cleaned up under the current implementation
but it will not include the removed expired state in case of restoration from the previous snapshot.
Expand Down Expand Up @@ -394,6 +396,50 @@ val ttlConfig = StateTtlConfig

This option is not applicable for the incremental checkpointing in the RocksDB state backend.

##### Incremental cleanup

Another option is to trigger cleanup of some state entries incrementally.
The trigger can be a callback from each state access or/and each record processing.
If this cleanup strategy is active for certain state,
The storage backend keeps a lazy global iterator for this state over all its entries.
Every time incremental cleanup is triggered, the iterator is advanced.
The traversed state entries are checked and expired ones are cleaned up.

This feature can be activated in `StateTtlConfig`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally()
.build();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.flink.api.common.state.StateTtlConfig
val ttlConfig = StateTtlCon fig
.newBuilder(Time.seconds(1))
.cleanupIncrementally
.build
{% endhighlight %}
</div>
</div>

This strategy has two parameters. The first one is number of checked state entries per each cleanup triggering.
If enabled, it is always triggered per each state access.
The second parameter defines whether to trigger cleanup additionally per each record processing.

**Notes:**
- If no access happens to the state or no records are processed, expired state will persist.
- Time spent for the incremental cleanup increases record processing latency.
- At the moment incremental cleanup is implemented only for Heap state backend. Setting it for RocksDB will have no effect.
- If heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys
while iterating because of its specific implementation which does not support concurrent modifications.
Enabling of this feature will increase memory consumption then. Asynchronous snapshotting does not have this problem.

More strategies will be added in the future for cleaning up expired state automatically in the background.

### State in the Scala DataStream API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.EnumMap;
Expand Down Expand Up @@ -219,7 +221,44 @@ public Builder useProcessingTime() {
public Builder cleanupFullSnapshot() {
cleanupStrategies.strategies.put(
CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
new CleanupStrategies.CleanupStrategy() { });
new CleanupStrategies.EmptyCleanupStrategy());
return this;
}

/**
* Cleanup expired state incrementally cleanup local state.
*
* <p>Upon every state access this cleanup strategy checks a bunch of state keys for expiration
* and cleans up expired ones. It keeps a lazy iterator through all keys with relaxed consistency
* if backend supports it. This way all keys should be regularly checked and cleaned eventually over time
* if any state is constantly being accessed.
*
* <p>Additionally to the incremental cleanup upon state access, it can also run per every record.
* Caution: if there are a lot of registered states using this option,
* they all will be iterated for every record to check if there is something to cleanup.
*
* <p>Note: if no access happens to this state or no records are processed
* in case of {@code runCleanupForEveryRecord}, expired state will persist.
*
* <p>Note: Time spent for the incremental cleanup increases record processing latency.
*
* <p>Note: At the moment incremental cleanup is implemented only for Heap state backend.
* Setting it for RocksDB will have no effect.
*
* <p>Note: If heap state backend is used with synchronous snapshotting, the global iterator keeps a copy of all keys
* while iterating because of its specific implementation which does not support concurrent modifications.
* Enabling of this feature will increase memory consumption then. Asynchronous snapshotting does not have this problem.
*
* @param cleanupSize max number of keys pulled from queue for clean up upon state touch for any key
* @param runCleanupForEveryRecord run incremental cleanup per each processed record
*/
@Nonnull
public Builder cleanupIncrementally(
@Nonnegative int cleanupSize,
boolean runCleanupForEveryRecord) {
cleanupStrategies.strategies.put(
CleanupStrategies.Strategies.INCREMENTAL_CLEANUP,
new IncrementalCleanupStrategy(cleanupSize, runCleanupForEveryRecord));
return this;
}

Expand Down Expand Up @@ -256,18 +295,56 @@ public static class CleanupStrategies implements Serializable {

/** Fixed strategies ordinals in {@code strategies} config field. */
enum Strategies {
FULL_STATE_SCAN_SNAPSHOT
FULL_STATE_SCAN_SNAPSHOT,
INCREMENTAL_CLEANUP
}

/** Base interface for cleanup strategies configurations. */
interface CleanupStrategy extends Serializable {

}

static class EmptyCleanupStrategy implements CleanupStrategy {
private static final long serialVersionUID = 1373998465131443873L;
}

final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

public boolean inFullSnapshot() {
return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
}

@Nullable
public IncrementalCleanupStrategy getIncrementalCleanupStrategy() {
return (IncrementalCleanupStrategy) strategies.get(Strategies.INCREMENTAL_CLEANUP);
}
}

/** Configuration of cleanup strategy while taking the full snapshot. */
public static class IncrementalCleanupStrategy implements CleanupStrategies.CleanupStrategy {
private static final long serialVersionUID = 3109278696501988780L;

/** Max number of keys pulled from queue for clean up upon state touch for any key. */
private final int cleanupSize;

/** Whether to run incremental cleanup per each processed record. */
private final boolean runCleanupForEveryRecord;

private IncrementalCleanupStrategy(
int cleanupSize,
boolean runCleanupForEveryRecord) {
Preconditions.checkArgument(cleanupSize >= 0,
"Number of incrementally cleaned up state entries cannot be negative.");
this.cleanupSize = cleanupSize;
this.runCleanupForEveryRecord = runCleanupForEveryRecord;
}

public int getCleanupSize() {
return cleanupSize;
}

public boolean runCleanupForEveryRecord() {
return runCleanupForEveryRecord;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
Expand All @@ -48,26 +45,6 @@
* </ul>
*/
public class DataStreamStateTTLTestProgram {
private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
.key("update_generator_source.keyspace")
.defaultValue(100);

private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
.key("update_generator_source.sleep_time")
.defaultValue(0L);

private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
.key("update_generator_source.sleep_after_elements")
.defaultValue(0L);

private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
.key("state_ttl_verifier.ttl_milli")
.defaultValue(1000L);

private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
.key("report_stat.after_updates_num")
.defaultValue(200L);

public static void main(String[] args) throws Exception {
final ParameterTool pt = ParameterTool.fromArgs(args);

Expand All @@ -77,23 +54,17 @@ public static void main(String[] args) throws Exception {

final MonotonicTTLTimeProvider ttlTimeProvider = setBackendWithCustomTTLTimeProvider(env);

int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(),
STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(),
REPORT_STAT_AFTER_UPDATES_NUM.defaultValue());

StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(ttl).build();
TtlTestConfig config = TtlTestConfig.fromArgs(pt);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(config.ttl)
.cleanupIncrementally(5, true)
.cleanupFullSnapshot()
.build();

env
.addSource(new TtlStateUpdateSource(keySpace, sleepAfterElements, sleepTime))
.addSource(new TtlStateUpdateSource(config.keySpace, config.sleepAfterElements, config.sleepTime))
.name("TtlStateUpdateSource")
.keyBy(TtlStateUpdate::getKey)
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, reportStatAfterUpdatesNum))
.flatMap(new TtlVerifyUpdateFunction(ttlConfig, ttlTimeProvider, config.reportStatAfterUpdatesNum))
.name("TtlVerifyUpdateFunction")
.addSink(new PrintSinkFunction<>())
.name("PrintFailedVerifications");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

class TtlTestConfig {
private static final ConfigOption<Integer> UPDATE_GENERATOR_SRC_KEYSPACE = ConfigOptions
.key("update_generator_source.keyspace")
.defaultValue(100);

private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_TIME = ConfigOptions
.key("update_generator_source.sleep_time")
.defaultValue(0L);

private static final ConfigOption<Long> UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS = ConfigOptions
.key("update_generator_source.sleep_after_elements")
.defaultValue(0L);

private static final ConfigOption<Long> STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
.key("state_ttl_verifier.ttl_milli")
.defaultValue(1000L);

private static final ConfigOption<Long> REPORT_STAT_AFTER_UPDATES_NUM = ConfigOptions
.key("report_stat.after_updates_num")
.defaultValue(200L);

final int keySpace;
final long sleepAfterElements;
final long sleepTime;
final Time ttl;
final long reportStatAfterUpdatesNum;

private TtlTestConfig(int keySpace, long sleepAfterElements, long sleepTime, Time ttl, long reportStatAfterUpdatesNum) {
this.keySpace = keySpace;
this.sleepAfterElements = sleepAfterElements;
this.sleepTime = sleepTime;
this.ttl = ttl;
this.reportStatAfterUpdatesNum = reportStatAfterUpdatesNum;
}

static TtlTestConfig fromArgs(ParameterTool pt) {
int keySpace = pt.getInt(UPDATE_GENERATOR_SRC_KEYSPACE.key(), UPDATE_GENERATOR_SRC_KEYSPACE.defaultValue());
long sleepAfterElements = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.key(),
UPDATE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue());
long sleepTime = pt.getLong(UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
Time ttl = Time.milliseconds(pt.getLong(STATE_TTL_VERIFIER_TTL_MILLI.key(),
STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
long reportStatAfterUpdatesNum = pt.getLong(REPORT_STAT_AFTER_UPDATES_NUM.key(),
REPORT_STAT_AFTER_UPDATES_NUM.defaultValue());
return new TtlTestConfig(keySpace, sleepAfterElements, sleepTime, ttl, reportStatAfterUpdatesNum);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,7 @@
* - emits verification context in case of failure
*/
class TtlVerifyUpdateFunction extends RichFlatMapFunction<TtlStateUpdate, String> implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(TtlVerifyUpdateFunction.class);

@Nonnull
Expand Down Expand Up @@ -154,6 +152,8 @@ public void initializeState(FunctionInitializationContext context) {
}

private static class UpdateStat implements Serializable {
private static final long serialVersionUID = -4557720969995878873L;

final long reportStatAfterUpdatesNum;
long updates = 0;
long prevUpdatesNum = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class TtlAggregatingStateVerifier extends AbstractTtlStateVerifier<
AggregatingStateDescriptor<Integer, Long, String>, AggregatingState<Integer, String>, Long, Integer, String> {
TtlAggregatingStateVerifier() {
super(new AggregatingStateDescriptor<>("TtlAggregatingStateVerifier", AGG_FUNC, LongSerializer.INSTANCE));
super(new AggregatingStateDescriptor<>(TtlAggregatingStateVerifier.class.getSimpleName(), AGG_FUNC, LongSerializer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class TtlFoldingStateVerifier extends AbstractTtlStateVerifier<

TtlFoldingStateVerifier() {
super(new FoldingStateDescriptor<>(
"TtlFoldingStateVerifier", INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
TtlFoldingStateVerifier.class.getSimpleName(), INIT_VAL, (v, acc) -> acc + v, LongSerializer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
class TtlListStateVerifier extends AbstractTtlStateVerifier<
ListStateDescriptor<String>, ListState<String>, List<String>, String, List<String>> {
TtlListStateVerifier() {
super(new ListStateDescriptor<>("TtlListStateVerifier", StringSerializer.INSTANCE));
super(new ListStateDescriptor<>(TtlListStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class TtlMapStateVerifier extends AbstractTtlStateVerifier<
}

TtlMapStateVerifier() {
super(new MapStateDescriptor<>("TtlMapStateVerifier", StringSerializer.INSTANCE, StringSerializer.INSTANCE));
super(new MapStateDescriptor<>(TtlMapStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE, StringSerializer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TtlReducingStateVerifier extends AbstractTtlStateVerifier<
ReducingStateDescriptor<Integer>, ReducingState<Integer>, Integer, Integer, Integer> {
TtlReducingStateVerifier() {
super(new ReducingStateDescriptor<>(
"TtlReducingStateVerifier",
TtlReducingStateVerifier.class.getSimpleName(),
(ReduceFunction<Integer>) (value1, value2) -> value1 + value2,
IntSerializer.INSTANCE));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class TtlValueStateVerifier
extends AbstractTtlStateVerifier<ValueStateDescriptor<String>, ValueState<String>, String, String, String> {
TtlValueStateVerifier() {
super(new ValueStateDescriptor<>("TtlValueStateVerifier", StringSerializer.INSTANCE));
super(new ValueStateDescriptor<>(TtlValueStateVerifier.class.getSimpleName(), StringSerializer.INSTANCE));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
source "$(dirname "$0")"/common.sh

STATE_BACKEND_TYPE="${1:-file}"
STATE_BACKEND_FILE_ASYNC="${2:-false}"
STATE_BACKEND_FILE_ASYNC="${2:-true}"
TTL="${3:-1000}"
PRECISION="${4:-5}"
PARALLELISM="${5-3}"
Expand Down
Loading

0 comments on commit 592f3a9

Please sign in to comment.