Skip to content

Commit

Permalink
[FLINK-18312][rest] Move savepoint operation state into Dispatcherdis…
Browse files Browse the repository at this point in the history
…patcher
  • Loading branch information
Nicolaus Weidner authored and zentol committed Oct 26, 2021
1 parent f3c13fb commit fbabbb9
Show file tree
Hide file tree
Showing 18 changed files with 838 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -193,7 +195,10 @@ public Dispatcher(
this.recoveredJobs = new HashSet<>(recoveredJobs);

this.dispatcherCachedOperationsHandler =
new DispatcherCachedOperationsHandler(dispatcherServices.getOperationCaches());
new DispatcherCachedOperationsHandler(
dispatcherServices.getOperationCaches(),
this::triggerSavepointAndGetLocation,
this::stopWithSavepointAndGetLocation);
}

// ------------------------------------------------------
Expand Down Expand Up @@ -701,24 +706,46 @@ public CompletableFuture<String> triggerCheckpoint(JobID jobID, Time timeout) {
}

@Override
public CompletableFuture<String> triggerSavepointAndGetLocation(
final JobID jobId,
public CompletableFuture<Acknowledge> triggerSavepoint(
final AsynchronousJobOperationKey operationKey,
final String targetDirectory,
final TriggerSavepointMode savepointMode,
final Time timeout) {
return dispatcherCachedOperationsHandler.triggerSavepoint(
operationKey, targetDirectory, savepointMode, timeout);
}

/**
 * Asks the job master of the given job to take a savepoint and returns the future produced by
 * that request.
 *
 * @param jobId id of the job to take the savepoint for
 * @param targetDirectory target directory for the savepoint
 * @param savepointMode context of the savepoint operation; its terminal-mode flag is forwarded
 *     to the job master
 * @param timeout timeout passed on to the job master call
 * @return future yielding the string returned by the job master's {@code triggerSavepoint}
 */
@Override
public CompletableFuture<String> triggerSavepointAndGetLocation(
        JobID jobId, String targetDirectory, TriggerSavepointMode savepointMode, Time timeout) {
    return performOperationOnJobMasterGateway(
            jobId,
            jobMasterGateway -> {
                // Read the flag inside the lambda so it is evaluated only when the gateway
                // is actually invoked.
                final boolean terminate = savepointMode.isTerminalMode();
                return jobMasterGateway.triggerSavepoint(targetDirectory, terminate, timeout);
            });
}

/**
* Looks up the state of a previously triggered savepoint operation.
*
* <p>Delegates to the cached-operations handler, which answers from its savepoint trigger cache
* and fails the returned future if the key is not known to that cache.
*
* @param operationKey key under which the savepoint operation was registered
* @return future with the cached {@link OperationResult} for the given key
*/
@Override
public CompletableFuture<OperationResult<String>> getTriggeredSavepointStatus(
AsynchronousJobOperationKey operationKey) {
return dispatcherCachedOperationsHandler.getSavepointStatus(operationKey);
}

/**
* Stops the job identified by the given operation key with a savepoint.
*
* <p>Delegates to the cached-operations handler, which registers the operation under {@code
* operationKey} before acknowledging it.
*
* @param operationKey key identifying this asynchronous operation (trigger id and job id)
* @param targetDirectory target directory for the savepoint
* @param savepointMode context of the savepoint operation
* @param timeout timeout for the operation
* @return future completed with an {@link Acknowledge}, or completed exceptionally if an
*     operation with the same key has already failed
*/
@Override
public CompletableFuture<Acknowledge> stopWithSavepoint(
AsynchronousJobOperationKey operationKey,
String targetDirectory,
TriggerSavepointMode savepointMode,
final Time timeout) {
return dispatcherCachedOperationsHandler.stopWithSavepoint(
operationKey, targetDirectory, savepointMode, timeout);
}

@Override
public CompletableFuture<String> stopWithSavepointAndGetLocation(
final JobID jobId,
final String targetDirectory,
TriggerSavepointMode savepointMode,
final TriggerSavepointMode savepointMode,
final Time timeout) {
return performOperationOnJobMasterGateway(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,109 @@

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.async.OperationResultStatus;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;
import org.apache.flink.util.concurrent.FutureUtils;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
 * A handler for async operations triggered by the {@link Dispatcher} whose keys and results are
 * cached.
 *
 * <p>Savepoint operations are registered in a {@link CompletedOperationCache} under their {@link
 * AsynchronousJobOperationKey}. Triggering an operation whose key is already present in the cache
 * does not start it a second time; callers only receive an acknowledgement (or the earlier
 * failure).
 */
public class DispatcherCachedOperationsHandler {

    /** Cache holding ongoing and completed savepoint operations, keyed by operation key. */
    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
            savepointTriggerCache;

    /** Function used to actually trigger a savepoint for a job. */
    private final TriggerSavepointFunction triggerSavepointFunction;

    /** Function used to actually stop a job with a savepoint. */
    private final TriggerSavepointFunction stopWithSavepointFunction;

    /**
     * Creates a handler backed by the savepoint trigger cache of the given operation caches.
     *
     * @param operationCaches caches from which the savepoint trigger cache is taken
     * @param triggerSavepointFunction function that triggers a savepoint
     * @param stopWithSavepointFunction function that stops a job with a savepoint
     */
    DispatcherCachedOperationsHandler(
            DispatcherOperationCaches operationCaches,
            TriggerSavepointFunction triggerSavepointFunction,
            TriggerSavepointFunction stopWithSavepointFunction) {
        this(
                triggerSavepointFunction,
                stopWithSavepointFunction,
                operationCaches.getSavepointTriggerCache());
    }

    /**
     * Creates a handler that uses the given cache directly.
     *
     * @param triggerSavepointFunction function that triggers a savepoint
     * @param stopWithSavepointFunction function that stops a job with a savepoint
     * @param savepointTriggerCache cache in which savepoint operations are registered
     */
    @VisibleForTesting
    DispatcherCachedOperationsHandler(
            TriggerSavepointFunction triggerSavepointFunction,
            TriggerSavepointFunction stopWithSavepointFunction,
            CompletedOperationCache<AsynchronousJobOperationKey, String> savepointTriggerCache) {
        this.triggerSavepointFunction = triggerSavepointFunction;
        this.stopWithSavepointFunction = stopWithSavepointFunction;
        this.savepointTriggerCache = savepointTriggerCache;
    }

    /**
     * Triggers a savepoint unless an operation with the same key is already cached.
     *
     * @param operationKey key identifying the operation; also supplies the job id
     * @param targetDirectory target directory for the savepoint
     * @param savepointMode context of the savepoint operation
     * @param timeout timeout handed to the trigger function
     * @return future completed with an {@link Acknowledge}, or completed exceptionally with an
     *     {@link OperationAlreadyFailedException} if the cached operation has failed
     */
    public CompletableFuture<Acknowledge> triggerSavepoint(
            AsynchronousJobOperationKey operationKey,
            String targetDirectory,
            TriggerSavepointMode savepointMode,
            Time timeout) {
        return registerOperationIdempotently(
                operationKey,
                () ->
                        triggerSavepointFunction.apply(
                                operationKey.getJobId(), targetDirectory, savepointMode, timeout));
    }

    /**
     * Stops a job with a savepoint unless an operation with the same key is already cached.
     *
     * @param operationKey key identifying the operation; also supplies the job id
     * @param targetDirectory target directory for the savepoint
     * @param savepointMode context of the savepoint operation
     * @param timeout timeout handed to the stop-with-savepoint function
     * @return future completed with an {@link Acknowledge}, or completed exceptionally with an
     *     {@link OperationAlreadyFailedException} if the cached operation has failed
     */
    public CompletableFuture<Acknowledge> stopWithSavepoint(
            AsynchronousJobOperationKey operationKey,
            String targetDirectory,
            TriggerSavepointMode savepointMode,
            Time timeout) {
        return registerOperationIdempotently(
                operationKey,
                () ->
                        stopWithSavepointFunction.apply(
                                operationKey.getJobId(), targetDirectory, savepointMode, timeout));
    }

    /**
     * Returns the cached result of a savepoint operation.
     *
     * @param operationKey key under which the operation was registered
     * @return future completed with the cached {@link OperationResult}, or completed exceptionally
     *     with an {@code UnknownOperationKeyException} if the cache has no entry for the key
     */
    public CompletableFuture<OperationResult<String>> getSavepointStatus(
            AsynchronousJobOperationKey operationKey) {
        return savepointTriggerCache
                .get(operationKey)
                .map(CompletableFuture::completedFuture)
                // Build the exception lazily: it is only needed when the key is unknown, and
                // creating it on every successful lookup would be wasted work.
                .orElseGet(
                        () ->
                                FutureUtils.completedExceptionally(
                                        new UnknownOperationKeyException(operationKey)));
    }

    /**
     * Starts the given operation and registers it in the cache, unless the cache already holds an
     * entry for the key, in which case the operation is not started again.
     *
     * @param operationKey key to register the operation under
     * @param operation supplier that starts the operation; only invoked if the key is not cached
     * @return acknowledgement future derived from the cached state of the operation
     */
    private CompletableFuture<Acknowledge> registerOperationIdempotently(
            AsynchronousJobOperationKey operationKey,
            Supplier<CompletableFuture<String>> operation) {
        Optional<OperationResult<String>> resultOptional = savepointTriggerCache.get(operationKey);
        if (resultOptional.isPresent()) {
            return convertToFuture(resultOptional.get());
        }

        savepointTriggerCache.registerOngoingOperation(operationKey, operation.get());

        return savepointTriggerCache
                .get(operationKey)
                .map(DispatcherCachedOperationsHandler::convertToFuture)
                // This shouldn't happen as we just registered the operation. We assume it is a
                // temporary issue with the cache
                .orElse(CompletableFuture.completedFuture(Acknowledge.get()));
    }

    /**
     * Maps a cached operation result to an acknowledgement future.
     *
     * @param result cached result of the operation
     * @return exceptionally completed future if the result status is {@code FAILURE}; otherwise a
     *     future completed with an {@link Acknowledge}
     */
    private static CompletableFuture<Acknowledge> convertToFuture(OperationResult<String> result) {
        if (result.getStatus() == OperationResultStatus.FAILURE) {
            return FutureUtils.completedExceptionally(
                    new OperationAlreadyFailedException(result.getThrowable()));
        }
        return CompletableFuture.completedFuture(Acknowledge.get());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,40 @@ default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applica
default CompletableFuture<String> triggerCheckpoint(JobID jobID, @RpcTimeout Time timeout) {
// Optional operation: gateways that do not override this method reject the call.
throw new UnsupportedOperationException();
}

/**
* Triggers a savepoint with the given savepoint directory as a target, returning a future that
* completes with the savepoint location when it is complete.
*
* <p>This default implementation does not support the operation and throws {@link
* UnsupportedOperationException}.
*
* @param jobId the job id
* @param targetDirectory Target directory for the savepoint.
* @param savepointMode context of the savepoint operation
* @param timeout Timeout for the asynchronous operation
* @return Future which is completed with the savepoint location once the savepoint is complete
* @throws UnsupportedOperationException always, unless the method is overridden
*/
default CompletableFuture<String> triggerSavepointAndGetLocation(
JobID jobId,
String targetDirectory,
TriggerSavepointMode savepointMode,
@RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}

/**
* Stops the job with a savepoint, returning a future that completes with the savepoint location
* when the savepoint is completed.
*
* <p>This default implementation does not support the operation and throws {@link
* UnsupportedOperationException}.
*
* @param jobId the job id
* @param targetDirectory Target directory for the savepoint.
* @param savepointMode context of the savepoint operation
* @param timeout for the rpc call
* @return Future which is completed with the savepoint location once it is completed
* @throws UnsupportedOperationException always, unless the method is overridden
*/
default CompletableFuture<String> stopWithSavepointAndGetLocation(
JobID jobId,
String targetDirectory,
TriggerSavepointMode savepointMode,
@RpcTimeout final Time timeout) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,23 @@

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.runtime.rest.handler.async.CompletedOperationCache;
import org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey;

import java.util.concurrent.CompletableFuture;

/**
 * Encapsulates caches for results of asynchronous operations triggered by the {@link Dispatcher}.
 */
public class DispatcherOperationCaches {

    /** Cache for savepoint operations, keyed by their {@link AsynchronousJobOperationKey}. */
    private final CompletedOperationCache<AsynchronousJobOperationKey, String>
            savepointTriggerCache = new CompletedOperationCache<>();

    /**
     * Returns the cache used for savepoint operations.
     *
     * @return the savepoint trigger cache owned by this instance
     */
    public CompletedOperationCache<AsynchronousJobOperationKey, String> getSavepointTriggerCache() {
        return savepointTriggerCache;
    }

    /**
     * Shuts down all caches held by this instance.
     *
     * @return the future returned by closing the savepoint trigger cache asynchronously
     */
    public CompletableFuture<Void> shutdownCaches() {
        // The cache must actually be closed here; returning an already-completed future
        // instead would skip its shutdown.
        return savepointTriggerCache.closeAsync();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.dispatcher;

/** Exception indicating that a requested operation already exists and has failed. */
public class OperationAlreadyFailedException extends DispatcherException {
/**
* Creates the exception from the failure of the already existing operation.
*
* @param throwable failure of the earlier operation; handed to the {@link DispatcherException}
*     constructor
*/
public OperationAlreadyFailedException(Throwable throwable) {
super(throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.dispatcher;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.util.function.QuadFunction;

import java.util.concurrent.CompletableFuture;

/**
* Wrapper interface for functions triggering savepoints. Currently only serves to shorten
* signatures.
*/
public interface TriggerSavepointFunction
extends QuadFunction<
JobID, String, TriggerSavepointMode, Time, CompletableFuture<String>> {}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.async;
package org.apache.flink.runtime.dispatcher;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.TriggerId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* operations will be removed from the cache automatically after a fixed timeout.
*/
@ThreadSafe
class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {
public class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseableAsync {

private static final long COMPLETED_OPERATION_RESULT_CACHE_DURATION_SECONDS = 300L;

Expand All @@ -71,7 +71,7 @@ class CompletedOperationCache<K extends OperationKey, R> implements AutoCloseabl

@Nullable private CompletableFuture<Void> terminationFuture;

CompletedOperationCache() {
public CompletedOperationCache() {
this(Ticker.systemTicker());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public static AsynchronousJobOperationKey of(final TriggerId triggerId, final Jo
return new AsynchronousJobOperationKey(triggerId, jobId);
}

/**
* Returns the id of the job that this operation key refers to.
*
* @return the job id stored in this key
*/
public JobID getJobId() {
return jobId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -70,4 +79,15 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(super.hashCode(), jobId);
}

/**
 * Renders this key as {@code SimpleClassName{triggerId=..., jobId=...}}.
 *
 * @return textual representation containing the trigger id and the job id
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder();
    text.append(getClass().getSimpleName());
    text.append("{triggerId=").append(getTriggerId());
    text.append(", jobId=").append(jobId);
    text.append('}');
    return text.toString();
}
}
Loading

0 comments on commit fbabbb9

Please sign in to comment.