Skip to content

Commit

Permalink
[FLINK-19585][minicluster] Use SavepointRestoreSettings of StreamGrap…
Browse files Browse the repository at this point in the history
…h while creating JobGraph.

There is currently no way to start from savepoints while using MiniCluster. The cluster config cannot be used (and would be a mismatch) because it's overridden by MiniClusterPipelineExecutorServiceLoader#createConfiguration.
However, the jobgraph that is being generated in MiniClusterPipelineExecutorServiceLoader#MiniClusterExecutor only uses this configuration to translate the Pipeline/StreamGraph. In production code, savepoint restore settings are usually applied last and taken from the StreamGraph.
  • Loading branch information
Arvid Heise authored and AHeise committed Oct 16, 2020
1 parent 2ff3b77 commit 3c184ac
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.io.IOException;
import java.net.MalformedURLException;
Expand Down Expand Up @@ -138,6 +140,9 @@ public CompletableFuture<JobClient> execute(
Configuration configuration,
ClassLoader userCodeClassLoader) throws Exception {
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none() && pipeline instanceof StreamGraph) {
jobGraph.setSavepointRestoreSettings(((StreamGraph) pipeline).getSavepointRestoreSettings());
}
return miniCluster.submitJob(jobGraph)
.thenApply(result -> new MiniClusterJobClient(
result.getJobID(),
Expand Down

0 comments on commit 3c184ac

Please sign in to comment.