Skip to content

Commit

Permalink
[FLINK-12693] [state-processor] Improve Javadocs for user-facing APIs
Browse files Browse the repository at this point in the history
This commit improves Javadocs for the State Processor API that are
either outdated, or not sufficiently informative.
  • Loading branch information
tzulitai committed Jul 4, 2019
1 parent cb535d9 commit c9815a7
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,35 @@
import java.util.OptionalInt;

/**
* Bootstrapped data that can be written into a {@code Savepoint}.
* A {@code BootstrapTransformation} represents a procedure for writing new operator state into a {@code Savepoint}.
* It is defined by a {@code DataSet} containing the data to bootstrap with, a factory for a stream operator
* that consumes the elements of the {@code DataSet} and generates state to be snapshotted, as well as an optional
* key selector if the new operator state is partitioned.
*
* @see OperatorTransformation
* @see OneInputOperatorTransformation
*
* @param <T> The input type of the transformation.
*/
@PublicEvolving
@SuppressWarnings("WeakerAccess")
public class BootstrapTransformation<T> {

/** The data set containing the data to bootstrap the operator state with. */
private final DataSet<T> dataSet;

/** Factory for the {@link StreamOperator} to consume and snapshot the bootstrapping data set. */
private final SavepointWriterOperatorFactory factory;

/** Partitioner for the bootstrapping data set. Only relevant if this bootstraps partitioned state. */
@Nullable
private final HashSelector<T> keySelector;

/** Type information for the key of the bootstrapped state. Only relevant if this bootstraps partitioned state. */
@Nullable
private final TypeInformation<?> keyType;

/** Local max parallelism for the bootstrapped operator. */
private final OptionalInt operatorMaxParallelism;

BootstrapTransformation(
Expand Down Expand Up @@ -134,7 +147,7 @@ MapPartitionOperator<T, TaggedOperatorSubtaskState> writeOperatorSubtaskStates(
config = new BoundedStreamConfig(keySerializer, keySelector);
}

StreamOperator<TaggedOperatorSubtaskState> operator = factory.getOperator(
StreamOperator<TaggedOperatorSubtaskState> operator = factory.createOperator(
System.currentTimeMillis(),
savepointPath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,32 @@
import java.io.IOException;

/**
* An existing savepoint.
* An existing savepoint. This class provides the entry points for reading previously
* existing operator states in savepoints. Operator states can be removed
* from and added to the set of existing operator states, and eventually written to
* distributed storage as a new savepoint.
*
* <p>New savepoints written using this class are based on the previously existing savepoint.
* This means that for existing operators that remain untouched, the new savepoint only contains
* a shallow copy of pointers to state data that resides in the previously existing savepoint paths.
* As a result, both savepoints share state and one cannot be deleted without corrupting the other!
*
* @see WritableSavepoint
*/
@PublicEvolving
@SuppressWarnings("WeakerAccess")
public class ExistingSavepoint extends WritableSavepoint<ExistingSavepoint> {

/** The batch execution environment. Used for creating inputs for reading state. */
private final ExecutionEnvironment env;

/** The savepoint metadata, which maintains the current set of existing / newly added operator states. */
private final SavepointMetadata metadata;

/**
* The state backend that was previously used to write existing operator states in this savepoint.
* This is also the state backend that will be used when writing this existing savepoint again.
*/
private final StateBackend stateBackend;

ExistingSavepoint(ExecutionEnvironment env, SavepointMetadata metadata, StateBackend stateBackend) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@
@PublicEvolving
@SuppressWarnings("WeakerAccess")
public class KeyedOperatorTransformation<K, T> {

/** The data set containing the data to bootstrap the operator state with. */
private final DataSet<T> dataSet;

/** Local max parallelism for the bootstrapped operator. */
private final OptionalInt operatorMaxParallelism;

/** Partitioner for the bootstrapping data set. */
private final KeySelector<T, K> keySelector;

/** Type information for the key of the bootstrapped operator. */
private final TypeInformation<K> keyType;

KeyedOperatorTransformation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;

/**
* A new savepoint.
* A new savepoint. Operator states can be removed from and added to the savepoint, and eventually, written to
* distributed storage as a new savepoint.
*
* @see WritableSavepoint
*/
@PublicEvolving
public class NewSavepoint extends WritableSavepoint<NewSavepoint> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@
@PublicEvolving
@SuppressWarnings("WeakerAccess")
public class OneInputOperatorTransformation<T> {

/** The data set containing the data to bootstrap the operator state with. */
private final DataSet<T> dataSet;

/** Local max parallelism for the bootstrapped operator. */
private OptionalInt operatorMaxParallelism = OptionalInt.empty();

OneInputOperatorTransformation(DataSet<T> dataSet) {
this.dataSet = dataSet;
}


/**
* Sets the maximum parallelism of this operator.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,33 @@
import org.apache.flink.api.java.DataSet;

/**
* An OperatorTransformation represents a single operator within a {@link Savepoint}.
* This class provides the entry point for building {@link BootstrapTransformation}s,
* which represent procedures to bootstrap new operator states with a given {@code DataSet}.
*
* <h2>Example usage</h2>
*
* <pre>{@code
* DataSet<StateData> stateData = ...;
*
* // to bootstrap non-keyed state:
* BootstrapTransformation<StateData> nonKeyedStateBootstrap = OperatorTransformation
* .bootstrapWith(stateData)
* .transform(new StateBootstrapFunction<StateData>() {...});
*
* // to bootstrap keyed state:
* BootstrapTransformation<StateData> keyedStateBootstrap = OperatorTransformation
* .bootstrapWith(stateData)
* .keyBy(new KeySelector<StateData, KeyType>() {...})
* .transform(new KeyedStateBootstrapFunction<KeyType, StateData>() {...});
* }</pre>
*
* <p>The code example above demonstrates how to create {@code BootstrapTransformation}s for non-keyed and keyed
* state. The built bootstrap transformations can then be registered with your {@link ExistingSavepoint} or
* {@link NewSavepoint} prior to writing it.
*
* @see OneInputOperatorTransformation
* @see KeyedOperatorTransformation
* @see BootstrapTransformation
*/
@PublicEvolving
@SuppressWarnings("WeakerAccess")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;

/**
* A {@link Savepoint} is a collection of operator states that can be used to supply initial state
* when starting a {@link org.apache.flink.streaming.api.datastream.DataStream} job.
* This class provides entry points for loading an existing savepoint, or creating a new empty savepoint.
*
* @see ExistingSavepoint
* @see NewSavepoint
*/
@PublicEvolving
public final class Savepoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,25 @@

package org.apache.flink.state.api;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.fs.Path;
import org.apache.flink.state.api.output.TaggedOperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
* Factory for the {@link StreamOperator} that generates and snapshots state when writing a savepoint.
* The operator is created from a savepoint timestamp and the path that the savepoint is written to.
*/
@PublicEvolving
@FunctionalInterface
public interface SavepointWriterOperatorFactory {
StreamOperator<TaggedOperatorSubtaskState> getOperator(long timestamp, Path savepointPath);

/**
* Creates a {@link StreamOperator} to be used for generating and snapshotting state.
*
* @param savepointTimestamp the timestamp to associate with the generated savepoint.
* @param savepointPath the path to write the savepoint to.
*
* @return a stream operator for writing the savepoint.
*/
StreamOperator<TaggedOperatorSubtaskState> createOperator(long savepointTimestamp, Path savepointPath);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,21 @@
import java.util.List;

/**
* Any savepoint that can be written to from a batch context.
* A {@code WritableSavepoint} is any savepoint that can be written to from a batch context.
* Internally, a {@link SavepointMetadata} object is maintained that keeps track of the set
* of existing operator states in the savepoint, as well as newly added operator states defined by their
* {@link BootstrapTransformation}.
*
* @param <F> The implementation type.
*/
@PublicEvolving
@SuppressWarnings("WeakerAccess")
public abstract class WritableSavepoint<F extends WritableSavepoint> {

/** The savepoint metadata, which maintains the current set of existing / newly added operator states. */
protected final SavepointMetadata metadata;

/** The state backend to use when writing this savepoint. */
protected final StateBackend stateBackend;

WritableSavepoint(SavepointMetadata metadata, StateBackend stateBackend) {
Expand Down

0 comments on commit c9815a7

Please sign in to comment.