Skip to content

Commit

Permalink
[hotfix] Suppress emitting non-causal exceptions from closed checkpoi…
Browse files Browse the repository at this point in the history
…nting thread

This prevents an exception that is merely caused by closing a running snapshot from being reported.
As a result, users are no longer confused by such exceptions in their logs, and the exception can no
longer be reported before its actual cause, which would hide the real cause in the logs.
  • Loading branch information
StefanRRichter committed Feb 25, 2018
1 parent 32e25eb commit 08d0881
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,8 @@ protected static final class AsyncCheckpointRunnable implements Runnable, Closea

private final long asyncStartNanos;

private final AtomicReference<CheckpointingOperation.AsynCheckpointState> asyncCheckpointState = new AtomicReference<>(
CheckpointingOperation.AsynCheckpointState.RUNNING);
private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(
CheckpointingOperation.AsyncCheckpointState.RUNNING);

AsyncCheckpointRunnable(
StreamTask<?, ?> owner,
Expand Down Expand Up @@ -865,8 +865,8 @@ public void run() {

checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {

reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
Expand Down Expand Up @@ -917,63 +917,92 @@ private void reportCompletedSnapshotStates(
}

private void handleExecutionException(Exception e) {
// the state is completed if an exception occurred in the acknowledgeCheckpoint call
// in order to clean up, we have to set it to RUNNING again.
asyncCheckpointState.compareAndSet(
CheckpointingOperation.AsynCheckpointState.COMPLETED,
CheckpointingOperation.AsynCheckpointState.RUNNING);

try {
cleanup();
} catch (Exception cleanupException) {
e.addSuppressed(cleanupException);
}
boolean didCleanup = false;
CheckpointingOperation.AsyncCheckpointState currentState = asyncCheckpointState.get();

Exception checkpointException = new Exception(
"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
owner.getName() + '.',
e);
while (CheckpointingOperation.AsyncCheckpointState.DISCARDED != currentState) {

owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
checkpointMetaData,
checkpointException);
if (asyncCheckpointState.compareAndSet(
currentState,
CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {

didCleanup = true;

try {
cleanup();
} catch (Exception cleanupException) {
e.addSuppressed(cleanupException);
}

Exception checkpointException = new Exception(
"Could not materialize checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
owner.getName() + '.',
e);

// We only report the exception for the original cause of the failure and cleanup.
// Otherwise this follow-up exception could race the original exception in failing the task.
owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
checkpointMetaData,
checkpointException);

currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED;
} else {
currentState = asyncCheckpointState.get();
}
}

if (!didCleanup) {
LOG.trace("Caught followup exception from a failed checkpoint thread. This can be ignored.", e);
}
}

@Override
public void close() {
try {
cleanup();
} catch (Exception cleanupException) {
LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
if (asyncCheckpointState.compareAndSet(
CheckpointingOperation.AsyncCheckpointState.RUNNING,
CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {

try {
cleanup();
} catch (Exception cleanupException) {
LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
}
} else {
logFailedCleanupAttempt();
}
}

private void cleanup() throws Exception {
if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING, CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", checkpointMetaData.getCheckpointId(), owner.getName());
Exception exception = null;
LOG.debug(
"Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.",
checkpointMetaData.getCheckpointId(),
owner.getName());

// clean up ongoing operator snapshot results and non partitioned state handles
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (operatorSnapshotResult != null) {
try {
operatorSnapshotResult.cancel();
} catch (Exception cancelException) {
exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
}
Exception exception = null;

// clean up ongoing operator snapshot results and non partitioned state handles
for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
if (operatorSnapshotResult != null) {
try {
operatorSnapshotResult.cancel();
} catch (Exception cancelException) {
exception = ExceptionUtils.firstOrSuppressed(cancelException, exception);
}
}
}

if (null != exception) {
throw exception;
}
} else {
LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has " +
"already been completed. Thus, the state handles are not cleaned up.",
owner.getName(),
checkpointMetaData.getCheckpointId());
if (null != exception) {
throw exception;
}
}

/**
 * Logs, at debug level, that the state handles of this asynchronous checkpointing operation
 * were not cleaned up. The message names the owning task and the checkpoint id.
 *
 * <p>Invoked from {@code close()} when the state could not be switched from {@code RUNNING}
 * to {@code DISCARDED}, i.e. when no cleanup is performed by that call.
 */
private void logFailedCleanupAttempt() {
LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has " +
"already been completed. Thus, the state handles are not cleaned up.",
owner.getName(),
checkpointMetaData.getCheckpointId());
}
}

public CloseableRegistry getCancelables() {
Expand Down Expand Up @@ -1088,7 +1117,7 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
}
}

private enum AsynCheckpointState {
private enum AsyncCheckpointState {
RUNNING,
DISCARDED,
COMPLETED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
Expand All @@ -97,7 +99,6 @@ public class StreamTaskTerminationTest extends TestLogger {
public static final OneShotLatch RUN_LATCH = new OneShotLatch();
public static final OneShotLatch CHECKPOINTING_LATCH = new OneShotLatch();
private static final OneShotLatch CLEANUP_LATCH = new OneShotLatch();
private static final OneShotLatch HANDLE_ASYNC_EXCEPTION_LATCH = new OneShotLatch();

/**
* FLINK-6833
Expand Down Expand Up @@ -209,8 +210,7 @@ public BlockingStreamTask(Environment env) {
}

@Override
protected void init() throws Exception {

protected void init() {
}

@Override
Expand All @@ -226,24 +226,16 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();

// wait until handle async exception has been called to proceed with the termination of the
// StreamTask
HANDLE_ASYNC_EXCEPTION_LATCH.await();
// wait until all async checkpoint threads are terminated, so that no more exceptions can be reported
Assert.assertTrue(getAsyncOperationsThreadPool().awaitTermination(30L, TimeUnit.SECONDS));
}

@Override
protected void cancelTask() throws Exception {
}

@Override
public void handleAsyncException(String message, Throwable exception) {
super.handleAsyncException(message, exception);

HANDLE_ASYNC_EXCEPTION_LATCH.trigger();
protected void cancelTask() {
}
}

static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
private static class NoOpStreamOperator<T> extends AbstractStreamOperator<T> {
private static final long serialVersionUID = 4517845269225218312L;
}

Expand All @@ -252,7 +244,7 @@ static class BlockingStateBackend implements StateBackend {
private static final long serialVersionUID = -5053068148933314100L;

@Override
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) {
throw new UnsupportedOperationException();
}

Expand All @@ -269,7 +261,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws IOException {
TaskKvStateRegistry kvStateRegistry) {
return null;
}

Expand Down

0 comments on commit 08d0881

Please sign in to comment.