Skip to content

Commit

Permalink
[FLINK-21357][runtime/statebackend]Periodic materialization for gener…
Browse files Browse the repository at this point in the history
…alized incremental checkpoints (apache#16606)

* [hotfix][state] Introduce ChangelogOptions and backend materialization related configurations

* [hotfix][state] Fix log4j2-test.properties for flink-statebackend-changelog

* [hotfix][runtime] Move AsyncExceptionHandler from flink-streaming-java to flink-runtime

* [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

* [do not commit][for testing only]

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

Remove waiting for termination of periodicExecutor

* fixup! [hotfix][state] Introduce ChangelogOptions and backend materialization related configurations

Remove changelog_configuration.html

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

close periodicMaterializationManager properly

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

add synchronized access to periodicExecutor in PeriodicMaterializationManager

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

make the semantics of allowedNumberOfFailures consistent

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

Remove Test Code

* fixup! [hotfix][state] Introduce ChangelogOptions and backend materialization related configurations

add filesystem for STATE_CHANGE_LOG_STORAGE config description

* fixup! [FLINK-21357][state] Periodic materialization for generalized incremental checkpoints

several minior fixes

* fixup! [hotfix][state] Introduce ChangelogOptions and backend materialization related configurations

compiling the docs

* fixup! [do not commit][for testing only]

reset random flag of changelog
  • Loading branch information
curcur authored Oct 25, 2021
1 parent 955bd58 commit 446bc70
Show file tree
Hide file tree
Showing 42 changed files with 1,091 additions and 189 deletions.
12 changes: 0 additions & 12 deletions docs/layouts/shortcodes/generated/checkpointing_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,6 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@
<td>String</td>
<td>The default directory for savepoints. Used by the state backends that write savepoints to file systems (HashMapStateBackend, EmbeddedRocksDBStateBackend).</td>
</tr>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
</tr>
<tr>
<td><h5>state.backend.incremental</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>Defines the interval in milliseconds to perform periodic materialization for state backend.</td>
</tr>
<tr>
<td><h5>state.backend.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public static final class Sections {
public static final String STATE_BACKEND_LATENCY_TRACKING =
"state_backend_latency_tracking";

public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";

public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";
public static final String EXPERT_SCHEDULING = "expert_scheduling";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.util.Preconditions;

import com.esotericsoftware.kryo.Serializer;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand Down Expand Up @@ -127,13 +129,22 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
private boolean forceAvro = false;
private long autoWatermarkInterval = 200;

// ---------- statebackend related configurations ------------------------------
/**
* Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
*/
private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();

private boolean isLatencyTrackingConfigured = false;

/** Interval in milliseconds to perform periodic changelog materialization. */
private long periodicMaterializeIntervalMillis =
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue().toMillis();

/** Max allowed number of consecutive failures for changelog materialization */
private int materializationMaxAllowedFailures =
StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED.defaultValue();

/**
* @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
*/
Expand Down Expand Up @@ -281,6 +292,26 @@ public boolean isLatencyTrackingConfigured() {
return isLatencyTrackingConfigured;
}

@Internal
public long getPeriodicMaterializeIntervalMillis() {
return periodicMaterializeIntervalMillis;
}

@Internal
public void setPeriodicMaterializeIntervalMillis(Duration periodicMaterializeInterval) {
this.periodicMaterializeIntervalMillis = periodicMaterializeInterval.toMillis();
}

@Internal
public int getMaterializationMaxAllowedFailures() {
return materializationMaxAllowedFailures;
}

@Internal
public void setMaterializationMaxAllowedFailures(int materializationMaxAllowedFailures) {
this.materializationMaxAllowedFailures = materializationMaxAllowedFailures;
}

/**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism.
Expand Down Expand Up @@ -1111,6 +1142,13 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(MetricOptions.LATENCY_INTERVAL)
.ifPresent(this::setLatencyTrackingInterval);

configuration
.getOptional(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)
.ifPresent(this::setPeriodicMaterializeIntervalMillis);
configuration
.getOptional(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED)
.ifPresent(this::setMaterializationMaxAllowedFailures);

configuration
.getOptional(PipelineOptions.MAX_PARALLELISM)
.ifPresent(this::setMaxParallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,44 +100,6 @@ public class CheckpointingOptions {
"Recognized shortcut names are 'jobmanager' and 'filesystem'.")
.build());

/** Whether to enable state change log. */
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
ConfigOptions.key("state.backend.changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog. "
+ "If this config is not set explicitly, it means no preference "
+ "for enabling the change log, and the value in lower config "
+ "level will take effect. The default value 'false' here means "
+ "if no value set (job or cluster), the change log will not be "
+ "enabled.");

/**
* Which storage to use to store state changelog.
*
* <p>Recognized shortcut name is 'memory' from {@code
* InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also the default value.
*/
@Documentation.Section(value = Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<String> STATE_CHANGE_LOG_STORAGE =
ConfigOptions.key("state.backend.changelog.storage")
.stringType()
.defaultValue("memory")
.withDescription(
Description.builder()
.text("The storage to be used to store state changelog.")
.linebreak()
.text(
"The implementation can be specified via their"
+ " shortcut name.")
.linebreak()
.text(
"The list of recognized shortcut names currently includes"
+ " 'memory' and 'filesystem'.")
.build());

/** The maximum number of completed checkpoints to retain. */
@Documentation.Section(Documentation.Sections.COMMON_STATE_BACKENDS)
public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;

import java.time.Duration;

/** A collection of all configuration options that relate to changelog. */
@PublicEvolving
public class StateChangelogOptions {

@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
public static final ConfigOption<Duration> PERIODIC_MATERIALIZATION_INTERVAL =
ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
.durationType()
.defaultValue(Duration.ofMinutes(10))
.withDescription(
"Defines the interval in milliseconds to perform "
+ "periodic materialization for state backend.");

@Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
public static final ConfigOption<Integer> MATERIALIZATION_MAX_FAILURES_ALLOWED =
ConfigOptions.key("state.backend.changelog.max-failures-allowed")
.intType()
.defaultValue(3)
.withDescription("Max number of consecutive materialization failures allowed.");

/** Whether to enable state change log. */
@Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
ConfigOptions.key("state.backend.changelog.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog. "
+ "If this config is not set explicitly, it means no preference "
+ "for enabling the change log, and the value in lower config "
+ "level will take effect. The default value 'false' here means "
+ "if no value set (job or cluster), the change log will not be "
+ "enabled.");

/**
* Which storage to use to store state changelog.
*
* <p>Recognized shortcut name is 'memory' from {@code
* InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also the default value.
*/
@Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
public static final ConfigOption<String> STATE_CHANGE_LOG_STORAGE =
ConfigOptions.key("state.backend.changelog.storage")
.stringType()
.defaultValue("memory")
.withDescription(
Description.builder()
.text("The storage to be used to store state changelog.")
.linebreak()
.text(
"The implementation can be specified via their"
+ " shortcut name.")
.linebreak()
.text(
"The list of recognized shortcut names currently includes"
+ " 'memory' and 'filesystem'.")
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.io.IOException;

import static org.apache.flink.changelog.fs.FsStateChangelogOptions.BASE_PATH;
import static org.apache.flink.configuration.CheckpointingOptions.STATE_CHANGE_LOG_STORAGE;
import static org.apache.flink.configuration.StateChangelogOptions.STATE_CHANGE_LOG_STORAGE;

/** {@link FsStateChangelogStorage} factory. */
@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
Expand Down Expand Up @@ -258,4 +259,10 @@ default void setAsyncOperationsThreadPool(ExecutorService executorService) {}
default ExecutorService getAsyncOperationsThreadPool() {
throw new UnsupportedOperationException();
}

default void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) {}

default CheckpointStorageAccess getCheckpointStorageAccess() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackendFactory;
Expand Down Expand Up @@ -300,7 +301,7 @@ public static StateBackend fromApplicationOrConfigOrDefault(
TernaryBoolean.TRUE.equals(isChangelogStateBackendEnableFromApplication)
|| (TernaryBoolean.UNDEFINED.equals(
isChangelogStateBackendEnableFromApplication)
&& config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG));
&& config.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG));

StateBackend backend;
if (enableChangeLog) {
Expand Down
Loading

0 comments on commit 446bc70

Please sign in to comment.