diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java index efae292f199fc..c2b64245419d4 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/EmbeddedJobClient.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; @@ -93,12 +94,18 @@ public CompletableFuture cancel() { public CompletableFuture stopWithSavepoint( final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory) { return dispatcherGateway.stopWithSavepoint( - jobId, savepointDirectory, advanceToEndOfEventTime, timeout); + jobId, + savepointDirectory, + advanceToEndOfEventTime + ? 
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT + : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT, + timeout); } @Override public CompletableFuture triggerSavepoint(@Nullable final String savepointDirectory) { - return dispatcherGateway.triggerSavepoint(jobId, savepointDirectory, false, timeout); + return dispatcherGateway.triggerSavepoint( + jobId, savepointDirectory, TriggerSavepointMode.SAVEPOINT, timeout); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index aac1b7bfac8a8..6079d1e57e2ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -699,21 +699,27 @@ public CompletableFuture triggerCheckpoint(JobID jobID, Time timeout) { public CompletableFuture triggerSavepoint( final JobID jobId, final String targetDirectory, - final boolean cancelJob, + final TriggerSavepointMode savepointMode, final Time timeout) { return performOperationOnJobMasterGateway( - jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout)); + jobId, + gateway -> + gateway.triggerSavepoint( + targetDirectory, savepointMode.isTerminalMode(), timeout)); } @Override public CompletableFuture stopWithSavepoint( final JobID jobId, final String targetDirectory, - final boolean terminate, + TriggerSavepointMode savepointMode, final Time timeout) { return performOperationOnJobMasterGateway( - jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout)); + jobId, + gateway -> + gateway.stopWithSavepoint( + targetDirectory, savepointMode.isTerminalMode(), timeout)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointMode.java new file mode 100644 index 
0000000000000..6cd97eba201e0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/TriggerSavepointMode.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.dispatcher; + +/** + * Describes the context in which a savepoint is taken: a plain savepoint of a running job, or a + * savepoint taken as part of cancelling, suspending or terminating the job. + */ +public enum TriggerSavepointMode { + SAVEPOINT, + CANCEL_WITH_SAVEPOINT, + SUSPEND_WITH_SAVEPOINT, + TERMINATE_WITH_SAVEPOINT; + + /** Whether this mode (cancel or terminate) results in a globally terminal job status. 
*/ + public boolean isTerminalMode() { + return this == CANCEL_WITH_SAVEPOINT || this == TERMINATE_WITH_SAVEPOINT; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 6272198376787..2fbc70d445a08 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore; +import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory; @@ -741,7 +742,12 @@ public CompletableFuture triggerSavepoint( return runDispatcherCommand( dispatcherGateway -> dispatcherGateway.triggerSavepoint( - jobId, targetDirectory, cancelJob, rpcTimeout)); + jobId, + targetDirectory, + cancelJob + ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT + : TriggerSavepointMode.SAVEPOINT, + rpcTimeout)); } public CompletableFuture triggerCheckpoint(JobID jobID) { @@ -754,7 +760,12 @@ public CompletableFuture stopWithSavepoint( return runDispatcherCommand( dispatcherGateway -> dispatcherGateway.stopWithSavepoint( - jobId, targetDirectory, terminate, rpcTimeout)); + jobId, + targetDirectory, + terminate + ? 
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT + : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT, + rpcTimeout)); } public CompletableFuture disposeSavepoint(String savepointPath) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java index 76fbd2af0fe05..8e3e9729eaddb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers; @@ -164,13 +165,16 @@ protected CompletableFuture triggerOperation( HttpResponseStatus.BAD_REQUEST); } - final boolean shouldDrain = request.getRequestBody().shouldDrain(); + final TriggerSavepointMode savepointMode = + request.getRequestBody().shouldDrain() + ? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT + : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT; final String targetDirectory = requestedTargetDirectory != null ? 
requestedTargetDirectory : defaultSavepointDir; return gateway.stopWithSavepoint( - jobId, targetDirectory, shouldDrain, RpcUtils.INF_TIMEOUT); + jobId, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT); } } @@ -200,13 +204,16 @@ protected CompletableFuture triggerOperation( HttpResponseStatus.BAD_REQUEST); } - final boolean cancelJob = request.getRequestBody().isCancelJob(); + final TriggerSavepointMode savepointMode = + request.getRequestBody().isCancelJob() + ? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT + : TriggerSavepointMode.SAVEPOINT; final String targetDirectory = requestedTargetDirectory != null ? requestedTargetDirectory : defaultSavepointDir; return gateway.triggerSavepoint( - jobId, targetDirectory, cancelJob, RpcUtils.INF_TIMEOUT); + jobId, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index 5c4e690e337cc..34392801ef781 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; @@ -136,12 +137,16 @@ CompletableFuture> requestMetricQueryServiceAddresses( * * @param jobId ID of the job for which the savepoint should be triggered. * @param targetDirectory Target directory for the savepoint. 
+ * @param savepointMode context of the savepoint operation * @param timeout Timeout for the asynchronous operation * @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of * the savepoint. */ default CompletableFuture triggerSavepoint( - JobID jobId, String targetDirectory, boolean cancelJob, @RpcTimeout Time timeout) { + JobID jobId, + String targetDirectory, + TriggerSavepointMode savepointMode, + @RpcTimeout Time timeout) { throw new UnsupportedOperationException(); } @@ -151,14 +156,14 @@ default CompletableFuture triggerSavepoint( * @param jobId ID of the job for which the savepoint should be triggered. * @param targetDirectory to which to write the savepoint data or null if the default savepoint * directory should be used - * @param terminate flag indicating if the job should terminate or just suspend + * @param savepointMode context of the savepoint operation * @param timeout for the rpc call * @return Future which is completed with the savepoint path once completed */ default CompletableFuture stopWithSavepoint( final JobID jobId, final String targetDirectory, - final boolean terminate, + final TriggerSavepointMode savepointMode, @RpcTimeout final Time timeout) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index a2887c4a015ee..2cb246d6d0c2e 100755 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -330,7 +330,8 @@ public void testInvalidCallDuringInitialization() throws Exception { // this call is supposed to fail try { dispatcherGateway - .triggerSavepoint(jobId, "file:///tmp/savepoint", false, TIMEOUT) + .triggerSavepoint( + jobId, "file:///tmp/savepoint", TriggerSavepointMode.SAVEPOINT, TIMEOUT) .get(); 
fail("Previous statement should have failed"); } catch (ExecutionException t) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java index 551e533a890d7..00079e0c2bf6f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.dispatcher.TriggerSavepointMode; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobmaster.JobResult; @@ -256,13 +257,13 @@ public CompletableFuture> requestMetricQueryServiceAddresses( @Override public CompletableFuture triggerSavepoint( - JobID jobId, String targetDirectory, boolean cancelJob, Time timeout) { + JobID jobId, String targetDirectory, TriggerSavepointMode savepointMode, Time timeout) { return triggerSavepointFunction.apply(jobId, targetDirectory); } @Override public CompletableFuture stopWithSavepoint( - JobID jobId, String targetDirectory, boolean terminate, Time timeout) { + JobID jobId, String targetDirectory, TriggerSavepointMode savepointMode, Time timeout) { return stopWithSavepointFunction.apply(jobId, targetDirectory); }