Skip to content

Commit

Permalink
[FLINK-19297][network] Make ResultPartitionWriter record-oriented
Browse files Browse the repository at this point in the history
Currently, the ResultPartitionWriter is buffer-oriented, that is, RecordWriter can only add buffers of different channels to ResultPartitionWriter and the buffer boundary serves as a nature boundary of data belonging to different channels. However, this abstraction is not flexible enough to handle new implementations like sort-based partitioning where records are appended a joint structure shared by all channels and sorting is used to cluster data belonging to different channels. This patch makes ResultPartitionWriter record-oriented by adding new record-oriented interfaces to and removing the old buffer-oriented interfaces from ResultPartitionWriter. After this change, the future sort-merge based ResultPartitionWriter can be implemented easily.
  • Loading branch information
wsry authored and AHeise committed Sep 24, 2020
1 parent 88a07a9 commit 900a896
Show file tree
Hide file tree
Showing 35 changed files with 785 additions and 1,423 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,20 @@

package org.apache.flink.runtime.io.network.api.writer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A special record-oriented runtime result writer only for broadcast mode.
*
* <p>The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
* for all the channels. Then the serialization results need be copied only once to this buffer which would be
* shared for all the channels in a more efficient way.
* <p>The BroadcastRecordWriter extends the {@link RecordWriter} and emits records to all channels for
* regular {@link #emit(IOReadableWritable)}.
*
* @param <T> the type of the record that can be emitted with this record writer
*/
public final class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {

/** The current buffer builder shared for all the channels. */
@Nullable
private BufferBuilder bufferBuilder;

/**
* The flag for judging whether {@link #requestNewBufferBuilder(int)} and {@link #flushTargetPartition(int)}
* is triggered by {@link #randomEmit(IOReadableWritable)} or not.
*/
private boolean randomTriggered;

private BufferConsumer randomTriggeredConsumer;

BroadcastRecordWriter(
ResultPartitionWriter writer,
long timeout,
Expand All @@ -60,113 +40,18 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
}

@Override
public void emit(T record) throws IOException, InterruptedException {
public void emit(T record) throws IOException {
broadcastEmit(record);
}

@Override
public void randomEmit(T record) throws IOException, InterruptedException {
randomEmit(record, rng.nextInt(numberOfChannels));
}

/**
* For non-broadcast emit, we try to finish the current {@link BufferBuilder} first, and then request
* a new {@link BufferBuilder} for the random channel. If this new {@link BufferBuilder} is not full,
* it can be shared for all the other channels via initializing readable position in created
* {@link BufferConsumer}.
*/
@VisibleForTesting
void randomEmit(T record, int targetChannelIndex) throws IOException, InterruptedException {
tryFinishCurrentBufferBuilder(targetChannelIndex);

randomTriggered = true;
emit(record, targetChannelIndex);
randomTriggered = false;

if (bufferBuilder != null) {
for (int index = 0; index < numberOfChannels; index++) {
if (index != targetChannelIndex) {
addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()), index);
}
}
}
}
public void broadcastEmit(T record) throws IOException {
checkErroneous();

@Override
public void broadcastEmit(T record) throws IOException, InterruptedException {
// We could actually select any target channel here because all the channels
// are sharing the same BufferBuilder in broadcast mode.
emit(record, 0);
}
targetPartition.broadcastRecord(serializeRecord(serializer, record));

/**
* The flush could be triggered by {@link #randomEmit(IOReadableWritable)}, {@link #emit(IOReadableWritable)}
* or {@link #broadcastEmit(IOReadableWritable)}. Only random emit should flush a single target channel,
* otherwise we should flush all the channels.
*/
@Override
public void flushTargetPartition(int targetChannel) {
if (randomTriggered) {
super.flushTargetPartition(targetChannel);
} else {
if (flushAlways) {
flushAll();
}
}

@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
return bufferBuilder != null ? bufferBuilder : requestNewBufferBuilder(targetChannel);
}

/**
* The request could be from broadcast or non-broadcast modes like {@link #randomEmit(IOReadableWritable)}.
*
* <p>For non-broadcast, the created {@link BufferConsumer} is only for the target channel.
*
* <p>For broadcast, all the channels share the same requested {@link BufferBuilder} and the created
* {@link BufferConsumer} is copied for every channel.
*/
@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
checkState(bufferBuilder == null || bufferBuilder.isFinished());

BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
if (randomTriggered) {
addBufferConsumer(randomTriggeredConsumer = builder.createBufferConsumer(), targetChannel);
} else {
try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
for (int channel = 0; channel < numberOfChannels; channel++) {
addBufferConsumer(bufferConsumer.copy(), channel);
}
}
}

bufferBuilder = builder;
return builder;
}

@Override
public void tryFinishCurrentBufferBuilder(int targetChannel) {
if (bufferBuilder == null) {
return;
}

BufferBuilder builder = bufferBuilder;
bufferBuilder = null;

finishBufferBuilder(builder);
}

@Override
public void emptyCurrentBufferBuilder(int targetChannel) {
bufferBuilder = null;
}

@Override
public void closeBufferBuilders() {
if (bufferBuilder != null) {
bufferBuilder.finish();
bufferBuilder = null;
}
}
}
Loading

0 comments on commit 900a896

Please sign in to comment.