[FLINK-16587][checkpointing] Implement checkpoint barrier overtake in output partitions.

Extended BufferAvailabilityListener with notifyPriorityEvent, which allows LocalInputChannels to completely bypass the buffer queue and send the barrier directly to the Unaligner.
zhijiangW authored and pnowojski committed Apr 17, 2020
1 parent 825cb25 commit 9b0477f
Showing 17 changed files with 269 additions and 26 deletions.
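
As a rough usage sketch of the producer-side entry point this commit adds (the helper class below and the assumption that a RecordWriter is already wired to its result partition are illustrative, not part of the commit), a checkpoint barrier can be broadcast as a priority event so that it overtakes data already buffered in the output partitions:

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;

import java.io.IOException;

// Hypothetical helper, not part of this commit.
final class PriorityBarrierBroadcast {

    // Broadcasts the barrier with isPriorityEvent = true so that it takes the new
    // overtaking path through the output partitions instead of queueing behind data.
    static void broadcastPriorityBarrier(
            RecordWriter<?> recordWriter,
            long checkpointId,
            long timestamp) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(
            checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
        recordWriter.broadcastEvent(barrier, true);
    }
}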

@@ -158,12 +158,16 @@ protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IO
}

public void broadcastEvent(AbstractEvent event) throws IOException {
broadcastEvent(event, false);
}

public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
tryFinishCurrentBufferBuilder(targetChannel);

// Retain the buffer so that it can be recycled by each channel of targetPartition
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel, isPriorityEvent);
}

if (flushAlways) {

@@ -80,7 +80,22 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
*
* @return true if operation succeeded and bufferConsumer was enqueued for consumption.
*/
boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException;
boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex, boolean isPriorityEvent) throws IOException;

/**
* Adds the bufferConsumer to the subpartition with the given index.
*
* <p>This method takes ownership of the passed {@code bufferConsumer} and is therefore responsible for releasing
* its resources.
*
* <p>To avoid problems with data re-ordering, before adding a new {@link BufferConsumer} the previously added one
* in the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
*
* @return true if operation succeeded and bufferConsumer was enqueued for consumption.
*/
default boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
return addBufferConsumer(bufferConsumer, subpartitionIndex, false);
}
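
The default two-argument overload above keeps existing call sites compiling: they are transparently routed to the new three-argument primitive with isPriorityEvent = false. A minimal toy of the same pattern (illustrative only, not Flink code):

// Toy interface demonstrating backwards-compatible API evolution via a default method:
// the three-argument method is the primitive, the legacy signature forwards to it.
interface ToyPartitionWriter {

    boolean addBufferConsumer(String bufferConsumer, int subpartitionIndex, boolean isPriorityEvent);

    default boolean addBufferConsumer(String bufferConsumer, int subpartitionIndex) {
        return addBufferConsumer(bufferConsumer, subpartitionIndex, false);
    }
}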

/**
* Returns the subpartition with the given index.

@@ -30,6 +30,7 @@
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -115,7 +116,7 @@ public boolean isReleased() {
}

@Override
public boolean add(BufferConsumer bufferConsumer) throws IOException {
public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException {
if (isFinished()) {
bufferConsumer.close();
return false;
Expand Down Expand Up @@ -145,6 +146,11 @@ private void flushCurrentBuffer() throws IOException {
}
}

@Override
public List<Buffer> requestInflightBufferSnapshot() {
throw new UnsupportedOperationException("Batch jobs do not support unaligned checkpoints.");
}

private void writeAndCloseBufferConsumer(BufferConsumer bufferConsumer) throws IOException {
try {
final Buffer buffer = bufferConsumer.build();

@@ -18,6 +18,10 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import java.io.IOException;

/**
* Listener interface implemented by consumers of {@link ResultSubpartitionView}
* that want to be notified of availability of further buffers.
@@ -28,4 +32,13 @@ public interface BufferAvailabilityListener {
* Called whenever there might be new data available.
*/
void notifyDataAvailable();

/**
* Allows the listener to react to a priority event before it is added to the outgoing buffer queue.
*
* @return true if the event has been fully processed and should not be added to the buffer queue.
*/
default boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException {
return false;
}
}
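
A hedged sketch of a custom listener (the class below is illustrative, not part of this commit) that consumes priority events itself so they never enter the outgoing buffer queue; returning true signals that the event has been fully processed:

import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;

import java.io.IOException;

// Illustrative listener: handles priority events directly and leaves the
// regular data-availability notifications untouched.
class CountingPriorityEventListener implements BufferAvailabilityListener {

    private int priorityEvents;

    @Override
    public void notifyDataAvailable() {
        // regular (non-priority) data path; nothing special in this sketch
    }

    @Override
    public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException {
        Buffer buffer = eventBufferConsumer.build();
        try {
            priorityEvents++; // e.g. parse and forward the serialized event here
        } finally {
            buffer.recycleBuffer(); // the listener owns the built buffer and must recycle it
        }
        return true; // fully processed: do not add the event to the buffer queue
    }

    int getPriorityEventCount() {
        return priorityEvents;
    }
}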

@@ -35,17 +35,19 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A pipelined in-memory only subpartition, which can be consumed once.
*
* <p>Whenever {@link #add(BufferConsumer)} adds a finished {@link BufferConsumer} or a second
* <p>Whenever {@link ResultSubpartition#add(BufferConsumer, boolean)} adds a finished {@link BufferConsumer} or a second
* {@link BufferConsumer} (in which case we will assume the first one finished), we will
* {@link PipelinedSubpartitionView#notifyDataAvailable() notify} a read view created via
* {@link #createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
* {@link ResultSubpartition#createReadView(BufferAvailabilityListener)} of new data availability. Except by calling
* {@link #flush()} explicitly, we always only notify when the first finished buffer turns up and
* then, the reader has to drain the buffers via {@link #pollBuffer()} until its return value shows
* no more buffers being available. This results in a buffer queue which is either empty or has an
@@ -86,6 +88,9 @@ public class PipelinedSubpartition extends ResultSubpartition {
/** The total number of bytes (both data and event buffers). */
private long totalNumberOfBytes;

/** The collection of buffers which have been overtaken by a checkpoint barrier and need to be persisted for the snapshot. */
private final List<Buffer> inflightBufferSnapshot = new ArrayList<>();

// ------------------------------------------------------------------------

PipelinedSubpartition(int index, ResultPartition parent) {
@@ -101,7 +106,7 @@ public void initializeState(ChannelStateReader stateReader) throws IOException,

// check whether there are some states data filled in this time
if (bufferConsumer.isDataAvailable()) {
add(bufferConsumer);
add(bufferConsumer, false, false);
bufferBuilder.finish();
} else {
bufferConsumer.close();
@@ -110,17 +115,24 @@ public void initializeState(ChannelStateReader stateReader) throws IOException,
}

@Override
public boolean add(BufferConsumer bufferConsumer) {
return add(bufferConsumer, false);
public boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException {
if (isPriorityEvent) {
if (readView != null && readView.notifyPriorityEvent(bufferConsumer)) {
bufferConsumer.close();
return true;
}
return add(bufferConsumer, false, true);
}
return add(bufferConsumer, false, false);
}

@Override
public void finish() throws IOException {
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true);
add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true, false);
LOG.debug("{}: Finished {}.", parent.getOwningTaskName(), this);
}

private boolean add(BufferConsumer bufferConsumer, boolean finish) {
private boolean add(BufferConsumer bufferConsumer, boolean finish, boolean insertAsHead) {
checkNotNull(bufferConsumer);

final boolean notifyDataAvailable;
@@ -131,10 +143,10 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) {
}

// Add the bufferConsumer and update the stats
buffers.add(bufferConsumer);
handleAddingBarrier(bufferConsumer, insertAsHead);
updateStatistics(bufferConsumer);
increaseBuffersInBacklog(bufferConsumer);
notifyDataAvailable = shouldNotifyDataAvailable() || finish;
notifyDataAvailable = insertAsHead || finish || shouldNotifyDataAvailable();

isFinished |= finish;
}
@@ -146,6 +158,32 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) {
return true;
}

private void handleAddingBarrier(BufferConsumer bufferConsumer, boolean insertAsHead) {
assert Thread.holdsLock(buffers);
if (insertAsHead) {
checkState(inflightBufferSnapshot.isEmpty(), "Supporting only one concurrent checkpoint in unaligned " +
"checkpoints");

// Meanwhile, prepare the collection of in-flight buffers overtaken by the barrier, which will be fetched in a later step.
for (BufferConsumer buffer : buffers) {
try (BufferConsumer bc = buffer.copy()) {
inflightBufferSnapshot.add(bc.build());
}
}

buffers.addFirst(bufferConsumer);
} else {
buffers.add(bufferConsumer);
}
}

@Override
public List<Buffer> requestInflightBufferSnapshot() {
List<Buffer> snapshot = new ArrayList<>(inflightBufferSnapshot);
inflightBufferSnapshot.clear();
return snapshot;
}

@Override
public void release() {
// view reference accessible outside the lock, but assigned inside the locked scope
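
The core of the overtaking logic is handleAddingBarrier above: the barrier is inserted at the head of the buffer queue, and everything it jumps over is captured as the in-flight snapshot served once by requestInflightBufferSnapshot. A self-contained toy model of just this queue behaviour (illustrative only, not Flink code):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Toy model of the overtaking semantics: a priority element is inserted at the
// head of the queue, and the elements it overtakes are recorded as the in-flight
// snapshot to be persisted with the unaligned checkpoint.
final class OvertakingQueue<T> {

    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final List<T> inflightSnapshot = new ArrayList<>();

    void add(T element, boolean isPriorityEvent) {
        if (isPriorityEvent) {
            inflightSnapshot.addAll(queue); // the elements the priority event jumps over
            queue.addFirst(element);        // the priority event overtakes all queued data
        } else {
            queue.add(element);
        }
    }

    T poll() {
        return queue.poll();
    }

    List<T> requestInflightSnapshot() {
        // hand out the snapshot exactly once, mirroring the real subpartition
        List<T> snapshot = new ArrayList<>(inflightSnapshot);
        inflightSnapshot.clear();
        return snapshot;
    }
}

For example, adding "a" and "b" as regular elements and then "barrier" with isPriorityEvent = true yields the poll order barrier, a, b, while requestInflightSnapshot() returns [a, b] exactly once.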

@@ -18,10 +18,12 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -96,4 +98,8 @@ public String toString() {
parent.getSubPartitionIndex(),
parent.parent.getPartitionId());
}

public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException {
return availabilityListener.notifyPriorityEvent(eventBufferConsumer);
}
}

@@ -219,7 +219,10 @@ public BufferBuilder tryGetBufferBuilder() throws IOException {
}

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
public boolean addBufferConsumer(
BufferConsumer bufferConsumer,
int subpartitionIndex,
boolean isPriorityEvent) throws IOException {
checkNotNull(bufferConsumer);

ResultSubpartition subpartition;
@@ -232,7 +235,7 @@ public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartition
throw ex;
}

return subpartition.add(bufferConsumer);
return subpartition.add(bufferConsumer, isPriorityEvent);
}

@Override

@@ -25,6 +25,7 @@
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import java.io.IOException;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -90,11 +91,33 @@ public void initializeState(ChannelStateReader stateReader) throws IOException,
*
* @param bufferConsumer
* the buffer to add (transferring ownership to this writer)
* @param isPriorityEvent
* whether the buffer consumer should be added as a priority event, overtaking the buffers already queued
* @return true if operation succeeded and bufferConsumer was enqueued for consumption.
* @throws IOException
* thrown in case of errors while adding the buffer
*/
public abstract boolean add(BufferConsumer bufferConsumer) throws IOException;
public abstract boolean add(BufferConsumer bufferConsumer, boolean isPriorityEvent) throws IOException;

/**
* Adds the given buffer.
*
* <p>The request may be executed synchronously, or asynchronously, depending on the
* implementation.
*
* <p><strong>IMPORTANT:</strong> Before adding a new {@link BufferConsumer}, the previously added one must be in a
* finished state. For performance reasons, this is only enforced during data reading.
*
* @param bufferConsumer
* the buffer to add (transferring ownership to this writer)
* @return true if operation succeeded and bufferConsumer was enqueued for consumption.
* @throws IOException
* thrown in case of errors while adding the buffer
*/
public boolean add(BufferConsumer bufferConsumer) throws IOException {
return add(bufferConsumer, false);
}

public abstract List<Buffer> requestInflightBufferSnapshot();

public abstract void flush();


@@ -21,6 +21,9 @@
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -268,4 +271,25 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
public String toString() {
return "LocalInputChannel [" + partitionId + "]";
}

@Override
public boolean notifyPriorityEvent(BufferConsumer eventBufferConsumer) throws IOException {
if (inputGate.getBufferReceivedListener() == null) {
// in rare cases with very low checkpointing intervals, we may receive the first barrier before setting
// up the CheckpointedInputGate
return false;
}
Buffer buffer = eventBufferConsumer.build();
try {
CheckpointBarrier event = parseCheckpointBarrierOrNull(buffer);
if (event == null) {
throw new IllegalStateException("Currently only checkpoint barriers are known priority events");
}
inputGate.getBufferReceivedListener().notifyBarrierReceived(event, channelInfo);
} finally {
buffer.recycleBuffer();
}
// already processed
return true;
}
}

@@ -106,8 +106,11 @@ public void initializeState(ChannelStateReader stateReader) throws IOException,
}

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex);
public boolean addBufferConsumer(
BufferConsumer bufferConsumer,
int subpartitionIndex,
boolean isPriorityEvent) throws IOException {
boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex, isPriorityEvent);
if (success) {
notifyPipelinedConsumers();
}

@@ -55,7 +55,10 @@ public BufferBuilder tryGetBufferBuilder() throws IOException {
}

@Override
public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
public synchronized boolean addBufferConsumer(
BufferConsumer bufferConsumer,
int targetChannel,
boolean isPriorityEvent) throws IOException {
checkState(targetChannel < getNumberOfSubpartitions());
bufferConsumers.add(bufferConsumer);
processBufferConsumers();

@@ -647,7 +647,7 @@ public BufferBuilder tryGetBufferBuilder() throws IOException {
}

@Override
public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException {
public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel, boolean isPriorityEvent) {
return queues[targetChannel].add(buffer);
}
}
@@ -704,7 +704,7 @@ public BufferBuilder tryGetBufferBuilder() throws IOException {
}

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel, boolean isPriorityEvent) {
// keep the buffer occupied.
produced.putIfAbsent(targetChannel, new ArrayList<>());
produced.get(targetChannel).add(bufferConsumer);