From 5cebfb76c717568ede8f7e8a82e9bbdf774c19ed Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Fri, 3 Jan 2020 05:14:59 +0100 Subject: [PATCH] [FLINK-16587][checkpointing] Spill the in-flight input and output buffers during checkpointing. --- .../io/CheckpointBarrierUnaligner.java | 21 ++++++- .../runtime/io/CheckpointedInputGate.java | 4 ++ .../runtime/io/InputProcessorUtil.java | 7 +++ .../runtime/io/StreamInputProcessor.java | 5 ++ .../io/StreamMultipleInputProcessor.java | 18 ++++++ .../runtime/io/StreamOneInputProcessor.java | 8 +++ .../streaming/runtime/io/StreamTaskInput.java | 8 +++ .../runtime/io/StreamTaskNetworkInput.java | 26 +++++++++ .../runtime/io/StreamTaskSourceInput.java | 8 +++ .../runtime/io/StreamTwoInputProcessor.java | 10 ++++ .../tasks/AsyncCheckpointRunnable.java | 6 ++ .../tasks/MultipleInputStreamTask.java | 1 + .../runtime/tasks/OneInputStreamTask.java | 1 + .../streaming/runtime/tasks/StreamTask.java | 18 +++++- .../SubtaskCheckpointCoordinatorImpl.java | 57 +++++++++++++++++-- .../runtime/tasks/TwoInputStreamTask.java | 1 + .../tasks/LocalStateForwardingTest.java | 2 + .../runtime/tasks/StreamTaskTest.java | 13 +++++ 18 files changed, 206 insertions(+), 8 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index 541f4ee106733..56839db03a387 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; @@ -43,6 +44,8 @@ import java.util.function.Function; import java.util.stream.IntStream; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier * and keeping track of the number of received barriers and consumed barriers. @@ -86,6 +89,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { CheckpointBarrierUnaligner( int[] numberOfInputChannelsPerGate, + ChannelStateWriter channelStateWriter, String taskName, AbstractInvokable toNotifyOnCheckpoint) { super(toNotifyOnCheckpoint); @@ -108,7 +112,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { .flatMap(Function.identity()) .toArray(InputChannelInfo[]::new); - threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, this); + threadSafeUnaligner = new ThreadSafeUnaligner(totalNumChannels, checkNotNull(channelStateWriter), this); } @Override @@ -315,12 +319,16 @@ private static class ThreadSafeUnaligner implements BufferReceivedListener, Clos /** The number of opened channels. */ private int numOpenChannels; + private final ChannelStateWriter channelStateWriter; + private final CheckpointBarrierUnaligner handler; public ThreadSafeUnaligner( int totalNumChannels, + ChannelStateWriter channelStateWriter, CheckpointBarrierUnaligner handler) { storeNewBuffers = new boolean[totalNumChannels]; + this.channelStateWriter = channelStateWriter; this.handler = handler; numOpenChannels = totalNumChannels; } @@ -350,7 +358,15 @@ public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputC @Override public synchronized void notifyBufferReceived(Buffer buffer, InputChannelInfo channelInfo) { - buffer.recycleBuffer(); + if (storeNewBuffers[handler.getFlattenedChannelIndex(channelInfo)]) { + channelStateWriter.addInputData( + currentReceivedCheckpointId, + channelInfo, + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + buffer); + } else { + buffer.recycleBuffer(); + } } @Override @@ -382,6 +398,7 @@ private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws Arrays.fill(storeNewBuffers, true); numBarriersReceived = 0; allBarriersReceivedFuture = new CompletableFuture<>(); + channelStateWriter.start(barrierId, barrier.getCheckpointOptions()); } public synchronized void resetReceivedBarriers(long checkpointId) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index cfcba48aa686c..1388dd145e451 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -184,6 +184,10 @@ public List requestInflightBuffers(long checkpointId, int channelIndex) return Collections.emptyList(); } + public CompletableFuture getAllBarriersReceivedFuture(long checkpointId) { + return ((CheckpointBarrierUnaligner) barrierHandler).getAllBarriersReceivedFuture(checkpointId); + } + private int offsetChannelIndex(int channelIndex) { return channelIndex + channelIndexOffset; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index 42ad81d621722..27156578e0521 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.metrics.MetricNames; @@ -41,6 +42,7 @@ public class InputProcessorUtil { public static CheckpointedInputGate createCheckpointedInputGate( AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, + ChannelStateWriter channelStateWriter, InputGate inputGate, Configuration taskManagerConfig, TaskIOMetricGroup taskIOMetricGroup, @@ -52,6 +54,7 @@ public static CheckpointedInputGate createCheckpointedInputGate( CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( config, IntStream.of(inputGate.getNumberOfInputChannels()), + channelStateWriter, taskName, toNotifyOnCheckpoint); registerCheckpointMetrics(taskIOMetricGroup, barrierHandler); @@ -68,6 +71,7 @@ public static CheckpointedInputGate createCheckpointedInputGate( public static CheckpointedInputGate[] createCheckpointedInputGatePair( AbstractInvokable toNotifyOnCheckpoint, StreamConfig config, + ChannelStateWriter channelStateWriter, Configuration taskManagerConfig, TaskIOMetricGroup taskIOMetricGroup, String taskName, @@ -92,6 +96,7 @@ public static CheckpointedInputGate[] createCheckpointedInputGatePair( CheckpointBarrierHandler barrierHandler = createCheckpointBarrierHandler( config, Arrays.stream(inputGates).mapToInt(InputGate::getNumberOfInputChannels), + channelStateWriter, taskName, toNotifyOnCheckpoint); registerCheckpointMetrics(taskIOMetricGroup, barrierHandler); @@ -125,6 +130,7 @@ private static BufferStorage[] copyBufferStoragesExceptOf( private static CheckpointBarrierHandler createCheckpointBarrierHandler( StreamConfig config, IntStream numberOfInputChannelsPerGate, + ChannelStateWriter channelStateWriter, String taskName, AbstractInvokable toNotifyOnCheckpoint) { switch (config.getCheckpointMode()) { @@ -132,6 +138,7 @@ private static CheckpointBarrierHandler createCheckpointBarrierHandler( if (config.isUnalignedCheckpointsEnabled()) { return new CheckpointBarrierUnaligner( numberOfInputChannelsPerGate.toArray(), + channelStateWriter, taskName, toNotifyOnCheckpoint); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 3c06239295787..699037984baad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -19,9 +19,12 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.AvailabilityProvider; import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}. @@ -34,4 +37,6 @@ public interface StreamInputProcessor extends AvailabilityProvider, Closeable { * state and/or {@link #getAvailableFuture()}. */ InputStatus processInput() throws Exception; + + CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java index 3505f75584335..a16cc9518c1c6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.InputSelection; @@ -176,6 +177,17 @@ public void close() throws IOException { } } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + CompletableFuture[] inputFutures = new CompletableFuture[inputProcessors.length]; + for (int index = 0; index < inputFutures.length; index++) { + inputFutures[index] = inputProcessors[index].prepareSnapshot(channelStateWriter, checkpointId); + } + return CompletableFuture.allOf(inputFutures); + } + private int selectNextReadingInputIndex() { if (!inputSelectionHandler.isAnyInputAvailable()) { fullCheckAndSetAvailable(); @@ -238,6 +250,12 @@ public InputStatus processInput() throws Exception { public void close() throws IOException { networkInput.close(); } + + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return networkInput.prepareSnapshot(channelStateWriter, checkpointId); + } } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java index 1f6ca27a6c8c3..2ca78cef136a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput; import org.apache.flink.streaming.runtime.tasks.OperatorChain; @@ -71,6 +72,13 @@ public InputStatus processInput() throws Exception { return status; } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return input.prepareSnapshot(channelStateWriter, checkpointId); + } + @Override public void close() throws IOException { input.close(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java index 484d9cd8d883c..a645051e1f01f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java @@ -18,8 +18,11 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Basic interface for inputs of stream operators. @@ -32,4 +35,9 @@ public interface StreamTaskInput extends PushingAsyncDataInput, Closeable * Returns the input index of this input. */ int getInputIndex(); + + /** + * Prepares to spill the in-flight input buffers as checkpoint snapshot. + */ + CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index 84fa8aef7723e..8572404c40600 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; @@ -29,6 +30,7 @@ import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; import org.apache.flink.streaming.api.watermark.Watermark; @@ -197,6 +199,30 @@ public CompletableFuture getAvailableFuture() { return checkpointedInputGate.getAvailableFuture(); } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + for (int channelIndex = 0; channelIndex < recordDeserializers.length; channelIndex++) { + final InputChannel channel = checkpointedInputGate.getChannel(channelIndex); + + // Assumption for retrieving buffers = one concurrent checkpoint + recordDeserializers[channelIndex].getUnconsumedBuffer().ifPresent(buffer -> + channelStateWriter.addInputData( + checkpointId, + channel.getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + buffer)); + + channelStateWriter.addInputData( + checkpointId, + channel.getChannelInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + checkpointedInputGate.requestInflightBuffers(checkpointId, channelIndex).toArray(new Buffer[0])); + } + return checkpointedInputGate.getAllBarriersReceivedFuture(checkpointId); + } + @Override public void close() throws IOException { // release the deserializers . this part should not ever fail diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java index a213558296f1f..95ce9bb5ed76b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.streaming.api.operators.SourceReaderOperator; import org.apache.flink.util.IOUtils; @@ -62,5 +63,12 @@ public int getInputIndex() { public void close() { IOUtils.closeQuietly(operator::close); } + + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) { + return CompletableFuture.completedFuture(null); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index aad275374722e..071c5d6966abb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.streaming.api.operators.InputSelection; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -184,6 +185,15 @@ public InputStatus processInput() throws Exception { return getInputStatus(); } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + long checkpointId) throws IOException { + return CompletableFuture.allOf( + input1.prepareSnapshot(channelStateWriter, checkpointId), + input2.prepareSnapshot(channelStateWriter, checkpointId)); + } + private int selectFirstReadingInputIndex() throws IOException { // Note: the first call to nextSelection () on the operator must be made after this operator // is opened to ensure that any changes about the input selection in its open() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index 098384a89cfe5..b09c5a936d13a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -34,6 +34,7 @@ import java.io.Closeable; import java.util.Map; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -58,6 +59,7 @@ private enum AsyncCheckpointState { private final Map operatorSnapshotsInProgress; private final CheckpointMetaData checkpointMetaData; private final CheckpointMetrics checkpointMetrics; + private final Future channelWrittenFuture; private final long asyncStartNanos; private final AtomicReference asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING); @@ -65,6 +67,7 @@ private enum AsyncCheckpointState { Map operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, + Future channelWrittenFuture, long asyncStartNanos, String taskName, CloseableRegistry closeableRegistry, @@ -74,6 +77,7 @@ private enum AsyncCheckpointState { this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = checkNotNull(checkpointMetaData); this.checkpointMetrics = checkNotNull(checkpointMetrics); + this.channelWrittenFuture = checkNotNull(channelWrittenFuture); this.asyncStartNanos = asyncStartNanos; this.taskName = checkNotNull(taskName); this.closeableRegistry = checkNotNull(closeableRegistry); @@ -113,6 +117,8 @@ public void run() { checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); + channelWrittenFuture.get(); + if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) { reportCompletedSnapshotStates( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java index 535389252aa5f..69afaafdce26e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java @@ -101,6 +101,7 @@ protected void createInputProcessor( CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair( this, getConfiguration(), + getChannelStateWriter(), getEnvironment().getTaskManagerInfo().getConfiguration(), getEnvironment().getMetricGroup().getIOMetricGroup(), getTaskNameWithSubtaskAndId(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index ea449d78100ca..2f9053d0a95b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -107,6 +107,7 @@ private CheckpointedInputGate createCheckpointedInputGate() { return InputProcessorUtil.createCheckpointedInputGate( this, configuration, + getChannelStateWriter(), inputGate, getEnvironment().getTaskManagerInfo().getConfiguration(), getEnvironment().getMetricGroup().getIOMetricGroup(), diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index ad6d3cff6e9d5..1e4c5dda3aa03 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; @@ -80,6 +81,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -281,7 +283,8 @@ protected StreamTask( getAsyncOperationsThreadPool(), getEnvironment(), this, - false); // todo: pass true if unaligned checkpoints enabled + configuration.isUnalignedCheckpointsEnabled(), + this::prepareInputSnapshot); // if the clock is not already set, then assign a default TimeServiceProvider if (timerService == null) { @@ -292,6 +295,17 @@ protected StreamTask( } } + private CompletableFuture prepareInputSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException { + if (inputProcessor == null) { + return FutureUtils.completedVoidFuture(); + } + return inputProcessor.prepareSnapshot(channelStateWriter, checkpointId); + } + + protected ChannelStateWriter getChannelStateWriter() { + return subtaskCheckpointCoordinator.getChannelStateWriter(); + } + // ------------------------------------------------------------------------ // Life cycle methods for specific implementations // ------------------------------------------------------------------------ @@ -726,6 +740,7 @@ private boolean triggerCheckpoint( .setBytesBufferedInAlignment(0L) .setAlignmentDurationNanos(0L); + subtaskCheckpointCoordinator.getChannelStateWriter().start(checkpointMetaData.getCheckpointId(), checkpointOptions); boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, advanceToEndOfEventTime); if (!success) { declineCheckpoint(checkpointMetaData.getCheckpointId()); @@ -865,6 +880,7 @@ private void notifyCheckpointComplete(long checkpointId) { LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName()); } + subtaskCheckpointCoordinator.getChannelStateWriter().notifyCheckpointComplete(checkpointId); getEnvironment().getTaskStateManager().notifyCheckpointComplete(checkpointId); if (isRunning && isSynchronousSavepointId(checkpointId)) { finishTask(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 528e80ac4c7f4..1125838b3e367 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -25,9 +25,13 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; @@ -36,6 +40,7 @@ import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.function.BiFunctionWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +48,10 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.function.Supplier; import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT; @@ -62,6 +69,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { private final AsyncExceptionHandler asyncExceptionHandler; private final ChannelStateWriter channelStateWriter; private final StreamTaskActionExecutor actionExecutor; + private final boolean unalignedCheckpointEnabled; + private final BiFunctionWithException, IOException> prepareInputSnapshot; SubtaskCheckpointCoordinatorImpl( CheckpointStorageWorkerView checkpointStorage, @@ -71,7 +80,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { ExecutorService executorService, Environment env, AsyncExceptionHandler asyncExceptionHandler, - boolean sendChannelState) throws IOException { + boolean unalignedCheckpointEnabled, + BiFunctionWithException, IOException> prepareInputSnapshot) throws IOException { this.checkpointStorage = new CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage)); this.taskName = checkNotNull(taskName); this.closeableRegistry = checkNotNull(closeableRegistry); @@ -79,11 +89,13 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { this.env = checkNotNull(env); this.asyncExceptionHandler = checkNotNull(asyncExceptionHandler); this.actionExecutor = checkNotNull(actionExecutor); - this.channelStateWriter = sendChannelState ? openChannelStateWriter() : ChannelStateWriter.NO_OP; + this.channelStateWriter = unalignedCheckpointEnabled ? openChannelStateWriter() : ChannelStateWriter.NO_OP; + this.unalignedCheckpointEnabled = unalignedCheckpointEnabled; + this.prepareInputSnapshot = prepareInputSnapshot; this.closeableRegistry.registerCloseable(this); } - private ChannelStateWriterImpl openChannelStateWriter() { + private ChannelStateWriter openChannelStateWriter() { ChannelStateWriterImpl writer = new ChannelStateWriterImpl(this.checkpointStorage); writer.open(); return writer; @@ -136,9 +148,16 @@ public void checkpointState( // Step (2): Send the checkpoint barrier downstream operatorChain.broadcastEvent( - new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options)); + new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options), + unalignedCheckpointEnabled); - // Step (3): Take the state snapshot. This should be largely asynchronous, to not impact progress of the streaming topology + // Step (3): Prepare to spill the in-flight buffers for input and output + if (unalignedCheckpointEnabled) { + prepareInflightDataSnapshot(metadata.getCheckpointId()); + } + + // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact progress of the + // streaming topology Map snapshotFutures = new HashMap<>(operatorChain.getNumberOfOperators()); try { @@ -184,12 +203,40 @@ private void cleanup( } } + private void prepareInflightDataSnapshot(long checkpointId) throws IOException { + prepareInputSnapshot.apply(channelStateWriter, checkpointId) + .thenAccept(unused -> channelStateWriter.finishInput(checkpointId)); + + ResultPartitionWriter[] writers = env.getAllWriters(); + for (ResultPartitionWriter writer : writers) { + for (int i = 0; i < writer.getNumberOfSubpartitions(); i++) { + ResultSubpartition subpartition = writer.getSubpartition(i); + channelStateWriter.addOutputData( + checkpointId, + subpartition.getSubpartitionInfo(), + ChannelStateWriter.SEQUENCE_NUMBER_UNKNOWN, + subpartition.requestInflightBufferSnapshot().toArray(new Buffer[0])); + } + } + channelStateWriter.finishOutput(checkpointId); + } + private void finishAndReportAsync(Map snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics) { + final Future channelWrittenFuture; + if (unalignedCheckpointEnabled) { + ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId()); + channelWrittenFuture = CompletableFuture.allOf( + writeResult.getInputChannelStateHandles(), + writeResult.getResultSubpartitionStateHandles()); + } else { + channelWrittenFuture = FutureUtils.completedVoidFuture(); + } // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit executorService.execute(new AsyncCheckpointRunnable( snapshotFutures, metadata, metrics, + channelWrittenFuture, System.nanoTime(), taskName, closeableRegistry, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 78078bca93433..37d550f34771e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -62,6 +62,7 @@ protected void createInputProcessor( CheckpointedInputGate[] checkpointedInputGates = InputProcessorUtil.createCheckpointedInputGatePair( this, getConfiguration(), + getChannelStateWriter(), getEnvironment().getTaskManagerInfo().getConfiguration(), getEnvironment().getMetricGroup().getIOMetricGroup(), getTaskNameWithSubtaskAndId(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index ae6cb8e4ac45f..8d4fae2aca817 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -59,6 +59,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; @@ -114,6 +115,7 @@ public void testReportingFromSnapshotToTaskStateManager() throws Exception { snapshots, checkpointMetaData, checkpointMetrics, + CompletableFuture.completedFuture(null), 0L, testStreamTask.getName(), testStreamTask.getCancelables(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index b5e8430f4d247..cbc39f728d900 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; +import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.TestingUncaughtExceptionHandler; import org.apache.flink.runtime.execution.CancelTaskException; @@ -1173,6 +1174,13 @@ public InputStatus processInput() { return ++currentNumProcessCalls < totalProcessCalls ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT; } + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, + final long checkpointId) { + return FutureUtils.completedVoidFuture(); + } + @Override public void close() throws IOException { } @@ -1338,6 +1346,11 @@ public InputStatus processInput() throws Exception { return isFinished ? InputStatus.END_OF_INPUT : InputStatus.NOTHING_AVAILABLE; } + @Override + public CompletableFuture prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) { + return FutureUtils.completedVoidFuture(); + } + @Override public void close() throws IOException { }