[FLINK-11669][checkpointing] Wire the Suspend/Terminate to the JM/TM.
Wires the functionality introduced in FLINK-11667 and FLINK-11668
into Flink's distributed RPC mechanism. This is a prerequisite for
exposing the "suspend" and "terminate"/"drain" functionality to the user.
kl0u committed Apr 17, 2019
1 parent 093ddc1 commit 1b5a251
Showing 22 changed files with 657 additions and 29 deletions.
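At a glance, the commit wires a stop-with-savepoint request through the RPC chain Dispatcher -> JobMaster -> CheckpointCoordinator -> Execution -> TaskExecutor -> Task. A minimal caller-side sketch against the gateway method added below; the dispatcherGateway and jobId references and the target directory are illustrative, not part of this commit:

// Hedged sketch: stop a running job with a synchronous savepoint via the Dispatcher RPC.
// stopWithSavepoint(...) is the gateway method introduced in this commit; everything else is assumed.
CompletableFuture<String> savepointPath = dispatcherGateway.stopWithSavepoint(
	jobId,                        // JobID of the job to stop
	"file:///tmp/savepoints",     // target directory, or null to use the configured default
	true,                         // advanceToEndOfEventTime: emit MAX_WATERMARK before stopping
	Time.seconds(60));            // RPC timeout

savepointPath.thenAccept(path -> System.out.println("Job stopped with savepoint at " + path));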
@@ -368,16 +368,49 @@ public boolean isShutdown() {
* configured
*/
public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
- long timestamp,
- @Nullable String targetLocation) {
+ final long timestamp,
+ @Nullable final String targetLocation) {

- CheckpointProperties props = CheckpointProperties.forSavepoint();
+ final CheckpointProperties properties = CheckpointProperties.forSavepoint();
+ return triggerSavepointInternal(timestamp, properties, false, targetLocation);
}

/**
* Triggers a synchronous savepoint with the given savepoint directory as a target.
*
* @param timestamp The timestamp for the savepoint.
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers.
* @param targetLocation Target location for the savepoint, optional. If null, the
* state backend's configured default will be used.
* @return A future to the completed checkpoint
* @throws IllegalStateException If no savepoint directory has been
* specified and no default savepoint directory has been
* configured
*/
public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
final long timestamp,
final boolean advanceToEndOfEventTime,
@Nullable final String targetLocation) {

final CheckpointProperties properties = CheckpointProperties.forSyncSavepoint();
return triggerSavepointInternal(timestamp, properties, advanceToEndOfEventTime, targetLocation);
}

private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
final long timestamp,
final CheckpointProperties checkpointProperties,
final boolean advanceToEndOfEventTime,
@Nullable final String targetLocation) {

checkNotNull(checkpointProperties);

CheckpointTriggerResult triggerResult = triggerCheckpoint(
- timestamp,
- props,
- targetLocation,
- false);
+ timestamp,
+ checkpointProperties,
+ targetLocation,
+ false,
+ advanceToEndOfEventTime);

if (triggerResult.isSuccess()) {
return triggerResult.getPendingCheckpoint().getCompletionFuture();
@@ -398,15 +431,20 @@ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
* @return <code>true</code> if triggering the checkpoint succeeded.
*/
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
- return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
+ return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false).isSuccess();
}

@VisibleForTesting
public CheckpointTriggerResult triggerCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
- boolean isPeriodic) {
+ boolean isPeriodic,
+ boolean advanceToEndOfTime) {

if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}

// make some eager pre-checks
synchronized (lock) {
@@ -631,7 +669,11 @@ else if (!props.forceCheckpoint()) {

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
- execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
+ if (props.isSynchronous()) {
+ execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
+ } else {
+ execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
+ }
}

numUnsuccessfulCheckpointsTriggers.set(0);
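On the coordinator side, the new entry point is triggerSynchronousSavepoint. A brief sketch of how a caller is expected to use it; the coordinator reference and timestamp are illustrative, and the guard added in the hunk above rejects advancing event time for anything that is not a synchronous savepoint:

// Hedged sketch: trigger a synchronous savepoint through the coordinator.
// The future completes with the CompletedCheckpoint once all tasks acknowledge it.
CompletableFuture<CompletedCheckpoint> syncSavepoint = coordinator.triggerSynchronousSavepoint(
	System.currentTimeMillis(),   // checkpoint timestamp
	true,                         // advanceToEndOfEventTime
	null);                        // null -> use the state backend's default savepoint directory

// By contrast, calling triggerCheckpoint(...) with advanceToEndOfTime = true but
// non-synchronous checkpoint properties throws IllegalArgumentException (see the check above).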
@@ -179,6 +179,15 @@ public boolean isSavepoint() {
return checkpointType.isSavepoint();
}

/**
* Returns whether the checkpoint properties describe a synchronous savepoint/checkpoint.
*
* @return <code>true</code> if the properties describe a synchronous operation, <code>false</code> otherwise.
*/
public boolean isSynchronous() {
return checkpointType.isSynchronous();
}

// ------------------------------------------------------------------------

@Override
@@ -581,6 +581,19 @@ public CompletableFuture<String> triggerSavepoint(
jobMasterGateway.triggerSavepoint(targetDirectory, cancelJob, timeout));
}

@Override
public CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean advanceToEndOfEventTime,
final Time timeout) {
final CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getJobMasterGatewayFuture(jobId);

return jobMasterGatewayFuture.thenCompose(
(JobMasterGateway jobMasterGateway) ->
jobMasterGateway.stopWithSavepoint(targetDirectory, advanceToEndOfEventTime, timeout));
}

@Override
public CompletableFuture<Acknowledge> shutDownCluster() {
closeAsync();
@@ -27,6 +27,7 @@
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -965,15 +966,37 @@ public void notifyCheckpointComplete(long checkpointId, long timestamp) {
* @param checkpointOptions of the checkpoint to trigger
*/
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, false);
}

/**
* Trigger a new checkpoint on the task of this execution.
*
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
*/
public void triggerSynchronousSavepoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
}

private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}

final LogicalSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

- taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
+ taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
}
}

@@ -174,7 +174,10 @@ public void triggerCheckpoint(
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions) {
+ CheckpointOptions checkpointOptions,
+ boolean advanceToEndOfEventTime) {

// we ignore the `advanceToEndOfEventTime` because this is dead code.

Preconditions.checkNotNull(executionAttemptID);
Preconditions.checkNotNull(jobId);
@@ -137,13 +137,16 @@ void notifyCheckpointComplete(
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
*/
void triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions);
+ CheckpointOptions checkpointOptions,
+ boolean advanceToEndOfEventTime);

/**
* Frees the slot with the given allocation ID.
@@ -959,6 +959,64 @@ public CompletableFuture<String> triggerSavepoint(
}, getMainThreadExecutor());
}

@Override
public CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory,
final boolean advanceToEndOfEventTime,
final Time timeout) {

final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();

if (checkpointCoordinator == null) {
return FutureUtils.completedExceptionally(new IllegalStateException(
String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
}

if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

return FutureUtils.completedExceptionally(new IllegalStateException(
"No savepoint directory configured. You can either specify a directory " +
"while cancelling via -s :targetDirectory or configure a cluster-wide " +
"default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
}

final long now = System.currentTimeMillis();

// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();

final CompletableFuture<String> savepointFuture = checkpointCoordinator
.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory)
.handleAsync((completedCheckpoint, throwable) -> {
if (throwable != null) {
log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
throw new CompletionException(throwable);
}
return completedCheckpoint.getExternalPointer();
}, getMainThreadExecutor());

final CompletableFuture<JobStatus> terminationFuture = executionGraph
.getTerminationFuture()
.handleAsync((jobstatus, throwable) -> {

if (throwable != null) {
log.info("Failed during stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(), throwable.getMessage());
throw new CompletionException(throwable);
} else if (jobstatus != JobStatus.FINISHED) {
log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", jobGraph.getJobID(), jobstatus);
throw new CompletionException(new FlinkException("Reached state " + jobstatus + " instead of FINISHED."));
}
return jobstatus;
}, getMainThreadExecutor());

return savepointFuture.thenCompose((path) ->
terminationFuture.thenApply((jobStatus -> path)));
}

private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
try {
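The final composition in stopWithSavepoint returns the savepoint path only after the job has also reached a terminal state, and it fails if either future fails. A small self-contained illustration of that shape with plain CompletableFutures (the values are made up):

import java.util.concurrent.CompletableFuture;

public class StopWithSavepointComposition {
	public static void main(String[] args) {
		// Stand-ins for the savepoint future and the job termination future above.
		CompletableFuture<String> savepointFuture = CompletableFuture.completedFuture("file:///savepoints/savepoint-abc123");
		CompletableFuture<String> terminationFuture = CompletableFuture.completedFuture("FINISHED");

		// Same shape as savepointFuture.thenCompose(path -> terminationFuture.thenApply(status -> path)):
		// the result carries the savepoint path but only completes once both futures have completed.
		CompletableFuture<String> result = savepointFuture.thenCompose(path ->
			terminationFuture.thenApply(status -> path));

		System.out.println(result.join()); // prints the savepoint path
	}
}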
@@ -263,6 +263,21 @@ CompletableFuture<String> triggerSavepoint(
final boolean cancelJob,
@RpcTimeout final Time timeout);

/**
* Stops the job with a savepoint.
*
* @param targetDirectory to which to write the savepoint data or null if the
* default savepoint directory should be used
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
* @param timeout for the rpc call
* @return Future which is completed with the savepoint path once completed
*/
CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory,
final boolean advanceToEndOfEventTime,
@RpcTimeout final Time timeout);

/**
* Requests the statistics on operator back pressure.
*
@@ -101,12 +101,13 @@ public void notifyCheckpointComplete(ExecutionAttemptID executionAttemptID, JobI
}

@Override
- public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
+ public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
checkpointId,
timestamp,
- checkpointOptions);
+ checkpointOptions,
+ advanceToEndOfEventTime);
}

@Override
@@ -559,6 +559,10 @@ public CompletableFuture<String> triggerSavepoint(JobID jobId, String targetDire
return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.triggerSavepoint(jobId, targetDirectory, cancelJob, rpcTimeout));
}

public CompletableFuture<String> stopWithSavepoint(JobID jobId, String targetDirectory, boolean advanceToEndOfTime) {
return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.stopWithSavepoint(jobId, targetDirectory, advanceToEndOfTime, rpcTimeout));
}

public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) {
return runDispatcherCommand(dispatcherGateway -> dispatcherGateway.disposeSavepoint(savepointPath, rpcTimeout));
}
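For tests running against a MiniCluster, the new method can be exercised roughly as follows; miniCluster and jobId are assumed to come from the usual test setup, and the directory is illustrative:

// Hedged sketch: stop a job on a MiniCluster with a savepoint, draining event time first.
CompletableFuture<String> savepointPathFuture =
	miniCluster.stopWithSavepoint(jobId, "file:///tmp/savepoints", true);

// Blocks until the savepoint is complete and the job has shut down.
String savepointPath = savepointPathFuture.get();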
@@ -26,6 +26,7 @@
import org.apache.flink.runtime.blob.TransientBlobCache;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -703,13 +704,19 @@ public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointId,
long checkpointTimestamp,
- CheckpointOptions checkpointOptions) {
+ CheckpointOptions checkpointOptions,
+ boolean advanceToEndOfEventTime) {
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
}

final Task task = taskSlotTable.getTask(executionAttemptID);

if (task != null) {
- task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, false);
+ task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

return CompletableFuture.completedFuture(Acknowledge.get());
} else {
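The precondition that only a synchronous savepoint may advance the watermark to MAX is now checked in the CheckpointCoordinator, in Execution, and here in the TaskExecutor. Purely as an illustration (not part of this commit), the shared invariant could be captured in one helper:

// Illustrative helper mirroring the duplicated guard above; checkSyncSavepointPrecondition
// is a hypothetical name, while CheckpointType.isSynchronous()/isSavepoint() are the accessors used in the diff.
static void checkSyncSavepointPrecondition(CheckpointType checkpointType, boolean advanceToEndOfEventTime) {
	if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
		throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
	}
}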
@@ -113,9 +113,16 @@ CompletableFuture<Acknowledge> updatePartitions(
* @param checkpointID unique id for the checkpoint
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
* @param checkpointOptions for performing the checkpoint
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
* @return Future acknowledge if the checkpoint has been successfully triggered
*/
CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
long checkpointID,
long checkpointTimestamp,
CheckpointOptions checkpointOptions,
boolean advanceToEndOfEventTime);

/**
* Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID
@@ -135,6 +135,25 @@ default CompletableFuture<String> triggerSavepoint(
throw new UnsupportedOperationException();
}

/**
* Stops the job with a savepoint.
*
* @param jobId ID of the job for which the savepoint should be triggered.
* @param targetDirectory to which to write the savepoint data or null if the
* default savepoint directory should be used
* @param advanceToEndOfEventTime Flag indicating if the source should inject a {@code MAX_WATERMARK} in the pipeline
* to fire any registered event-time timers
* @param timeout for the rpc call
* @return Future which is completed with the savepoint path once completed
*/
default CompletableFuture<String> stopWithSavepoint(
final JobID jobId,
final String targetDirectory,
final boolean advanceToEndOfEventTime,
@RpcTimeout final Time timeout) {
throw new UnsupportedOperationException();
}

/**
* Dispose the given savepoint.
*