Skip to content

Commit

Permalink
[FLINK-19312][network] Introduce BufferWritingResultPartition which w…
Browse files Browse the repository at this point in the history
…raps the ResultSubpartition related logic

In the current abstraction, buffers are written to and read from ResultSubpartitions, which is a hash-style data writing and reading implementation. This is in contrast to implementations where records are appended to a joint structure, from which the data is drawn after the write phase is finished, for example the sort-based partitioning which clusters data belonging to different channels by sorting on the channel index. In the future, a sort-merge based ResultPartitionWriter will be implemented which cannot share the current hash-style ResultSubpartition-related logic. So this patch migrates this logic from ResultPartition to the new BufferWritingResultPartition, after which ResultPartition is free of ResultSubpartition and can be reused by the future sort-merge based ResultPartitionWriter implementation.
  • Loading branch information
wsry authored and AHeise committed Sep 24, 2020
1 parent 0c8a7c1 commit 88a07a9
Show file tree
Hide file tree
Showing 11 changed files with 227 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -49,13 +48,7 @@ private ResultPartitionMetrics(ResultPartition partition) {
* @return total number of queued buffers
*/
long refreshAndGetTotal() {
long total = 0;

for (ResultSubpartition part : partition.getAllPartitions()) {
total += part.unsynchronizedGetNumberOfQueuedBuffers();
}

return total;
return partition.getNumberOfQueuedBuffers();
}

/**
Expand All @@ -66,15 +59,15 @@ long refreshAndGetTotal() {
*/
int refreshAndGetMin() {
int min = Integer.MAX_VALUE;
int numSubpartitions = partition.getNumberOfSubpartitions();

ResultSubpartition[] allPartitions = partition.getAllPartitions();
if (allPartitions.length == 0) {
if (numSubpartitions == 0) {
// meaningful value when no channels exist:
return 0;
}

for (ResultSubpartition part : allPartitions) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
min = Math.min(min, size);
}

Expand All @@ -89,9 +82,10 @@ int refreshAndGetMin() {
*/
int refreshAndGetMax() {
int max = 0;
int numSubpartitions = partition.getNumberOfSubpartitions();

for (ResultSubpartition part : partition.getAllPartitions()) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
max = Math.max(max, size);
}

Expand All @@ -105,15 +99,7 @@ int refreshAndGetMax() {
* @return average number of queued buffers per sub-partition
*/
float refreshAndGetAvg() {
long total = 0;

ResultSubpartition[] allPartitions = partition.getAllPartitions();
for (ResultSubpartition part : allPartitions) {
int size = part.unsynchronizedGetNumberOfQueuedBuffers();
total += size;
}

return total / (float) allPartitions.length;
return partition.getNumberOfQueuedBuffers() / (float) partition.getNumberOfSubpartitions();
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* per sub-partition. This implementation hence requires at least as many files (file handles) and
* memory buffers as the parallelism of the target task that the data is shuffled to.
*/
public class BoundedBlockingResultPartition extends ResultPartition {
public class BoundedBlockingResultPartition extends BufferWritingResultPartition {

public BoundedBlockingResultPartition(
String owningTaskName,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.util.function.FunctionWithException;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A {@link ResultPartition} which writes buffers directly to {@link ResultSubpartition}s. This
* is in contrast to implementations where records are written to a joint structure, from which
* the subpartitions draw the data after the write phase is finished, for example the sort-based
* partitioning.
*
* <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
* transported through the network.
*/
public class BufferWritingResultPartition extends ResultPartition {

/** The subpartitions of this partition. At least one. */
protected final ResultSubpartition[] subpartitions;

public BufferWritingResultPartition(
String owningTaskName,
int partitionIndex,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
ResultSubpartition[] subpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
@Nullable BufferCompressor bufferCompressor,
FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {

super(
owningTaskName,
partitionIndex,
partitionId,
partitionType,
subpartitions.length,
numTargetKeyGroups,
partitionManager,
bufferCompressor,
bufferPoolFactory);

this.subpartitions = checkNotNull(subpartitions);
}

public int getNumberOfQueuedBuffers() {
int totalBuffers = 0;

for (ResultSubpartition subpartition : subpartitions) {
totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
}

return totalBuffers;
}

public int getNumberOfQueuedBuffers(int targetSubpartition) {
checkArgument(targetSubpartition >= 0 && targetSubpartition < numSubpartitions);
return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
}

@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
checkInProduceState();

return bufferPool.requestBufferBuilderBlocking(targetChannel);
}

@Override
public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
return bufferPool.requestBufferBuilder(targetChannel);
}

@Override
public boolean addBufferConsumer(
BufferConsumer bufferConsumer,
int subpartitionIndex) throws IOException {
checkNotNull(bufferConsumer);

ResultSubpartition subpartition;
try {
checkInProduceState();
subpartition = subpartitions[subpartitionIndex];
}
catch (Exception ex) {
bufferConsumer.close();
throw ex;
}

return subpartition.add(bufferConsumer);
}

@Override
public void flushAll() {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.flush();
}
}

@Override
public void flush(int targetSubpartition) {
subpartitions[targetSubpartition].flush();
}

@Override
public ResultSubpartitionView createSubpartitionView(
int subpartitionIndex,
BufferAvailabilityListener availabilityListener) throws IOException {
checkElementIndex(subpartitionIndex, numSubpartitions, "Subpartition not found.");
checkState(!isReleased(), "Partition released.");

ResultSubpartition subpartition = subpartitions[subpartitionIndex];
ResultSubpartitionView readView = subpartition.createReadView(availabilityListener);

LOG.debug("Created {}", readView);

return readView;
}

@Override
public void finish() throws IOException {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.finish();
}
super.finish();
}

@Override
protected void releaseInternal() {
// Release all subpartitions
for (ResultSubpartition subpartition : subpartitions) {
try {
subpartition.release();
}
// Catch this in order to ensure that release is called on all subpartitions
catch (Throwable t) {
LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
}
}
}

@VisibleForTesting
public ResultSubpartition[] getAllPartitions() {
return subpartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
* freed.
*/
public class PipelinedResultPartition extends ResultPartition
public class PipelinedResultPartition extends BufferWritingResultPartition
implements CheckpointedResultPartition, ChannelStateHolder {

/** The lock that guard release operations (which can be asynchronously propagated from the
Expand Down Expand Up @@ -134,12 +134,12 @@ void onConsumedSubpartition(int subpartitionIndex) {

@Override
public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
return (CheckpointedResultSubpartition) subpartitions[subpartitionIndex];
}

@Override
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
for (ResultSubpartition subPar : getAllPartitions()) {
for (ResultSubpartition subPar : subpartitions) {
((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
}

Expand Down
Loading

0 comments on commit 88a07a9

Please sign in to comment.