diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java index a43793b04c4bf..291753bfa81e6 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java @@ -31,8 +31,10 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.minicluster.MiniCluster; import org.apache.flink.runtime.minicluster.MiniClusterJobClient; +import org.apache.flink.streaming.api.graph.StreamGraph; import java.io.IOException; import java.net.MalformedURLException; @@ -138,6 +140,9 @@ public CompletableFuture execute( Configuration configuration, ClassLoader userCodeClassLoader) throws Exception { final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none() && pipeline instanceof StreamGraph) { + jobGraph.setSavepointRestoreSettings(((StreamGraph) pipeline).getSavepointRestoreSettings()); + } return miniCluster.submitJob(jobGraph) .thenApply(result -> new MiniClusterJobClient( result.getJobID(),