Skip to content

Commit

Permalink
[FLINK-13013][network] Request partitions during InputGate#setup
Browse files Browse the repository at this point in the history
Before partitions were being requested on first polling/getting next buffer
which was causing a couple of issues:
- it was a little bit confusing
- after first requestPartitions call, this was causing unnecessary synchronisation overhead
- this was preventing data notifications to come through and isAvailable() future was always not
  completed before the first attempt to read the data from the input gate

This commit moves requesting partitions to InputGate#setup solving those issues.
  • Loading branch information
pnowojski committed Jul 9, 2019
1 parent 282646e commit c27d0d6
Show file tree
Hide file tree
Showing 19 changed files with 252 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,5 @@ protected static class InputWithData<INPUT, DATA> {
/**
* Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
*/
public abstract void setup() throws IOException;
public abstract void setup() throws IOException, InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public RemoteChannelStateChecker(ResultPartitionID resultPartitionId, String tas

public boolean isProducerReadyOrAbortConsumption(ResponseHandle responseHandle) {
Either<ExecutionState, Throwable> result = responseHandle.getProducerExecutionState();
if (responseHandle.getConsumerExecutionState() != ExecutionState.RUNNING) {
ExecutionState consumerExecutionState = responseHandle.getConsumerExecutionState();
if (!isConsumerStateValidForConsumption(consumerExecutionState)) {
LOG.debug(
"Ignore a partition producer state notification for task {}, because it's not running.",
taskNameWithSubtask);
Expand All @@ -64,6 +65,12 @@ else if (result.isLeft() || result.right() instanceof TimeoutException) {
return false;
}

private static boolean isConsumerStateValidForConsumption(
ExecutionState consumerExecutionState) {
return consumerExecutionState == ExecutionState.RUNNING ||
consumerExecutionState == ExecutionState.DEPLOYING;
}

private boolean isProducerConsumerReady(ResponseHandle responseHandle) {
ExecutionState producerState = getProducerState(responseHandle);
return producerState == ExecutionState.SCHEDULED ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public SingleInputGate(
}

@Override
public void setup() throws IOException {
public void setup() throws IOException, InterruptedException {
checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
if (isCreditBased) {
// assign exclusive buffers to input channels directly and use the rest for floating buffers
Expand All @@ -211,6 +211,8 @@ public void setup() throws IOException {

BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);

requestPartitions();
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -481,7 +483,6 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
throw new IllegalStateException("Released");
}

requestPartitions();
Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IO
return Optional.empty();
}

// Make sure to request the partitions, if they have not been requested before.
requestPartitions();

Optional<InputWithData<InputGate, BufferOrEvent>> next = waitAndGetNextData(blocking);
if (!next.isPresent()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public boolean isFinished() {
}

@Override
public void setup() throws IOException {
public void setup() throws IOException, InterruptedException {
inputGate.setup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,12 +834,14 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {

@VisibleForTesting
public static void setupPartitionsAndGates(
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException {
ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {

for (ResultPartitionWriter partition : producedPartitions) {
partition.setup();
}

// InputGates must be initialized after the partitions, since during InputGate#setup
// we are requesting partitions
for (InputGate gate : inputGates) {
gate.setup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,23 +155,19 @@ private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize, boolean asse
SingleInputGate ig4 = createSingleInputGate(network, ResultPartitionType.PIPELINED_BOUNDED, 4);
final SingleInputGate[] inputGates = new SingleInputGate[] {ig1, ig2, ig3, ig4};

// set up remote input channels for the exclusive buffers of the credit-based flow control
// (note that this does not obey the partition types which is ok for the scope of the test)
if (enableCreditBasedFlowControl) {
createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());
}
createRemoteInputChannel(ig4, 0, rp1, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp2, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp3, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig4, 0, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig1, 1, rp1, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig1, 1, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig2, 1, rp2, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig2, 2, rp4, connManager, network.getNetworkBufferPool());

createRemoteInputChannel(ig3, 1, rp3, connManager, network.getNetworkBufferPool());
createRemoteInputChannel(ig3, 3, rp4, connManager, network.getNetworkBufferPool());

Task.setupPartitionsAndGates(resultPartitions, inputGates);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// We have it in this package because we could not mock the methods otherwise

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.core.memory.MemorySegment;

import java.io.IOException;

/**
* No-op implementation of {@link BufferPool}.
*/
public class NoOpBufferPool implements BufferPool {

@Override
public void lazyDestroy() {
}

@Override
public Buffer requestBuffer() throws IOException {
throw new UnsupportedOperationException();
}

@Override
public Buffer requestBufferBlocking() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

@Override
public boolean addBufferListener(BufferListener listener) {
throw new UnsupportedOperationException();
}

@Override
public boolean isDestroyed() {
throw new UnsupportedOperationException();
}

@Override
public int getNumberOfRequiredMemorySegments() {
throw new UnsupportedOperationException();
}

@Override
public int getMaxNumberOfMemorySegments() {
throw new UnsupportedOperationException();
}

@Override
public int getNumBuffers() {
throw new UnsupportedOperationException();
}

@Override
public void setNumBuffers(int numBuffers) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public int getNumberOfAvailableMemorySegments() {
throw new UnsupportedOperationException();
}

@Override
public int bestEffortGetNumOfUsedBuffers() {
throw new UnsupportedOperationException();
}

@Override
public void recycle(MemorySegment memorySegment) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
Expand Down Expand Up @@ -195,4 +196,24 @@ public Collection<MemorySegment> requestMemorySegments() {
public void recycleMemorySegments(Collection<MemorySegment> segments) {
}
}

/**
* {@link MemorySegmentProvider} that provides unpooled {@link MemorySegment}s.
*/
public static class UnpooledMemorySegmentProvider implements MemorySegmentProvider {
private final int pageSize;

public UnpooledMemorySegmentProvider(int pageSize) {
this.pageSize = pageSize;
}

@Override
public Collection<MemorySegment> requestMemorySegments() {
return Collections.singletonList(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
}

@Override
public void recycleMemorySegments(Collection<MemorySegment> segments) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NoOpBufferPool;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.UnpooledMemorySegmentProvider;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
Expand All @@ -47,7 +50,6 @@
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createLocalInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createResultPartitionManager;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -93,6 +95,8 @@ public void testFairConsumptionLocalChannelsPreFilled() throws Exception {
createLocalInputChannel(gate, i, resultPartitionManager);
}

gate.setup();

// read all the buffers and the EOF event
for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
assertNotNull(gate.getNext());
Expand Down Expand Up @@ -141,6 +145,8 @@ public void testFairConsumptionLocalChannels() throws Exception {
// seed one initial buffer
sources[12].add(bufferConsumer.copy());

gate.setup();

// read all the buffers and the EOF event
for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
assertNotNull(gate.getNext());
Expand Down Expand Up @@ -190,6 +196,8 @@ public void testFairConsumptionRemoteChannelsPreFilled() throws Exception {
channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), buffersPerChannel, -1);
}

gate.setup();

// read all the buffers and the EOF event
for (int i = numberOfChannels * (buffersPerChannel + 1); i > 0; --i) {
assertNotNull(gate.getNext());
Expand Down Expand Up @@ -233,6 +241,8 @@ public void testFairConsumptionRemoteChannels() throws Exception {
channels[11].onBuffer(mockBuffer, 0, -1);
channelSequenceNums[11]++;

gate.setup();

// read all the buffers and the EOF event
for (int i = 0; i < numberOfChannels * buffersPerChannel; i++) {
assertNotNull(gate.getNext());
Expand Down Expand Up @@ -308,9 +318,8 @@ private void fillRandom(
// ------------------------------------------------------------------------

private static class FairnessVerifyingInputGate extends SingleInputGate {
private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY = () -> {
throw new UnsupportedOperationException();
};
private static final SupplierWithException<BufferPool, IOException> STUB_BUFFER_POOL_FACTORY =
NoOpBufferPool::new;

private final ArrayDeque<InputChannel> channelsWithData;

Expand Down Expand Up @@ -368,4 +377,16 @@ private void ensureUnique(Collection<InputChannel> channels) {
uniquenessChecker.clear();
}
}

public static RemoteInputChannel createRemoteInputChannel(
SingleInputGate inputGate,
int channelIndex,
ConnectionManager connectionManager) {

return InputChannelBuilder.newBuilder()
.setChannelIndex(channelIndex)
.setConnectionManager(connectionManager)
.setMemorySegmentProvider(new UnpooledMemorySegmentProvider(32 * 1024))
.buildRemoteAndSetToGate(inputGate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public static ResultPartition createPartition(
ResultPartitionType partitionType,
int numChannels) {
return new ResultPartitionBuilder()
.setResultPartitionManager(environment.getResultPartitionManager())
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
.setResultPartitionType(partitionType)
.setNumberOfSubpartitions(numChannels)
Expand All @@ -75,6 +76,7 @@ public static ResultPartition createPartition(
ResultPartitionType partitionType,
int numChannels) {
return new ResultPartitionBuilder()
.setResultPartitionManager(environment.getResultPartitionManager())
.setupBufferPoolFactoryFromNettyShuffleEnvironment(environment)
.setFileChannelManager(channelManager)
.setResultPartitionType(partitionType)
Expand Down
Loading

0 comments on commit c27d0d6

Please sign in to comment.