Skip to content

Commit

Permalink
[FLINK-12963] [state-processor] Refactor operator state access concer…
Browse files Browse the repository at this point in the history
…ns into ModifiableSavepointMetadata
  • Loading branch information
tzulitai committed Jul 4, 2019
1 parent 03bb097 commit 31e827e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.output.MergeOperatorStates;
import org.apache.flink.state.api.output.SavepointOutputFormat;
import org.apache.flink.state.api.runtime.metadata.ModifiableSavepointMetadata;
import org.apache.flink.util.Preconditions;

import java.util.List;
import java.util.stream.Collectors;

/**
* Any savepoint that can be written to from a batch context.
Expand Down Expand Up @@ -79,9 +80,10 @@ public <T> F withOperator(String uid, BootstrapTransformation<T> transformation)
public final void write(String path) {
final Path savepointPath = new Path(path);

DataSet<OperatorState> newOperatorStates = getOperatorStates(savepointPath);
List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations = metadata.getNewOperatorTransformations();
DataSet<OperatorState> newOperatorStates = writeOperatorStates(newOperatorTransformations, savepointPath);

List<OperatorState> existingOperators = getExistingOperatorStates();
List<OperatorState> existingOperators = metadata.getExistingOperators();

DataSet<OperatorState> finalOperatorStates = unionOperatorStates(newOperatorStates, existingOperators);

Expand All @@ -92,27 +94,6 @@ public final void write(String path) {
.name(path);
}

/**
 * Gathers the operator states that the savepoint metadata holds as the
 * left side of its per-operator {@code Either} entries.
 *
 * @return a newly collected list of the left-hand {@link OperatorState} values.
 */
private List<OperatorState> getExistingOperatorStates() {
    // Only the values are inspected, so the keys are not needed here.
    return metadata.getOperatorStates()
        .values()
        .stream()
        .filter(state -> state.isLeft())
        .map(state -> state.left())
        .collect(Collectors.toList());
}

/**
 * Invokes {@code writeOperatorState} on every right-hand (transformation) entry of the
 * savepoint metadata and unions the resulting data sets into a single one.
 *
 * @param savepointPath the path handed to each transformation's {@code writeOperatorState} call.
 * @return the union of the data sets produced by all right-hand entries.
 * @throws IllegalStateException if the metadata contains no right-hand entry.
 */
private DataSet<OperatorState> getOperatorStates(Path savepointPath) {
    return metadata.getOperatorStates().entrySet().stream()
        .filter(candidate -> candidate.getValue().isRight())
        .map(pending -> {
            final OperatorID operatorId = pending.getKey();
            return pending.getValue().right().writeOperatorState(operatorId, stateBackend, metadata, savepointPath);
        })
        // Fold the per-operator data sets pairwise into one union.
        .reduce((merged, next) -> merged.union(next))
        .orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
}

private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOperatorStates, List<OperatorState> existingOperators) {
DataSet<OperatorState> finalOperatorStates;
if (existingOperators.isEmpty()) {
Expand All @@ -126,4 +107,14 @@ private DataSet<OperatorState> unionOperatorStates(DataSet<OperatorState> newOpe
}
return finalOperatorStates;
}

/**
 * Invokes {@code writeOperatorState} on each given transformation for its paired
 * {@link OperatorID} and unions the resulting data sets into a single one.
 *
 * @param newOperatorTransformations pairs of operator id ({@code f0}) and transformation ({@code f1}).
 * @param savepointWritePath the path handed to each transformation's {@code writeOperatorState} call.
 * @return the union of the data sets produced by all given transformations.
 * @throws IllegalStateException if {@code newOperatorTransformations} is empty.
 */
private DataSet<OperatorState> writeOperatorStates(
        List<Tuple2<OperatorID, BootstrapTransformation<?>>> newOperatorTransformations,
        Path savepointWritePath) {
    return newOperatorTransformations.stream()
        .map(pending -> {
            final OperatorID targetId = pending.f0;
            final BootstrapTransformation<?> bootstrap = pending.f1;
            return bootstrap.writeOperatorState(targetId, stateBackend, metadata, savepointWritePath);
        })
        // Fold the per-operator data sets pairwise into one union.
        .reduce((merged, next) -> merged.union(next))
        .orElseThrow(() -> new IllegalStateException("Savepoint's must contain at least one operator"));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.flink.state.api.runtime.metadata;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand All @@ -11,7 +12,10 @@
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Savepoint metadata that can be modified.
Expand Down Expand Up @@ -61,7 +65,28 @@ public void addOperator(String uid, BootstrapTransformation<?> transformation) {
operatorStateIndex.put(id, Either.Right(transformation));
}

public Map<OperatorID, Either<OperatorState, BootstrapTransformation<?>>> getOperatorStates() {
return operatorStateIndex;
/**
 * Collects every entry of the operator index that is held as an {@link OperatorState}
 * (the left side of the {@link Either}), i.e. the states that already exist within the savepoint.
 *
 * @return List of {@link OperatorState}s that already exist within the savepoint.
 */
public List<OperatorState> getExistingOperators() {
    Stream<OperatorState> existingStates = operatorStateIndex
        .values()
        .stream()
        .filter(indexed -> indexed.isLeft())
        .map(indexed -> indexed.left());

    return existingStates.collect(Collectors.toList());
}

/**
 * Collects every entry of the operator index that is held as a {@link BootstrapTransformation}
 * (the right side of the {@link Either}), paired with the {@link OperatorID} it is indexed under.
 *
 * @return List of new operator states for the savepoint, represented by their target {@link OperatorID} and {@link BootstrapTransformation}.
 */
public List<Tuple2<OperatorID, BootstrapTransformation<?>>> getNewOperatorTransformations() {
    return operatorStateIndex.entrySet().stream()
        .filter(candidate -> candidate.getValue().isRight())
        // The explicit type argument keeps the wildcard element type; without it the
        // tuple's second field would be inferred with a captured type and not match the return type.
        .<Tuple2<OperatorID, BootstrapTransformation<?>>>map(candidate -> {
            final OperatorID targetId = candidate.getKey();
            final BootstrapTransformation<?> bootstrap = candidate.getValue().right();
            return Tuple2.of(targetId, bootstrap);
        })
        .collect(Collectors.toList());
}
}

0 comments on commit 31e827e

Please sign in to comment.