Skip to content

Commit

Permalink
[FLINK-18312][rest] Introduce TriggerSavepointMode enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolaus Weidner authored and zentol committed Oct 26, 2021
1 parent e759f19 commit a126f45
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
Expand Down Expand Up @@ -93,12 +94,18 @@ public CompletableFuture<Void> cancel() {
public CompletableFuture<String> stopWithSavepoint(
final boolean advanceToEndOfEventTime, @Nullable final String savepointDirectory) {
return dispatcherGateway.stopWithSavepoint(
jobId, savepointDirectory, advanceToEndOfEventTime, timeout);
jobId,
savepointDirectory,
advanceToEndOfEventTime
? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
: TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT,
timeout);
}

@Override
public CompletableFuture<String> triggerSavepoint(@Nullable final String savepointDirectory) {
return dispatcherGateway.triggerSavepoint(jobId, savepointDirectory, false, timeout);
return dispatcherGateway.triggerSavepoint(
jobId, savepointDirectory, TriggerSavepointMode.SAVEPOINT, timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,21 +699,27 @@ public CompletableFuture<String> triggerCheckpoint(JobID jobID, Time timeout) {
public CompletableFuture<String> triggerSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean cancelJob,
final TriggerSavepointMode savepointMode,
final Time timeout) {

return performOperationOnJobMasterGateway(
jobId, gateway -> gateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
jobId,
gateway ->
gateway.triggerSavepoint(
targetDirectory, savepointMode.isTerminalMode(), timeout));
}

@Override
public CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean terminate,
TriggerSavepointMode savepointMode,
final Time timeout) {
return performOperationOnJobMasterGateway(
jobId, gateway -> gateway.stopWithSavepoint(targetDirectory, terminate, timeout));
jobId,
gateway ->
gateway.stopWithSavepoint(
targetDirectory, savepointMode.isTerminalMode(), timeout));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.dispatcher;

/**
* Describes the context of taking a savepoint: Whether it is a savepoint for a running job or
* whether the job is cancelled, suspended or terminated with a savepoint.
*/
public enum TriggerSavepointMode {
SAVEPOINT,
CANCEL_WITH_SAVEPOINT,
SUSPEND_WITH_SAVEPOINT,
TERMINATE_WITH_SAVEPOINT;

/** Whether the operation will result in a globally terminal job status. */
public boolean isTerminalMode() {
return this == CANCEL_WITH_SAVEPOINT || this == TERMINATE_WITH_SAVEPOINT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
Expand Down Expand Up @@ -741,7 +742,12 @@ public CompletableFuture<String> triggerSavepoint(
return runDispatcherCommand(
dispatcherGateway ->
dispatcherGateway.triggerSavepoint(
jobId, targetDirectory, cancelJob, rpcTimeout));
jobId,
targetDirectory,
cancelJob
? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT
: TriggerSavepointMode.SAVEPOINT,
rpcTimeout));
}

public CompletableFuture<String> triggerCheckpoint(JobID jobID) {
Expand All @@ -754,7 +760,12 @@ public CompletableFuture<String> stopWithSavepoint(
return runDispatcherCommand(
dispatcherGateway ->
dispatcherGateway.stopWithSavepoint(
jobId, targetDirectory, terminate, rpcTimeout));
jobId,
targetDirectory,
terminate
? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
: TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT,
rpcTimeout));
}

public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers;
Expand Down Expand Up @@ -164,13 +165,16 @@ protected CompletableFuture<String> triggerOperation(
HttpResponseStatus.BAD_REQUEST);
}

final boolean shouldDrain = request.getRequestBody().shouldDrain();
final TriggerSavepointMode savepointMode =
request.getRequestBody().shouldDrain()
? TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
: TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT;
final String targetDirectory =
requestedTargetDirectory != null
? requestedTargetDirectory
: defaultSavepointDir;
return gateway.stopWithSavepoint(
jobId, targetDirectory, shouldDrain, RpcUtils.INF_TIMEOUT);
jobId, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
}
}

Expand Down Expand Up @@ -200,13 +204,16 @@ protected CompletableFuture<String> triggerOperation(
HttpResponseStatus.BAD_REQUEST);
}

final boolean cancelJob = request.getRequestBody().isCancelJob();
final TriggerSavepointMode savepointMode =
request.getRequestBody().isCancelJob()
? TriggerSavepointMode.CANCEL_WITH_SAVEPOINT
: TriggerSavepointMode.SAVEPOINT;
final String targetDirectory =
requestedTargetDirectory != null
? requestedTargetDirectory
: defaultSavepointDir;
return gateway.triggerSavepoint(
jobId, targetDirectory, cancelJob, RpcUtils.INF_TIMEOUT);
jobId, targetDirectory, savepointMode, RpcUtils.INF_TIMEOUT);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
Expand Down Expand Up @@ -136,12 +137,16 @@ CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(
*
* @param jobId ID of the job for which the savepoint should be triggered.
* @param targetDirectory Target directory for the savepoint.
* @param savepointMode context of the savepoint operation
* @param timeout Timeout for the asynchronous operation
* @return A future to the {@link CompletedCheckpoint#getExternalPointer() external pointer} of
* the savepoint.
*/
default CompletableFuture<String> triggerSavepoint(
JobID jobId, String targetDirectory, boolean cancelJob, @RpcTimeout Time timeout) {
JobID jobId,
String targetDirectory,
TriggerSavepointMode savepointMode,
@RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}

Expand All @@ -151,14 +156,14 @@ default CompletableFuture<String> triggerSavepoint(
* @param jobId ID of the job for which the savepoint should be triggered.
* @param targetDirectory to which to write the savepoint data or null if the default savepoint
* directory should be used
* @param terminate flag indicating if the job should terminate or just suspend
* @param savepointMode context of the savepoint operation
* @param timeout for the rpc call
* @return Future which is completed with the savepoint path once completed
*/
default CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean terminate,
final TriggerSavepointMode savepointMode,
@RpcTimeout final Time timeout) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ public void testInvalidCallDuringInitialization() throws Exception {
// this call is supposed to fail
try {
dispatcherGateway
.triggerSavepoint(jobId, "file:https:///tmp/savepoint", false, TIMEOUT)
.triggerSavepoint(
jobId, "file:https:///tmp/savepoint", TriggerSavepointMode.SAVEPOINT, TIMEOUT)
.get();
fail("Previous statement should have failed");
} catch (ExecutionException t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
Expand Down Expand Up @@ -256,13 +257,13 @@ public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(

@Override
public CompletableFuture<String> triggerSavepoint(
JobID jobId, String targetDirectory, boolean cancelJob, Time timeout) {
JobID jobId, String targetDirectory, TriggerSavepointMode savepointMode, Time timeout) {
return triggerSavepointFunction.apply(jobId, targetDirectory);
}

@Override
public CompletableFuture<String> stopWithSavepoint(
JobID jobId, String targetDirectory, boolean terminate, Time timeout) {
JobID jobId, String targetDirectory, TriggerSavepointMode savepointMode, Time timeout) {
return stopWithSavepointFunction.apply(jobId, targetDirectory);
}

Expand Down

0 comments on commit a126f45

Please sign in to comment.