Skip to content

Commit

Permalink
[FLINK-21085][runtime] SingleInputGate supports acquiring unfinished …
Browse files Browse the repository at this point in the history
…channels
  • Loading branch information
gaoyunhaii authored and dawidwys committed Jun 18, 2021
1 parent 7281f2b commit a8b5649
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;

import java.io.IOException;
import java.util.List;

/** An {@link InputGate} with a specific index. */
public abstract class IndexedInputGate extends InputGate implements CheckpointableInput {
/** Returns the index of this input gate. Only supported on */
public abstract int getGateIndex();

/** Returns the list of channels that have not received EndOfPartitionEvent. */
public abstract List<InputChannelInfo> getUnfinishedChannels();

@Override
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
for (int index = 0, numChannels = getNumberOfInputChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,22 @@ public int getGateIndex() {
return gateIndex;
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
List<InputChannelInfo> unfinishedChannels =
new ArrayList<>(
numberOfInputChannels - channelsWithEndOfPartitionEvents.cardinality());
synchronized (inputChannelsWithData) {
for (int i = channelsWithEndOfPartitionEvents.nextClearBit(0);
i < numberOfInputChannels;
i = channelsWithEndOfPartitionEvents.nextClearBit(i + 1)) {
unfinishedChannels.add(getChannel(i).getChannelInfo());
}
}

return unfinishedChannels;
}

/**
* Returns the type of this input channel's consumed result partition.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -74,6 +75,11 @@ public int getGateIndex() {
return inputGate.getGateIndex();
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
return inputGate.getUnfinishedChannels();
}

@Override
public boolean isFinished() {
return inputGate.isFinished();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -885,6 +887,48 @@ public void testSingleInputGateInfo() {
}
}

@Test
public void testGetUnfinishedChannels() throws IOException, InterruptedException {
SingleInputGate inputGate =
new SingleInputGateBuilder()
.setSingleInputGateIndex(1)
.setNumberOfChannels(3)
.build();
final TestInputChannel[] inputChannels =
new TestInputChannel[] {
new TestInputChannel(inputGate, 0),
new TestInputChannel(inputGate, 1),
new TestInputChannel(inputGate, 2)
};
inputGate.setInputChannels(inputChannels);

assertEquals(
Arrays.asList(
inputChannels[0].getChannelInfo(),
inputChannels[1].getChannelInfo(),
inputChannels[2].getChannelInfo()),
inputGate.getUnfinishedChannels());

inputChannels[1].readEndOfPartitionEvent();
inputGate.notifyChannelNonEmpty(inputChannels[1]);
inputGate.getNext();
assertEquals(
Arrays.asList(inputChannels[0].getChannelInfo(), inputChannels[2].getChannelInfo()),
inputGate.getUnfinishedChannels());

inputChannels[0].readEndOfPartitionEvent();
inputGate.notifyChannelNonEmpty(inputChannels[0]);
inputGate.getNext();
assertEquals(
Collections.singletonList(inputChannels[2].getChannelInfo()),
inputGate.getUnfinishedChannels());

inputChannels[2].readEndOfPartitionEvent();
inputGate.notifyChannelNonEmpty(inputChannels[2]);
inputGate.getNext();
assertEquals(Collections.emptyList(), inputGate.getUnfinishedChannels());
}

// ---------------------------------------------------------------------------------------------

private static Map<InputGateID, SingleInputGate> createInputGateWithLocalChannels(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -107,4 +108,9 @@ public void close() {}
public int getGateIndex() {
return gateIndex;
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -166,4 +167,9 @@ public void close() {}
public int getGateIndex() {
return 0;
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
Expand Down Expand Up @@ -254,5 +255,10 @@ public void close() {}
public int getGateIndex() {
return 0;
}

@Override
public List<InputChannelInfo> getUnfinishedChannels() {
return Collections.emptyList();
}
}
}

0 comments on commit a8b5649

Please sign in to comment.