Skip to content

Commit

Permalink
[FLINK-16587][checkpointing] Spill the in-flight input and output buffers during checkpointing.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhijiangW authored and pnowojski committed Apr 17, 2020
1 parent f661d18 commit 5cebfb7
Show file tree
Hide file tree
Showing 18 changed files with 206 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
Expand All @@ -43,6 +44,8 @@
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier
* and keeping track of the number of received barriers and consumed barriers.
Expand Down Expand Up @@ -86,6 +89,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {

CheckpointBarrierUnaligner(
int[] numberOfInputChannelsPerGate,
ChannelStateWriter channelStateWriter,
String taskName,
AbstractInvokable toNotifyOnCheckpoint) {
super(toNotifyOnCheckpoint);
Expand All @@ -108,7 +112,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
.flatMap(Function.identity())
.toArray(InputChannelInfo[]::new);

threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, this);
threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this);
}

@Override
Expand Down Expand Up @@ -315,12 +319,16 @@ private static class ThreadSafeUnaligner implements BufferReceivedListener, Clos
/** The number of opened channels. */
private int numOpenChannels;

private final ChannelStateWriter channelStateWriter;

private final CheckpointBarrierUnaligner handler;

/**
 * Creates the thread-safe part of the unaligner.
 *
 * @param totalNumChannels number of input channels; sizes the per-channel {@code storeNewBuffers}
 *     flags and is the initial number of open channels
 * @param channelStateWriter writer that received input buffers are handed to while they are being stored
 * @param handler the {@link CheckpointBarrierUnaligner} used to flatten channel indices
 */
public ThreadSafeUnaligner(
        int totalNumChannels,
        ChannelStateWriter channelStateWriter,
        CheckpointBarrierUnaligner handler) {
    this.handler = handler;
    this.channelStateWriter = channelStateWriter;
    this.numOpenChannels = totalNumChannels;
    this.storeNewBuffers = new boolean[totalNumChannels];
}
Expand Down Expand Up @@ -350,7 +358,15 @@ public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputC

/**
 * Handles a data buffer that arrived on the given channel.
 *
 * <p>If the channel's {@code storeNewBuffers} flag is set, the buffer is handed to the
 * {@link ChannelStateWriter} under {@code currentReceivedCheckpointId}; otherwise it is recycled.
 *
 * @param buffer the received buffer
 * @param channelInfo the channel the buffer was received on
 */
@Override
public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) {
    // The buffer must not be recycled before this decision: exactly one branch disposes of it.
    // Recycling up front would pass an already-recycled buffer to the writer, or recycle it twice.
    if (storeNewBuffers[handler.getFlattenedChannelIndex(channelInfo)]) {
        // NOTE(review): presumably the writer takes over responsibility for recycling the
        // buffer - confirm against ChannelStateWriter#addInputData.
        channelStateWriter.addInputData(
            currentReceivedCheckpointId,
            channelInfo,
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            buffer);
    } else {
        buffer.recycleBuffer();
    }
}

@Override
Expand Down Expand Up @@ -382,6 +398,7 @@ private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws
Arrays.fill(storeNewBuffers, true);
numBarriersReceived = 0;
allBarriersReceivedFuture = new CompletableFuture<>();
channelStateWriter.start(barrierId, barrier.getCheckpointOptions());
}

public synchronized void resetReceivedBarriers(long checkpointId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public List<Buffer> requestInflightBuffers(long checkpointId, int channelIndex)
return Collections.emptyList();
}

/**
 * Returns the barrier handler's "all barriers received" future for the given checkpoint.
 *
 * <p>NOTE(review): the handler is cast unconditionally, so this presumably must only be called
 * when the gate was created with a {@code CheckpointBarrierUnaligner} - confirm at call sites.
 *
 * @param checkpointId ID of the checkpoint to look up
 * @return the future obtained from the unaligner for that checkpoint
 */
public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
    final CheckpointBarrierUnaligner unaligner = (CheckpointBarrierUnaligner) barrierHandler;
    return unaligner.getAllBarriersReceivedFuture(checkpointId);
}

