Skip to content

Commit

Permalink
Merge pull request apache#8361 from pnowojski/f12434
Browse files Browse the repository at this point in the history
[FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
  • Loading branch information
pnowojski committed May 10, 2019
1 parent c4cb904 commit d3fd7a6
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 281 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* An input gate consumes one or more partitions of a single produced intermediate result.
Expand Down Expand Up @@ -65,33 +68,65 @@
* will have an input gate attached to it. This will provide its input, which will consist of one
* subpartition from each partition of the intermediate result.
*/
public interface InputGate extends AutoCloseable {
public abstract class InputGate implements AutoCloseable {

public static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

protected CompletableFuture<?> isAvailable = new CompletableFuture<>();

int getNumberOfInputChannels();
public abstract int getNumberOfInputChannels();

String getOwningTaskName();
public abstract String getOwningTaskName();

boolean isFinished();
public abstract boolean isFinished();

void requestPartitions() throws IOException, InterruptedException;
public abstract void requestPartitions() throws IOException, InterruptedException;

/**
* Blocking call waiting for next {@link BufferOrEvent}.
*
* @return {@code Optional.empty()} if {@link #isFinished()} returns true.
*/
Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;
public abstract Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException;

/**
* Poll the {@link BufferOrEvent}.
*
* @return {@code Optional.empty()} if there is no data to return or if {@link #isFinished()} returns true.
*/
Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException;
public abstract Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException;

public abstract void sendTaskEvent(TaskEvent event) throws IOException;

public abstract int getPageSize();

/**
* @return a future that is completed if there are more records available. If there more records
* available immediately, {@link #AVAILABLE} should be returned.
*/
public CompletableFuture<?> isAvailable() {
return isAvailable;
}

void sendTaskEvent(TaskEvent event) throws IOException;
protected void resetIsAvailable() {
// try to avoid volatile access in isDone()}
if (isAvailable == AVAILABLE || isAvailable.isDone()) {
isAvailable = new CompletableFuture<>();
}
}

void registerListener(InputGateListener listener);
/**
* Simple pojo for INPUT, DATA and moreAvailable.
*/
protected static class InputWithData<INPUT, DATA> {
protected final INPUT input;
protected final DATA data;
protected final boolean moreAvailable;

int getPageSize();
InputWithData(INPUT input, DATA data, boolean moreAvailable) {
this.input = checkNotNull(input);
this.data = checkNotNull(data);
this.moreAvailable = moreAvailable;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -101,7 +102,7 @@
* in two partitions (Partition 1 and 2). Each of these partitions is further partitioned into two
* subpartitions -- one for each parallel reduce subtask.
*/
public class SingleInputGate implements InputGate {
public class SingleInputGate extends InputGate {

private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);

Expand Down Expand Up @@ -172,9 +173,6 @@ public class SingleInputGate implements InputGate {
/** Flag indicating whether all resources have been released. */
private volatile boolean isReleased;

/** Registered listener to forward buffer notifications to. */
private volatile InputGateListener inputGateListener;

private final List<TaskEvent> pendingEvents = new ArrayList<>();

private int numberOfUninitializedChannels;
Expand Down Expand Up @@ -531,12 +529,21 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

requestPartitions();
Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
}

InputChannel currentChannel;
boolean moreAvailable;
Optional<BufferAndAvailability> result = Optional.empty();
InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
return Optional.of(transformToBufferOrEvent(
inputWithData.data.buffer(),
inputWithData.moreAvailable,
inputWithData.input));
}

do {
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
throws IOException, InterruptedException {
while (true) {
synchronized (inputChannelsWithData) {
while (inputChannelsWithData.size() == 0) {
if (isReleased) {
Expand All @@ -547,30 +554,43 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
inputChannelsWithData.wait();
}
else {
resetIsAvailable();
return Optional.empty();
}
}

currentChannel = inputChannelsWithData.remove();
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
moreAvailable = !inputChannelsWithData.isEmpty();
}
InputChannel inputChannel = inputChannelsWithData.remove();

result = currentChannel.getNextBuffer();
} while (!result.isPresent());
Optional<BufferAndAvailability> result = inputChannel.getNextBuffer();

// this channel was now removed from the non-empty channels queue
// we re-add it in case it has more data, because in that case no "non-empty" notification
// will come for that channel
if (result.get().moreAvailable()) {
queueChannel(currentChannel);
moreAvailable = true;
if (result.isPresent() && result.get().moreAvailable()) {
// enqueue the inputChannel at the end to avoid starvation
inputChannelsWithData.add(inputChannel);
} else {
enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
}

if (inputChannelsWithData.isEmpty()) {
resetIsAvailable();
}

if (result.isPresent()) {
return Optional.of(new InputWithData<>(
inputChannel,
result.get(),
!inputChannelsWithData.isEmpty()));
}
}
}
}

final Buffer buffer = result.get().buffer();
private BufferOrEvent transformToBufferOrEvent(
Buffer buffer,
boolean moreAvailable,
InputChannel currentChannel) throws IOException, InterruptedException {
numBytesIn.inc(buffer.getSizeUnsafe());
if (buffer.isBuffer()) {
return Optional.of(new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable));
return new BufferOrEvent(buffer, currentChannel.getChannelIndex(), moreAvailable);
}
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
Expand All @@ -589,11 +609,10 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
}

currentChannel.notifySubpartitionConsumed();

currentChannel.releaseAllResources();
}

return Optional.of(new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable));
return new BufferOrEvent(event, currentChannel.getChannelIndex(), moreAvailable);
}
}

Expand All @@ -614,15 +633,6 @@ public void sendTaskEvent(TaskEvent event) throws IOException {
// Channel notifications
// ------------------------------------------------------------------------

@Override
public void registerListener(InputGateListener inputGateListener) {
if (this.inputGateListener == null) {
this.inputGateListener = inputGateListener;
} else {
throw new IllegalStateException("Multiple listeners");
}
}

void notifyChannelNonEmpty(InputChannel channel) {
queueChannel(checkNotNull(channel));
}
Expand All @@ -634,6 +644,8 @@ void triggerPartitionStateCheck(ResultPartitionID partitionId) {
private void queueChannel(InputChannel channel) {
int availableChannels;

CompletableFuture<?> toNotify = null;

synchronized (inputChannelsWithData) {
if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
return;
Expand All @@ -645,14 +657,13 @@ private void queueChannel(InputChannel channel) {

if (availableChannels == 0) {
inputChannelsWithData.notifyAll();
toNotify = isAvailable;
isAvailable = AVAILABLE;
}
}

if (availableChannels == 0) {
InputGateListener listener = inputGateListener;
if (listener != null) {
listener.notifyInputGateNonEmpty(this);
}
if (toNotify != null) {
toNotify.complete(null);
}
}

Expand Down
Loading

0 comments on commit d3fd7a6

Please sign in to comment.