From 9b0477f3c5af6c08f828392bdf9c7c2b3e34bde9 Mon Sep 17 00:00:00 2001 From: Zhijiang Date: Tue, 31 Dec 2019 09:09:42 +0100 Subject: [PATCH] [FLINK-16587][checkpointing] Implement checkpoint barrier overtake in output partitions. Extended BufferAvailabilityListener with notifyPriorityEvent, which allows LocalInputChannels to completely bypass buffer queue and send barrier directly to Unaligner. --- .../io/network/api/writer/RecordWriter.java | 6 +- .../api/writer/ResultPartitionWriter.java | 17 +++- .../BoundedBlockingSubpartition.java | 8 +- .../partition/BufferAvailabilityListener.java | 13 +++ .../partition/PipelinedSubpartition.java | 56 ++++++++++--- .../partition/PipelinedSubpartitionView.java | 6 ++ .../io/network/partition/ResultPartition.java | 7 +- .../network/partition/ResultSubpartition.java | 25 +++++- .../partition/consumer/LocalInputChannel.java | 24 ++++++ ...tifyingResultPartitionWriterDecorator.java | 7 +- ...stractCollectingResultPartitionWriter.java | 5 +- .../network/api/writer/RecordWriterTest.java | 4 +- .../AwaitableBufferAvailablityListener.java | 21 +++++ .../partition/MockResultPartitionWriter.java | 7 +- ...PipelinedSubpartitionWithReadViewTest.java | 79 ++++++++++++++++++- .../runtime/io/RecordWriterOutput.java | 4 +- .../runtime/tasks/OperatorChain.java | 6 +- 17 files changed, 269 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 0895bf48234cb..a678e5a48cb34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -158,12 +158,16 @@ protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IO } public void broadcastEvent(AbstractEvent event) throws IOException { + broadcastEvent(event, false); + } + + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) { for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) { tryFinishCurrentBufferBuilder(targetChannel); // Retain the buffer so that it can be recycled by each channel of targetPartition - targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); + targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel, isPriorityEvent); } if (flushAlways) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 64be3cb5b07f2..efb35dcc6fdec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -80,7 +80,22 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid * * @return true if operation succeeded and bufferConsumer was enqueued for consumption. 
*/ - boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException; + boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex, boolean isPriorityEvent) throws IOException; + + /** + * Adds the bufferConsumer to the subpartition with the given index. + * + *
<p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing + * its resources. + * + * <p>
To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one + * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}. + * + * @return true if operation succeeded and bufferConsumer was enqueued for consumption. + */ + default boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { + return addBufferConsumer(bufferConsumer, subpartitionIndex, false); + } /** * Returns the subpartition with the given index. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java index 626ba3b8f874a..abb9bc8138692 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java @@ -30,6 +30,7 @@ import java.io.File; import java.io.IOException; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -115,7 +116,7 @@ public boolean isReleased() { } @Override - public boolean add(BufferConsumer bufferConsumer) throws IOException { + public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException { if (isFinished()) { bufferConsumer.close(); return false; @@ -145,6 +146,11 @@ private void flushCurrentBuffer() throws IOException { } } + @Override + public List requestInflightBufferSnapshot() { + throw new UnsupportedOperationException("The batch job does not support unaligned checkpoint."); + } + private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException { try { final Buffer buffer = bufferConsumer.build(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java index e78f99afee80c..6686558a246a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferAvailabilityListener.java @@ -18,6 +18,10 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; + +import java.io.IOException; + /** * Listener interface implemented by consumers of {@link ResultSubpartitionView} * that want to be notified of availability of further buffers. @@ -28,4 +32,13 @@ public interface BufferAvailabilityListener { * Called whenever there might be new data available. */ void notifyDataAvailable(); + + /** + * Allows the listener to react to a priority event before it is added to the outgoing buffer queue. + * + * @return true if the event has been fully processed and should not be added to the buffer queue. 
+ */ + default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { + return false; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java index 335456e1686d4..eb2f852606c47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java @@ -35,6 +35,8 @@ import java.io.IOException; import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -42,10 +44,10 @@ /** * A pipelined in-memory only subpartition, which can be consumed once. * - *
<p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second + * <p>
Whenever {@link ResultSubpartition#add(BufferConsumer, boolean)} adds a finished {@link BufferConsumer} or a second * {@link BufferConsumer} (in which case we will assume the first one finished), we will * {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via - * {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling + * {@link ResultSubpartition#createReadView(BufferAvailabilityListener)} of new data availability. Except by calling * {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and * then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows * no more buffers being available. This results in a buffer queue which is either empty or has an @@ -86,6 +88,9 @@ public class PipelinedSubpartition extends ResultSubpartition { /** The total number of bytes (both data and event buffers). */ private long totalNumberOfBytes; + /** The collection of buffers which are spanned over by checkpoint barrier and needs to be persisted for snapshot. */ + private final List inflightBufferSnapshot = new ArrayList<>(); + // ------------------------------------------------------------------------ PipelinedSubpartition(int index, ResultPartition parent) { @@ -101,7 +106,7 @@ public void initializeState(ChannelStateReader stateReader) throws IOException, // check whether there are some states data filled in this time if (bufferConsumer.isDataAvailable()) { - add(bufferConsumer); + add(bufferConsumer, false, false); bufferBuilder.finish(); } else { bufferConsumer.close(); @@ -110,17 +115,24 @@ public void initializeState(ChannelStateReader stateReader) throws IOException, } @Override - public boolean add(BufferConsumer bufferConsumer) { - return add(bufferConsumer, false); + public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException { + if (isPriorityEvent) { + if (readView != null && readView.notifyPriorityEvent(bufferConsumer)) { + bufferConsumer.close(); + return true; + } + return add(bufferConsumer, false, true); + } + return add(bufferConsumer, false, false); } @Override public void finish() throws IOException { - add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true); + add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true, false); LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this); } - private boolean add(BufferConsumer bufferConsumer, boolean finish) { + private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean insertAsHead) { checkNotNull(bufferConsumer); final boolean notifyDataAvailable; @@ -131,10 +143,10 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) { } // Add the bufferConsumer and update the stats - buffers.add(bufferConsumer); + handleAddingBarrier(bufferConsumer, insertAsHead); updateStatistics(bufferConsumer); increaseBuffersInBacklog(bufferConsumer); - notifyDataAvailable = shouldNotifyDataAvailable() || finish; + notifyDataAvailable = insertAsHead || finish || shouldNotifyDataAvailable(); isFinished |= finish; } @@ -146,6 +158,32 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) { return true; } + private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) { + assert Thread.holdsLock(buffers); + if (insertAsHead) { + checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " + + "checkpoints"); + + // Meanwhile prepare the 
collection of in-flight buffers which would be fetched in the next step later. + for (BufferConsumer buffer : buffers) { + try (BufferConsumer bc = buffer.copy()) { + inflightBufferSnapshot.add(bc.build()); + } + } + + buffers.addFirst(bufferConsumer); + } else { + buffers.add(bufferConsumer); + } + } + + @Override + public List requestInflightBufferSnapshot() { + List snapshot = new ArrayList<>(inflightBufferSnapshot); + inflightBufferSnapshot.clear(); + return snapshot; + } + @Override public void release() { // view reference accessible outside the lock, but assigned inside the locked scope diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java index bf37936b1fd9e..e47578438dc93 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import javax.annotation.Nullable; +import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -96,4 +98,8 @@ public String toString() { parent.getSubPartitionIndex(), parent.parent.getPartitionId()); } + + public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { + return availabilityListener.notifyPriorityEvent(eventBufferConsumer); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index e4c44f0a61326..a89b56453ca53 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -219,7 +219,10 @@ public BufferBuilder tryGetBufferBuilder() throws IOException { } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { + public boolean addBufferConsumer( + BufferConsumer bufferConsumer, + int subpartitionIndex, + boolean isPriorityEvent) throws IOException { checkNotNull(bufferConsumer); ResultSubpartition subpartition; @@ -232,7 +235,7 @@ public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartition throw ex; } - return subpartition.add(bufferConsumer); + return subpartition.add(bufferConsumer, isPriorityEvent); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java index cf48745af2af2..9ae098dd0f095 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import java.io.IOException; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -90,11 +91,33 @@ 
public void initializeState(ChannelStateReader stateReader) throws IOException, * * @param bufferConsumer * the buffer to add (transferring ownership to this writer) + * @param isPriorityEvent whether the buffer contains a priority event (e.g. a checkpoint barrier) that may overtake the buffers already queued in this subpartition + * @return true if operation succeeded and bufferConsumer was enqueued for consumption. * @throws IOException * thrown in case of errors while adding the buffer */ - public abstract boolean add(BufferConsumer bufferConsumer) throws IOException; + public abstract boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException; + + /** + * Adds the given buffer. + * + *
<p>The request may be executed synchronously, or asynchronously, depending on the + * implementation. + * + * <p>
IMPORTANT: Before adding new {@link BufferConsumer} previously added must be in finished + * state. Because of the performance reasons, this is only enforced during the data reading. + * + * @param bufferConsumer + * the buffer to add (transferring ownership to this writer) + * @return true if operation succeeded and bufferConsumer was enqueued for consumption. + * @throws IOException + * thrown in case of errors while adding the buffer + */ + public boolean add(BufferConsumer bufferConsumer) throws IOException { + return add(bufferConsumer, false); + } + + public abstract List requestInflightBufferSnapshot(); public abstract void flush(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java index 63d3ed9887dca..70d4a93cf94c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java @@ -21,6 +21,9 @@ import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.network.TaskEventPublisher; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; @@ -268,4 +271,25 @@ public int unsynchronizedGetNumberOfQueuedBuffers() { public String toString() { return "LocalInputChannel [" + partitionId + "]"; } + + @Override + public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException { + if (inputGate.getBufferReceivedListener() == null) { + // in rare cases and very low checkpointing intervals, we may receive the first barrier, before setting + // up CheckpointedInputGate + return false; + } + Buffer buffer = eventBufferConsumer.build(); + try { + CheckpointBarrier event = parseCheckpointBarrierOrNull(buffer); + if (event == null) { + throw new IllegalStateException("Currently only checkpoint barriers are known priority events"); + } + inputGate.getBufferReceivedListener().notifyBarrierReceived(event, channelInfo); + } finally { + buffer.recycleBuffer(); + } + // already processed + return true; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java index 35c1e501895a5..6dd507bd5eadd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java @@ -106,8 +106,11 @@ public void initializeState(ChannelStateReader stateReader) throws IOException, } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { - boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex); + public boolean addBufferConsumer( + 
BufferConsumer bufferConsumer, + int subpartitionIndex, + boolean isPriorityEvent) throws IOException { + boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex, isPriorityEvent); if (success) { notifyPipelinedConsumers(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java index 4751dc29bdd85..7b12a987eb7b7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java @@ -55,7 +55,10 @@ public BufferBuilder tryGetBufferBuilder() throws IOException { } @Override - public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { + public synchronized boolean addBufferConsumer( + BufferConsumer bufferConsumer, + int targetChannel, + boolean isPriorityEvent) throws IOException { checkState(targetChannel < getNumberOfSubpartitions()); bufferConsumers.add(bufferConsumer); processBufferConsumers(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index ad74aa26682f7..af9a270d887f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -647,7 +647,7 @@ public BufferBuilder tryGetBufferBuilder() throws IOException { } @Override - public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException { + public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel, boolean isPriorityEvent) { return queues[targetChannel].add(buffer); } } @@ -704,7 +704,7 @@ public BufferBuilder tryGetBufferBuilder() throws IOException { } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { + public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel, boolean isPriorityEvent) { // keep the buffer occupied. 
produced.putIfAbsent(targetChannel, new ArrayList<>()); produced.get(targetChannel).add(bufferConsumer); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java index 6cf9d64f1b3bc..9372d4e3c66bd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/AwaitableBufferAvailablityListener.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; + +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** @@ -27,6 +30,10 @@ class AwaitableBufferAvailablityListener implements BufferAvailabilityListener { private final AtomicLong numNotifications = new AtomicLong(); + private final AtomicLong numPriorityEvents = new AtomicLong(); + + private final AtomicBoolean consumePriorityEvents = new AtomicBoolean(); + @Override public void notifyDataAvailable() { numNotifications.getAndIncrement(); @@ -36,6 +43,20 @@ public long getNumNotifications() { return numNotifications.get(); } + @Override + public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) { + numPriorityEvents.getAndIncrement(); + return consumePriorityEvents.get(); + } + + public long getNumPriorityEvents() { + return numPriorityEvents.get(); + } + + public void consumePriorityEvents() { + consumePriorityEvents.set(true); + } + public void resetNotificationCounters() { numNotifications.set(0L); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java index e6f55a259cc66..c2ee06fcc792b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java @@ -59,7 +59,12 @@ public int getNumTargetKeyGroups() { } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { + public final boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { + return addBufferConsumer(bufferConsumer, subpartitionIndex, false); + } + + @Override + public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel, boolean isPriorityEvent) throws IOException { bufferConsumer.close(); return true; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java index 8fe9cd40d500c..21417e3192ba4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java @@ -19,10 +19,16 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import 
org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.junit.After; import org.junit.Assert; @@ -34,6 +40,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; @@ -90,14 +99,14 @@ public void tearDown() { } @Test(expected = IllegalStateException.class) - public void testAddTwoNonFinishedBuffer() { + public void testAddTwoNonFinishedBuffer() throws IOException { subpartition.add(createBufferBuilder().createBufferConsumer()); subpartition.add(createBufferBuilder().createBufferConsumer()); assertNull(readView.getNextBuffer()); } @Test - public void testAddEmptyNonFinishedBuffer() { + public void testAddEmptyNonFinishedBuffer() throws IOException { assertEquals(0, availablityListener.getNumNotifications()); BufferBuilder bufferBuilder = createBufferBuilder(); @@ -300,6 +309,72 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception { assertEquals(1, availablityListener.getNumNotifications()); } + @Test + public void testBarrierOvertaking() throws Exception { + subpartition.add(createFilledFinishedBufferConsumer(1)); + assertEquals(0, availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + subpartition.add(createFilledFinishedBufferConsumer(2)); + assertEquals(1, availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + subpartition.add(createFilledFinishedBufferConsumer(3)); + assertEquals(1, availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + CheckpointOptions options = new CheckpointOptions( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[]{0, 1, 2})); + BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options)); + subpartition.add(barrierBuffer, true); + assertEquals(2, availablityListener.getNumNotifications()); + assertEquals(1, availablityListener.getNumPriorityEvents()); + + List inflight = subpartition.requestInflightBufferSnapshot(); + assertEquals(Arrays.asList(1, 2, 3), inflight.stream().map(Buffer::getSize).collect(Collectors.toList())); + inflight.forEach(Buffer::recycleBuffer); + + assertNextEvent(readView, barrierBuffer.getWrittenBytes(), CheckpointBarrier.class, true, 2, false, true); + assertNextBuffer(readView, 1, true, 1, false, true); + assertNextBuffer(readView, 2, false, 0, false, true); + assertNextBuffer(readView, 3, false, 0, false, true); + assertNoNextBuffer(readView); + } + + @Test + public void testBarrierConsumedByAvailabilityListener() throws Exception { + availablityListener.consumePriorityEvents(); + + subpartition.add(createFilledFinishedBufferConsumer(1)); + assertEquals(0, 
availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + subpartition.add(createFilledFinishedBufferConsumer(2)); + assertEquals(1, availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + subpartition.add(createFilledFinishedBufferConsumer(3)); + assertEquals(1, availablityListener.getNumNotifications()); + assertEquals(0, availablityListener.getNumPriorityEvents()); + + CheckpointOptions options = new CheckpointOptions( + CheckpointType.CHECKPOINT, + new CheckpointStorageLocationReference(new byte[]{0, 1, 2})); + BufferConsumer barrierBuffer = EventSerializer.toBufferConsumer(new CheckpointBarrier(0, 0, options)); + subpartition.add(barrierBuffer, true); + assertEquals(1, availablityListener.getNumNotifications()); + assertEquals(1, availablityListener.getNumPriorityEvents()); + + List inflight = subpartition.requestInflightBufferSnapshot(); + assertEquals(Arrays.asList(), inflight.stream().map(Buffer::getSize).collect(Collectors.toList())); + + assertNextBuffer(readView, 1, true, 1, false, true); + assertNextBuffer(readView, 2, false, 0, false, true); + assertNextBuffer(readView, 3, false, 0, false, true); + assertNoNextBuffer(readView); + } + @Test public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception { testBacklogConsistentWithNumberOfConsumableBuffers(false, false); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java index 0624f0fbd7e2b..e55c4638e4ec2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java @@ -148,8 +148,8 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) { } } - public void broadcastEvent(AbstractEvent event) throws IOException { - recordWriter.broadcastEvent(event); + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + recordWriter.broadcastEvent(event, isPriorityEvent); } public void flush() throws IOException { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 04731a7754b45..43b1690d57bad 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -248,8 +248,12 @@ public void toggleStreamStatus(StreamStatus status) { } public void broadcastEvent(AbstractEvent event) throws IOException { + broadcastEvent(event, false); + } + + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { for (RecordWriterOutput streamOutput : streamOutputs) { - streamOutput.broadcastEvent(event); + streamOutput.broadcastEvent(event, isPriorityEvent); } }
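
To make the new data flow easier to follow, here is a minimal, self-contained sketch of the mechanism this patch introduces. It deliberately uses toy types (PriorityEventSketch, ToySubpartition, PriorityListener, plain Strings instead of BufferConsumer/Buffer) rather than the real Flink classes, so it only models the behaviour: a priority event is either consumed directly by the registered availability listener (as LocalInputChannel does with a checkpoint barrier) or inserted at the head of the buffer queue, while the overtaken in-flight buffers are captured for a later snapshot request.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class PriorityEventSketch {

    /** Models BufferAvailabilityListener#notifyPriorityEvent: return true if the event was fully consumed. */
    interface PriorityListener {
        boolean notifyPriorityEvent(String eventBuffer);
    }

    /** Toy stand-in for PipelinedSubpartition; buffers are just Strings here. */
    static class ToySubpartition {
        private final ArrayDeque<String> buffers = new ArrayDeque<>();
        private final List<String> inflightBufferSnapshot = new ArrayList<>();
        private PriorityListener readView; // models the view forwarding priority events to its listener

        void createReadView(PriorityListener listener) {
            this.readView = listener;
        }

        /** Models add(BufferConsumer, boolean): a priority event bypasses the queue or jumps to its head. */
        boolean add(String buffer, boolean isPriorityEvent) {
            if (isPriorityEvent) {
                if (readView != null && readView.notifyPriorityEvent(buffer)) {
                    // the listener (e.g. a local input channel) handled the barrier directly; nothing is queued
                    return true;
                }
                // capture the buffers the barrier overtakes, then insert the barrier at the head of the queue
                inflightBufferSnapshot.addAll(buffers);
                buffers.addFirst(buffer);
                return true;
            }
            buffers.add(buffer);
            return true;
        }

        /** Models requestInflightBufferSnapshot(): hand out and clear the overtaken buffers. */
        List<String> requestInflightBufferSnapshot() {
            List<String> snapshot = new ArrayList<>(inflightBufferSnapshot);
            inflightBufferSnapshot.clear();
            return snapshot;
        }
    }

    public static void main(String[] args) {
        ToySubpartition subpartition = new ToySubpartition();
        subpartition.add("data-1", false);
        subpartition.add("data-2", false);

        // no listener consumes the event, so the barrier overtakes the queued data buffers
        subpartition.add("barrier", true);
        System.out.println(subpartition.requestInflightBufferSnapshot()); // [data-1, data-2]

        // with a consuming listener (like LocalInputChannel forwarding barriers), the event never enters the queue
        subpartition.createReadView(event -> true);
        subpartition.add("another-barrier", true);
        System.out.println(subpartition.requestInflightBufferSnapshot()); // []
    }
}

On the producer side the same path is reached through RecordWriter#broadcastEvent(AbstractEvent, boolean) and OperatorChain#broadcastEvent(AbstractEvent, boolean) as changed above, so a checkpoint barrier can be broadcast with isPriorityEvent = true and overtake the buffers queued in every subpartition.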