Skip to content

Commit

Permalink
[FLINK-18094][network] Add InputGate#getChannelInfos for easier testing.
Browse files Browse the repository at this point in the history
In the following commits, this method will be used to fetch information about all channels without explicitly needing to access the channels. Thus, for tests mocks just need to return meaningful InputChannelInfos instead of actually creating the respective channels.
  • Loading branch information
Arvid Heise authored and pnowojski committed Jun 17, 2020
1 parent 03beab8 commit 485688f
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -117,6 +121,15 @@ public CompletableFuture<?> getAvailableFuture() {
*/
public abstract InputChannel getChannel(int channelIndex);

/**
* Returns the channel infos of this gate.
*/
public List<InputChannelInfo> getChannelInfos() {
return IntStream.range(0, getNumberOfInputChannels())
.mapToObj(index -> getChannel(index).getChannelInfo())
.collect(Collectors.toList());
}

/**
* Simple pojo for INPUT, DATA and moreAvailable.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
Expand All @@ -34,6 +35,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -231,6 +233,10 @@ public InputChannel getChannel(int channelIndex) {
return inputGate.getChannel(channelIndex);
}

public List<InputChannelInfo> getChannelInfos() {
return inputGate.getChannelInfos();
}

@VisibleForTesting
CheckpointBarrierHandler getCheckpointBarrierHandler() {
return barrierHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Mock {@link IndexedInputGate}.
Expand Down Expand Up @@ -72,6 +76,13 @@ public InputChannel getChannel(int channelIndex) {
throw new UnsupportedOperationException();
}

@Override
public List<InputChannelInfo> getChannelInfos() {
return IntStream.range(0, numberOfInputChannels)
.mapToObj(channelIndex -> new InputChannelInfo(gateIndex, channelIndex))
.collect(Collectors.toList());
}

@Override
public boolean isFinished() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
Expand All @@ -33,6 +34,8 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Mock {@link InputGate}.
Expand Down Expand Up @@ -88,6 +91,13 @@ public InputChannel getChannel(int channelIndex) {
throw new UnsupportedOperationException();
}

@Override
public List<InputChannelInfo> getChannelInfos() {
return IntStream.range(0, numberOfChannels)
.mapToObj(channelIndex -> new InputChannelInfo(0, channelIndex))
.collect(Collectors.toList());
}

@Override
public boolean isFinished() {
return finishAfterLastBuffer && bufferOrEvents.isEmpty();
Expand Down

0 comments on commit 485688f

Please sign in to comment.