From 3c184acf8b209f205d2c97c6acf59ec8335b5182 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Wed, 14 Oct 2020 00:00:12 +0200 Subject: [PATCH] [FLINK-19585][minicluster] Use SavepointRestoreSettings of StreamGraph while creating JobGraph. There is currently no way to start from savepoints while using MiniCluster. The cluster config cannot be used (and would be a mismatch) because it's overridden by MiniClusterPipelineExecutorServiceLoader#createConfiguration. However, the jobgraph that is being generated in MiniClusterPipelineExecutorServiceLoader#MiniClusterExecutor only uses this configuration to translate the Pipeline/StreamGraph. In production code, savepoint restore settings are usually applied last and taken from the StreamGraph. --- .../test/util/MiniClusterPipelineExecutorServiceLoader.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java index a43793b04c4bf..291753bfa81e6 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java @@ -31,8 +31,10 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterJobClient; +import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.IOException; import java.net.MalformedURLException; @@ -138,6 +140,9 @@ public CompletableFuture execute( Configuration configuration, ClassLoader userCodeClassLoader) throws Exception { final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none() && pipeline instanceof StreamGraph) { + jobGraph.setSavepointRestoreSettings(((StreamGraph) pipeline).getSavepointRestoreSettings()); + } return miniCluster.submitJob(jobGraph) .thenApply(result -> new MiniClusterJobClient( result.getJobID(),