Skip to content

Commit

Permalink
[FLINK-11667][checkpointing] Add Synchronous Checkpoint handling in S…
Browse files Browse the repository at this point in the history
…treamTask.

This is a necessary step towards implementing the "stop with a savepoint".
Essentially, a synchronous savepoint is one that waits until also the
notifyOnCheckpointComplete is successfully executed, before releasing the
checkpoint lock (on the StreamTask side).
  • Loading branch information
kl0u committed Apr 17, 2019
1 parent 8a91b07 commit f22dd5b
Show file tree
Hide file tree
Showing 17 changed files with 1,145 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public CheckpointType getCheckpointType() {
* @return <code>true</code> if the properties describe a savepoint, <code>false</code> otherwise.
*/
public boolean isSavepoint() {
return checkpointType == CheckpointType.SAVEPOINT;
return checkpointType.isSavepoint();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -230,6 +230,15 @@ public String toString() {
// Factories and pre-configured properties
// ------------------------------------------------------------------------

private static final CheckpointProperties SYNC_SAVEPOINT = new CheckpointProperties(
true,
CheckpointType.SYNC_SAVEPOINT,
false,
false,
false,
false,
false);

private static final CheckpointProperties SAVEPOINT = new CheckpointProperties(
true,
CheckpointType.SAVEPOINT,
Expand Down Expand Up @@ -279,6 +288,10 @@ public static CheckpointProperties forSavepoint() {
return SAVEPOINT;
}

public static CheckpointProperties forSyncSavepoint() {
return SYNC_SAVEPOINT;
}

/**
* Creates the checkpoint properties for a checkpoint.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,36 @@
package org.apache.flink.runtime.checkpoint;

/**
* The type of checkpoint to perform.
* The type of checkpoint to perform.
*/
public enum CheckpointType {

/** A checkpoint, full or incremental. */
CHECKPOINT,
CHECKPOINT(false, false),

/** A savepoint. */
SAVEPOINT;
/** A regular savepoint. */
SAVEPOINT(true, false),

/** A savepoint taken while suspending/terminating the job. */
SYNC_SAVEPOINT(true, true);

private final boolean isSavepoint;

private final boolean isSynchronous;

CheckpointType(
final boolean isSavepoint,
final boolean isSynchronous) {

this.isSavepoint = isSavepoint;
this.isSynchronous = isSynchronous;
}

public boolean isSavepoint() {
return isSavepoint;
}

public boolean isSynchronous() {
return isSynchronous;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class EventSerializer {

private static final int CHECKPOINT_TYPE_SAVEPOINT = 1;

private static final int CHECKPOINT_TYPE_SYNC_SAVEPOINT = 2;

// ------------------------------------------------------------------------
// Serialization Logic
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -215,6 +217,8 @@ private static ByteBuffer serializeCheckpointBarrier(CheckpointBarrier barrier)
typeInt = CHECKPOINT_TYPE_CHECKPOINT;
} else if (checkpointType == CheckpointType.SAVEPOINT) {
typeInt = CHECKPOINT_TYPE_SAVEPOINT;
} else if (checkpointType == CheckpointType.SYNC_SAVEPOINT) {
typeInt = CHECKPOINT_TYPE_SYNC_SAVEPOINT;
} else {
throw new IOException("Unknown checkpoint type: " + checkpointType);
}
Expand Down Expand Up @@ -247,6 +251,8 @@ private static CheckpointBarrier deserializeCheckpointBarrier(ByteBuffer buffer)
checkpointType = CheckpointType.CHECKPOINT;
} else if (checkpointTypeCode == CHECKPOINT_TYPE_SAVEPOINT) {
checkpointType = CheckpointType.SAVEPOINT;
} else if (checkpointTypeCode == CHECKPOINT_TYPE_SYNC_SAVEPOINT) {
checkpointType = CheckpointType.SYNC_SAVEPOINT;
} else {
throw new IOException("Unknown checkpoint type code: " + checkpointTypeCode);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A Thread Pool used to monitor the number of in-flight calls that block and wait for another task executed
* by the same pool in order to get unblocked. When a call (blocking or non-blocking) is submitted, the size
* of the pool is set to {@code 1 + activeBlockingCalls}. This allows the thread pool size to follow the needs
* of the system and to avoid any redundant idle threads consuming resources.
*/
public class BlockingCallMonitoringThreadPool {

private static final Logger LOG = LoggerFactory.getLogger(BlockingCallMonitoringThreadPool.class);

private final AtomicInteger inFlightBlockingCallCounter = new AtomicInteger(0);

private final ThreadPoolExecutor executor;

public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) {
this.executor = new ThreadPoolExecutor(
1,
1,
10L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
checkNotNull(dispatcherThreadFactory));
}

public void submit(final Runnable runnable, final boolean blocking) {
if (blocking) {
submitBlocking(runnable);
} else {
submit(runnable);
}
}

private void submit(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.get());
executor.execute(task);
}

private void submitBlocking(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
CompletableFuture.runAsync(task, executor).whenComplete(
(ignored, e) -> inFlightBlockingCallCounter.decrementAndGet());
}

private void adjustThreadPoolSize(final int activeBlockingCalls) {
if (activeBlockingCalls > 1) {
LOG.debug("There are {} active threads with blocking calls", activeBlockingCalls);
}

final int newPoolSize = 1 + activeBlockingCalls;

// We have to reset the core pool size because (quoted from the official docs):
// ``
// If there are more than corePoolSize but less than maximumPoolSize threads running,
// ** a new thread will be created ONLY IF THE QUEUE IS FULL **.
// ``

executor.setCorePoolSize(newPoolSize);
executor.setMaximumPoolSize(newPoolSize);
}

public void shutdown() {
executor.shutdown();
}

public boolean isShutdown() {
return executor.isShutdown();
}

public void shutdownNow() {
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -262,8 +260,8 @@ public class Task implements Runnable, TaskActions, CheckpointListener {
/** The observed exception, in case the task execution failed. */
private volatile Throwable failureCause;

/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized. */
private volatile ExecutorService asyncCallDispatcher;
/** Executor for asynchronous calls (checkpoints, etc), lazily initialized. */
private volatile BlockingCallMonitoringThreadPool asyncCallDispatcher;

/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
private long taskCancellationInterval;
Expand Down Expand Up @@ -831,7 +829,7 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {

// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
ExecutorService dispatcher = this.asyncCallDispatcher;
final BlockingCallMonitoringThreadPool dispatcher = this.asyncCallDispatcher;
if (dispatcher != null && !dispatcher.isShutdown()) {
dispatcher.shutdownNow();
}
Expand Down Expand Up @@ -966,7 +964,10 @@ public void stopExecution() {
taskManagerActions.failTask(executionId, t);
}
};
executeAsyncCallRunnable(runnable, String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId));
executeAsyncCallRunnable(
runnable,
String.format("Stopping source task %s (%s).", taskNameWithSubtask, executionId),
false);
} else {
throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", taskNameWithSubtask, executionId));
}
Expand Down Expand Up @@ -1166,7 +1167,7 @@ public void triggerPartitionProducerStateCheck(
*/
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {

final AbstractInvokable invokable = this.invokable;
Expand Down Expand Up @@ -1209,7 +1210,10 @@ public void run() {
}
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
executeAsyncCallRunnable(
runnable,
String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId),
checkpointOptions.getCheckpointType().isSynchronous());
}
else {
LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
Expand Down Expand Up @@ -1242,8 +1246,10 @@ public void run() {
}
}
};
executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
taskNameWithSubtask);
executeAsyncCallRunnable(
runnable,
"Checkpoint Confirmation for " + taskNameWithSubtask,
false);
}
else {
LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
Expand Down Expand Up @@ -1316,19 +1322,28 @@ void onPartitionStateUpdate(
* @param runnable The async call runnable.
* @param callName The name of the call, for logging purposes.
*/
private void executeAsyncCallRunnable(Runnable runnable, String callName) {
private void executeAsyncCallRunnable(Runnable runnable, String callName, boolean blocking) {
// make sure the executor is initialized. lock against concurrent calls to this function
synchronized (this) {
if (executionState != ExecutionState.RUNNING) {
return;
}

// get ourselves a reference on the stack that cannot be concurrently modified
ExecutorService executor = this.asyncCallDispatcher;
BlockingCallMonitoringThreadPool executor = this.asyncCallDispatcher;
if (executor == null) {
// first time use, initialize
checkState(userCodeClassLoader != null, "userCodeClassLoader must not be null");
executor = Executors.newSingleThreadExecutor(

// Under normal execution, we expect that one thread will suffice, this is why we
// keep the core threads to 1. In the case of a synchronous savepoint, we will block
// the checkpointing thread, so we need an additional thread to execute the
// notifyCheckpointComplete() callback. Finally, we aggressively purge (potentially)
// idle thread so that we do not risk to have many idle thread on machines with multiple
// tasks on them. Either way, only one of them can execute at a time due to the
// checkpoint lock.

executor = new BlockingCallMonitoringThreadPool(
new DispatcherThreadFactory(
TASK_THREADS_GROUP,
"Async calls on " + taskNameWithSubtask,
Expand All @@ -1347,13 +1362,13 @@ private void executeAsyncCallRunnable(Runnable runnable, String callName) {
LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);

try {
executor.submit(runnable);
executor.submit(runnable, blocking);
}
catch (RejectedExecutionException e) {
// may be that we are concurrently finished or canceled.
// if not, report that something is fishy
if (executionState == ExecutionState.RUNNING) {
throw new RuntimeException("Async call was rejected, even though the task is running.", e);
throw new RuntimeException("Async call with a " + (blocking ? "" : "non-") + "blocking call was rejected, even though the task is running.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ public class CheckpointTypeTest {
public void testOrdinalsAreConstant() {
assertEquals(0, CheckpointType.CHECKPOINT.ordinal());
assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
assertEquals(2, CheckpointType.SYNC_SAVEPOINT.ordinal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,21 @@ public void testCanBeSubsumed() throws Exception {
assertTrue(pending.canBeSubsumed());
}

@Test
public void testSyncSavepointCannotBeSubsumed() throws Exception {
// Forced checkpoints cannot be subsumed
CheckpointProperties forced = CheckpointProperties.forSyncSavepoint();
PendingCheckpoint pending = createPendingCheckpoint(forced);
assertFalse(pending.canBeSubsumed());

try {
pending.abortSubsumed();
fail("Did not throw expected Exception");
} catch (IllegalStateException ignored) {
// Expected
}
}

/**
* Tests that the completion future is succeeded on finalize and failed on
* abort and failures during finalize.
Expand Down
Loading

0 comments on commit f22dd5b

Please sign in to comment.