Skip to content

Commit

Permalink
[FLINK-20675][checkpointing] Refactor interfaces to decline checkpoin…
Browse files Browse the repository at this point in the history
…t with CheckpointException
  • Loading branch information
Myasuka committed Jan 28, 2021
1 parent c7ed2e8 commit f421c6f
Show file tree
Hide file tree
Showing 28 changed files with 88 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
Expand Down Expand Up @@ -245,7 +246,7 @@ public void acknowledgeCheckpoint(
}

@Override
public void declineCheckpoint(long checkpointId, Throwable cause) {
public void declineCheckpoint(long checkpointId, CheckpointException checkpointException) {
throw new UnsupportedOperationException(ERROR_MSG);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -203,9 +204,9 @@ void acknowledgeCheckpoint(
* to successfully complete a certain checkpoint.
*
* @param checkpointId The ID of the declined checkpoint.
* @param cause An optional reason why the checkpoint was declined.
* @param checkpointException The exception why the checkpoint was declined.
*/
void declineCheckpoint(long checkpointId, Throwable cause);
void declineCheckpoint(long checkpointId, CheckpointException checkpointException);

/**
* Marks task execution failed for an external reason (a reason other than the task code itself
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -253,7 +254,8 @@ public void triggerCheckpointOnBarrier(
* @param checkpointId The ID of the checkpoint to be aborted.
* @param cause The reason why the checkpoint was aborted during alignment
*/
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
throws IOException {
throw new UnsupportedOperationException(
String.format(
"abortCheckpointOnBarrier not supported by %s", this.getClass().getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,10 @@ public void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause) {
CheckpointException checkpointException) {

// TODO the passed parameter 'cause' is actually always instance of CheckpointException,
// we should change the interfaces to narrow all declined checkpoint's throwable to
// CheckpointException.
Preconditions.checkArgument(
cause instanceof CheckpointException,
"The given cause is "
+ cause.getClass()
+ " instead of expected CheckpointException.");
checkpointCoordinatorGateway.declineCheckpoint(
new DeclineCheckpoint(
jobID, executionAttemptID, checkpointId, (CheckpointException) cause));
jobID, executionAttemptID, checkpointId, checkpointException));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -64,8 +65,11 @@ void reportCheckpointMetrics(
* @param jobID Job ID of the running job
* @param executionAttemptID Execution attempt ID of the running task
* @param checkpointId The ID of the declined checkpoint
* @param cause The optional cause why the checkpoint was declined
* @param checkpointException The exception why the checkpoint was declined
*/
void declineCheckpoint(
JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, Throwable cause);
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointException checkpointException);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -292,8 +293,9 @@ public void acknowledgeCheckpoint(
}

@Override
public void declineCheckpoint(long checkpointId, Throwable cause) {
checkpointResponder.declineCheckpoint(jobId, executionId, checkpointId, cause);
public void declineCheckpoint(long checkpointId, CheckpointException checkpointException) {
checkpointResponder.declineCheckpoint(
jobId, executionId, checkpointId, checkpointException);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.operators.testutils;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -34,5 +35,5 @@ public void triggerCheckpointOnBarrier(
CheckpointOptions checkpointOptions,
CheckpointMetricsBuilder checkpointMetrics) {}

public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {}
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -213,7 +214,7 @@ public void acknowledgeCheckpoint(
TaskStateSnapshot subtaskState) {}

@Override
public void declineCheckpoint(long checkpointId, Throwable cause) {
public void declineCheckpoint(long checkpointId, CheckpointException cause) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
Expand Down Expand Up @@ -351,7 +352,7 @@ public void acknowledgeCheckpoint(
}

@Override
public void declineCheckpoint(long checkpointId, Throwable cause) {
public void declineCheckpoint(long checkpointId, CheckpointException cause) {
throw new UnsupportedOperationException(cause);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.taskmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand All @@ -31,13 +32,13 @@ public enum NoOpCheckpointResponder implements CheckpointResponder {
public void acknowledgeCheckpoint(
JobID j, ExecutionAttemptID e, long i, CheckpointMetrics c, TaskStateSnapshot s) {}

@Override
public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, Throwable t) {}

@Override
public void reportCheckpointMetrics(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
CheckpointMetrics checkpointMetrics) {}

@Override
public void declineCheckpoint(JobID j, ExecutionAttemptID e, long l, CheckpointException c) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -290,7 +291,7 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
throw new UnsupportedOperationException("Should not be called");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
Expand Down Expand Up @@ -72,10 +73,10 @@ public void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause) {
CheckpointException checkpointException) {

DeclineReport declineReport =
new DeclineReport(jobID, executionAttemptID, checkpointId, cause);
new DeclineReport(jobID, executionAttemptID, checkpointId, checkpointException);

declineReports.add(declineReport);

Expand Down Expand Up @@ -137,19 +138,19 @@ public TaskStateSnapshot getSubtaskState() {

public static class DeclineReport extends AbstractReport {

public final Throwable cause;
public final CheckpointException cause;

public DeclineReport(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause) {
CheckpointException cause) {

super(jobID, executionAttemptID, checkpointId);
this.cause = cause;
}

public Throwable getCause() {
public CheckpointException getCause() {
return cause;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void declineCheckpoint(
JobID jobID,
ExecutionAttemptID executionAttemptID,
long checkpointId,
Throwable cause) {}
CheckpointException checkpointException) {}
};

JobID jobID = new JobID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -250,7 +251,8 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
throws IOException {
CompletableFuture<Boolean> resultFuture =
pendingCheckpointCompletedFutures.remove(checkpointId);
if (resultFuture != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,8 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws IOException {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
throws IOException {
subtaskCheckpointCoordinator.abortCheckpointOnBarrier(checkpointId, cause, operatorChain);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -50,7 +51,7 @@ public interface SubtaskCheckpointCoordinator extends Closeable {
CheckpointStorageWorkerView getCheckpointStorage();

void abortCheckpointOnBarrier(
long checkpointId, Throwable cause, OperatorChain<?, ?> operatorChain)
long checkpointId, CheckpointException cause, OperatorChain<?, ?> operatorChain)
throws IOException;

/** Must be called after {@link #initCheckpoint(long, CheckpointOptions)}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private static ChannelStateWriter openChannelStateWriter(

@Override
public void abortCheckpointOnBarrier(
long checkpointId, Throwable cause, OperatorChain<?, ?> operatorChain)
long checkpointId, CheckpointException cause, OperatorChain<?, ?> operatorChain)
throws IOException {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", checkpointId, taskName);
lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.io.checkpointing;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -79,7 +80,7 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
assertTrue(
"Unexpected abortCheckpointOnBarrier(" + checkpointId + ")",
i < checkpointIDs.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.runtime.io.checkpointing;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -141,7 +142,7 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
checkpointAborted = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.streaming.runtime.io.checkpointing;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand Down Expand Up @@ -968,7 +969,7 @@ public ValidatingCheckpointHandler(long nextExpectedCheckpointId) {
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
super.abortCheckpointOnBarrier(checkpointId, cause);
nextExpectedCheckpointId = -1;
}
Expand Down Expand Up @@ -997,7 +998,7 @@ public void init() {}
protected void processInput(MailboxDefaultAction.Controller controller) {}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause)
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause)
throws IOException {
abortedCheckpointId = checkpointId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ public void triggerCheckpointOnBarrier(
}

@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {
public void abortCheckpointOnBarrier(long checkpointId, CheckpointException cause) {
lastCanceledCheckpointId = checkpointId;
failureReason = ((CheckpointException) cause).getCheckpointFailureReason();
failureReason = cause.getCheckpointFailureReason();
abortedCheckpointCounter++;
}

Expand Down
Loading

0 comments on commit f421c6f

Please sign in to comment.