From 31e827e54bcf643dc0593227aa6e491061102ccd Mon Sep 17 00:00:00 2001
From: "Tzu-Li (Gordon) Tai"
Date: Wed, 3 Jul 2019 13:05:37 +0800
Subject: [PATCH] [FLINK-12963] [state-processor] Refactor operator state access concerns into ModifiableSavepointMetadata

---
 .../flink/state/api/WritableSavepoint.java    | 39 +++++++------------
 .../metadata/ModifiableSavepointMetadata.java | 29 +++++++++++++-
 2 files changed, 42 insertions(+), 26 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
index 300d1ae214630..0f68ccefdcb09 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WritableSavepoint.java
@@ -19,8 +19,10 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.state.api.output.MergeOperatorStates;
 import org.apache.flink.state.api.output.SavepointOutputFormat;
@@ -28,7 +30,6 @@
 import org.apache.flink.util.Preconditions;
 
 import java.util.List;
-import java.util.stream.Collectors;
 
 /**
  * Any savepoint that can be written to from a batch context.
@@ -79,9 +80,10 @@ public <T> F withOperator(String uid, BootstrapTransformation<T> transformation
 	public final void write(String path) {
 		final Path savepointPath = new Path(path);
 
-		DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
+		List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations = metadata.getNewOperatorTransformations();
+		DataSet<OperatorState> newOperatorStates = writeOperatorStates(newOperatorTransformations, savepointPath);
 
-		List<OperatorState> existingOperators = getExistingOperatorStates();
+		List<OperatorState> existingOperators = metadata.getExistingOperators();
 
 		DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);
 
@@ -92,27 +94,6 @@ public final void write(String path) {
 			.name(path);
 	}
 
-	private List<OperatorState> getExistingOperatorStates() {
-		return metadata
-			.getOperatorStates()
-			.entrySet()
-			.stream()
-			.filter(entry -> entry.getValue().isLeft())
-			.map(entry -> entry.getValue().left())
-			.collect(Collectors.toList());
-	}
-
-	private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
-		return metadata
-			.getOperatorStates()
-			.entrySet()
-			.stream()
-			.filter(entry -> entry.getValue().isRight())
-			.map(entry -> entry.getValue().right().writeOperatorState(entry.getKey(), stateBackend, metadata, savepointPath))
-			.reduce(DataSet::union)
-			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
-	}
-
 	private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
 		DataSet<OperatorState> finalOperatorStates;
 		if (existingOperators.isEmpty()) {
@@ -126,4 +107,14 @@ private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOpe
 		}
 		return finalOperatorStates;
 	}
+
+	private DataSet<OperatorState> writeOperatorStates(
+			List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations,
+			Path savepointWritePath) {
+		return newOperatorTransformations
+			.stream()
+			.map(transformation -> transformation.f1.writeOperatorState(transformation.f0, stateBackend, metadata, savepointWritePath))
+			.reduce(DataSet::union)
+			.orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
+	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
index 158fa357a2803..8df872342a3cb 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/ModifiableSavepointMetadata.java
@@ -1,6 +1,7 @@
 package org.apache.flink.state.api.runtime.metadata;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
@@ -11,7 +12,10 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Savepoint metadata that can be modified.
@@ -61,7 +65,28 @@ public void addOperator(String uid, BootstrapTransformation<?> transformation) {
 		operatorStateIndex.put(id, Either.Right(transformation));
 	}
 
-	public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
-		return operatorStateIndex;
+	/**
+	 * @return List of {@link OperatorState} that already exists within the savepoint.
+	 */
+	public List<OperatorState> getExistingOperators() {
+		return operatorStateIndex
+			.values()
+			.stream()
+			.filter(Either::isLeft)
+			.map(Either::left)
+			.collect(Collectors.toList());
+	}
+
+	/**
+	 * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
+	 */
+	public List<Tuple2<OperatorID, BootstrapTransformation<?>>> getNewOperatorTransformations() {
+		Stream<Tuple2<OperatorID, BootstrapTransformation<?>>> transformations = operatorStateIndex
+			.entrySet()
+			.stream()
+			.filter(entry -> entry.getValue().isRight())
+			.map(entry -> Tuple2.of(entry.getKey(), entry.getValue().right()));
+
+		return transformations.collect(Collectors.toList());
 	}
 }
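
Note (illustrative sketch, not part of the patch): the refactoring keeps a single operatorStateIndex map inside ModifiableSavepointMetadata, mapping each OperatorID to an Either whose left side is an OperatorState carried over from the loaded savepoint and whose right side is a pending BootstrapTransformation registered via withOperator(...). The two new accessors, getExistingOperators() and getNewOperatorTransformations(), are simply the two projections of that map, which is what lets WritableSavepoint#write stop filtering the Either values itself. The stand-alone Java sketch below mirrors that partitioning with plain JDK types; Either, the String payloads, and the class name Sketch are simplified stand-ins for the Flink types, not the real API.

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for OperatorState / BootstrapTransformation / Either, used only to
// illustrate how the two new accessors partition the operatorStateIndex map.
class Sketch {

	static final class Either<L, R> {
		final L left;
		final R right;

		private Either(L left, R right) {
			this.left = left;
			this.right = right;
		}

		static <L, R> Either<L, R> left(L value) {
			return new Either<>(value, null);
		}

		static <L, R> Either<L, R> right(R value) {
			return new Either<>(null, value);
		}

		boolean isLeft() {
			return left != null;
		}

		boolean isRight() {
			return right != null;
		}
	}

	// uid/OperatorID -> existing operator state (left) OR new bootstrap transformation (right)
	static final Map<String, Either<String, String>> operatorStateIndex = new LinkedHashMap<>();

	// Mirrors getExistingOperators(): keep only the "left" (already existing) entries.
	static List<String> existingOperators() {
		List<String> existing = new ArrayList<>();
		for (Either<String, String> entry : operatorStateIndex.values()) {
			if (entry.isLeft()) {
				existing.add(entry.left);
			}
		}
		return existing;
	}

	// Mirrors getNewOperatorTransformations(): keep (id, transformation) pairs for "right" entries.
	static List<SimpleEntry<String, String>> newOperatorTransformations() {
		List<SimpleEntry<String, String>> pending = new ArrayList<>();
		operatorStateIndex.forEach((id, entry) -> {
			if (entry.isRight()) {
				pending.add(new SimpleEntry<>(id, entry.right));
			}
		});
		return pending;
	}

	public static void main(String[] args) {
		operatorStateIndex.put("kept-operator", Either.left("OperatorState(kept-operator)"));
		operatorStateIndex.put("new-operator", Either.right("BootstrapTransformation(new-operator)"));

		System.out.println(existingOperators());          // [OperatorState(kept-operator)]
		System.out.println(newOperatorTransformations()); // [new-operator=BootstrapTransformation(new-operator)]
	}
}

Exposing the index only through these two views keeps the Either bookkeeping private to the metadata class, so callers such as WritableSavepoint never have to inspect left/right entries themselves.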