/** Shifts the given channel index by {@code channelIndexOffset}. */
private int offsetChannelIndex(int channelIndex) {
    final int shiftedIndex = channelIndexOffset + channelIndex;
    return shiftedIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.MetricNames;
Expand All @@ -41,6 +42,7 @@ public class InputProcessorUtil {
public static CheckpointedInputGate createCheckpointedInputGate(
AbstractInvokable toNotifyOnCheckpoint,
StreamConfig config,
ChannelStateWriter channelStateWriter,
InputGate inputGate,
Configuration taskManagerConfig,
TaskIOMetricGroup taskIOMetricGroup,
Expand All @@ -52,6 +54,7 @@ public static CheckpointedInputGate createCheckpointedInputGate(
CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
config,
IntStream.of(inputGate.getNumberOfInputChannels()),
channelStateWriter,
taskName,
toNotifyOnCheckpoint);
registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
Expand All @@ -68,6 +71,7 @@ public static CheckpointedInputGate createCheckpointedInputGate(
public static CheckpointedInputGate[] createCheckpointedInputGatePair(
AbstractInvokable toNotifyOnCheckpoint,
StreamConfig config,
ChannelStateWriter channelStateWriter,
Configuration taskManagerConfig,
TaskIOMetricGroup taskIOMetricGroup,
String taskName,
Expand All @@ -92,6 +96,7 @@ public static CheckpointedInputGate[] createCheckpointedInputGatePair(
CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler(
config,
Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels),
channelStateWriter,
taskName,
toNotifyOnCheckpoint);
registerCheckpointMetrics(taskIOMetricGroup, barrierHandler);
Expand Down Expand Up @@ -125,13 +130,15 @@ private static BufferStorage[] copyBufferStoragesExceptOf(
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
StreamConfig config,
IntStream numberOfInputChannelsPerGate,
ChannelStateWriter channelStateWriter,
String taskName,
AbstractInvokable toNotifyOnCheckpoint) {
switch (config.getCheckpointMode()) {
case EXACTLY_ONCE:
if (config.isUnalignedCheckpointsEnabled()) {
return new CheckpointBarrierUnaligner(
numberOfInputChannelsPerGate.toArray(),
channelStateWriter,
taskName,
toNotifyOnCheckpoint);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
Expand All @@ -34,4 +37,6 @@ public interface StreamInputProcessor extends AvailabilityProvider, Closeable {
* state and/or {@link #getAvailableFuture()}.
*/
InputStatus processInput() throws Exception;

/**
 * Prepares the snapshot of this processor's input(s) for the given checkpoint.
 *
 * @param channelStateWriter writer that implementations may hand in-flight input data to
 * @param checkpointId ID of the checkpoint being prepared
 * @return a future signalling that the preparation is complete; what it tracks is
 *     implementation-specific
 * @throws IOException if the preparation fails with an I/O error
 */
CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelection;
Expand Down Expand Up @@ -176,6 +177,17 @@ public void close() throws IOException {
}
}

/**
 * Asks every input processor to prepare its snapshot and combines the results.
 *
 * @param channelStateWriter writer passed through to each input processor
 * @param checkpointId ID of the checkpoint being prepared
 * @return a future that completes once the futures of all input processors have completed
 * @throws IOException if any input processor fails while preparing its snapshot
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    final int numInputs = inputProcessors.length;
    final CompletableFuture<?>[] pending = new CompletableFuture<?>[numInputs];
    int i = 0;
    while (i < numInputs) {
        pending[i] = inputProcessors[i].prepareSnapshot(channelStateWriter, checkpointId);
        i++;
    }
    return CompletableFuture.allOf(pending);
}

private int selectNextReadingInputIndex() {
if (!inputSelectionHandler.isAnyInputAvailable()) {
fullCheckAndSetAvailable();
Expand Down Expand Up @@ -238,6 +250,12 @@ public InputStatus processInput() throws Exception {
public void close() throws IOException {
    // Delegates to the wrapped network input; any IOException it raises propagates to the caller.
    networkInput.close();
}

/**
 * Forwards the snapshot preparation to the wrapped {@code networkInput}.
 *
 * @param channelStateWriter writer passed through to the network input
 * @param checkpointId ID of the checkpoint being prepared
 * @return the future returned by the network input
 * @throws IOException if the network input fails while preparing its snapshot
 */
public CompletableFuture<?> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    final CompletableFuture<?> networkInputFuture =
        networkInput.prepareSnapshot(channelStateWriter, checkpointId);
    return networkInputFuture;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;

Expand Down Expand Up @@ -71,6 +72,13 @@ public InputStatus processInput() throws Exception {
return status;
}

/**
 * Forwards the snapshot preparation to the single {@code input} of this processor.
 *
 * @param channelStateWriter writer passed through to the input
 * @param checkpointId ID of the checkpoint being prepared
 * @return the future returned by the input
 * @throws IOException if the input fails while preparing its snapshot
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    final CompletableFuture<Void> inputFuture =
        input.prepareSnapshot(channelStateWriter, checkpointId);
    return inputFuture;
}

@Override
public void close() throws IOException {
input.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* Basic interface for inputs of stream operators.
Expand All @@ -32,4 +35,9 @@ public interface StreamTaskInput<T> extends PushingAsyncDataInput<T>, Closeable
* Returns the input index of this input.
*/
int getInputIndex();

/**
 * Prepares to spill the in-flight input buffers as part of the checkpoint snapshot.
 *
 * @param channelStateWriter writer that receives the in-flight input data
 * @param checkpointId ID of the checkpoint the data belongs to
 * @return a future signalling that the preparation is complete; what it tracks is
 *     implementation-specific
 * @throws IOException if the preparation fails with an I/O error
 */
CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -197,6 +199,30 @@ public CompletableFuture<?> getAvailableFuture() {
return checkpointedInputGate.getAvailableFuture();
}

/**
 * Hands the in-flight input data of every channel to the given writer.
 *
 * <p>For each channel this passes on the deserializer's unconsumed buffer (if there is one) and
 * the buffers returned by the gate's {@code requestInflightBuffers} for the checkpoint.
 *
 * @param channelStateWriter writer that receives the input data
 * @param checkpointId ID of the checkpoint the data belongs to
 * @return the gate's "all barriers received" future for the checkpoint
 * @throws IOException if collecting the in-flight data fails with an I/O error
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) {
        final InputChannel channel = checkpointedInputGate.getChannel(channelIndex);

        // Assumption for retrieving buffers = one concurrent checkpoint
        // NOTE(review): a deserializer slot is presumably nulled once its channel has been
        // released (e.g. after end of partition) - the release logic is outside this view.
        // Guard against it so a finished channel cannot fail the snapshot with an NPE.
        if (recordDeserializers[channelIndex] != null) {
            recordDeserializers[channelIndex].getUnconsumedBuffer().ifPresent(buffer ->
                channelStateWriter.addInputData(
                    checkpointId,
                    channel.getChannelInfo(),
                    ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
                    buffer));
        }

        // The gate's in-flight buffers are spilled regardless of the deserializer's state.
        channelStateWriter.addInputData(
            checkpointId,
            channel.getChannelInfo(),
            ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN,
            checkpointedInputGate.requestInflightBuffers(checkpointId, channelIndex).toArray(new Buffer[0]));
    }
    return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId);
}

@Override
public void close() throws IOException {
// release the deserializers . this part should not ever fail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.streaming.api.operators.SourceReaderOperator;
import org.apache.flink.util.IOUtils;

Expand Down Expand Up @@ -62,5 +63,12 @@ public int getInputIndex() {
public void close() {
    // Closes the operator through IOUtils.closeQuietly; presumably failures are suppressed
    // rather than rethrown (this method declares no checked exception) - confirm.
    IOUtils.closeQuietly(operator::close);
}

/**
 * Returns an already-completed future; this input hands nothing to the writer.
 *
 * @param channelStateWriter unused
 * @param checkpointId unused
 * @return a future that is already completed with {@code null}
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) {
    final CompletableFuture<Void> alreadyDone = CompletableFuture.completedFuture(null);
    return alreadyDone;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand Down Expand Up @@ -184,6 +185,15 @@ public InputStatus processInput() throws Exception {
return getInputStatus();
}

/**
 * Prepares the snapshot of both inputs, first {@code input1} and then {@code input2}.
 *
 * @param channelStateWriter writer passed through to both inputs
 * @param checkpointId ID of the checkpoint being prepared
 * @return a future that completes once the futures of both inputs have completed
 * @throws IOException if either input fails while preparing its snapshot
 */
@Override
public CompletableFuture<Void> prepareSnapshot(
        ChannelStateWriter channelStateWriter,
        long checkpointId) throws IOException {
    final CompletableFuture<?> firstInputFuture =
        input1.prepareSnapshot(channelStateWriter, checkpointId);
    final CompletableFuture<?> secondInputFuture =
        input2.prepareSnapshot(channelStateWriter, checkpointId);
    return CompletableFuture.allOf(firstInputFuture, secondInputFuture);
}

private int selectFirstReadingInputIndex() throws IOException {
// Note: the first call to nextSelection () on the operator must be made after this operator
// is opened to ensure that any changes about the input selection in its open()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand All @@ -58,13 +59,15 @@ private enum AsyncCheckpointState {
private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
private final CheckpointMetaData checkpointMetaData;
private final CheckpointMetrics checkpointMetrics;
private final Future<?> channelWrittenFuture;
private final long asyncStartNanos;
private final AtomicReference<AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING);

AsyncCheckpointRunnable(
Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
CheckpointMetaData checkpointMetaData,
CheckpointMetrics checkpointMetrics,
Future<?> channelWrittenFuture,
long asyncStartNanos,
String taskName,
CloseableRegistry closeableRegistry,
Expand All @@ -74,6 +77,7 @@ private enum AsyncCheckpointState {
this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress);
this.checkpointMetaData = checkNotNull(checkpointMetaData);
this.checkpointMetrics = checkNotNull(checkpointMetrics);
this.channelWrittenFuture = checkNotNull(channelWrittenFuture);
this.asyncStartNanos = asyncStartNanos;
this.taskName = checkNotNull(taskName);
this.closeableRegistry = checkNotNull(closeableRegistry);
Expand Down Expand Up @@ -113,6 +117,8 @@ public void run() {

checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

channelWrittenFuture.get();

if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {

reportCompletedSnapshotStates(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ protected void createInputProcessor(
CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair(
this,
getConfiguration(),
getChannelStateWriter(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getEnvironment().getMetricGroup().getIOMetricGroup(),
getTaskNameWithSubtaskAndId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private CheckpointedInputGate createCheckpointedInputGate() {
return InputProcessorUtil.createCheckpointedInputGate(
this,
configuration,
getChannelStateWriter(),
inputGate,
getEnvironment().getTaskManagerInfo().getConfiguration(),
getEnvironment().getMetricGroup().getIOMetricGroup(),
Expand Down
Loading

0 comments on commit 5cebfb7

Please sign in to comment.