diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java deleted file mode 100644 index 6f7391779bae1..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.serialization; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; - -import java.io.IOException; - -/** - * Interface for turning records into sequences of memory segments. - */ -public interface RecordSerializer { - - /** - * Status of the serialization result. - */ - enum SerializationResult { - PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true), - FULL_RECORD_MEMORY_SEGMENT_FULL(true, true), - FULL_RECORD(true, false); - - private final boolean isFullRecord; - - private final boolean isFullBuffer; - - SerializationResult(boolean isFullRecord, boolean isFullBuffer) { - this.isFullRecord = isFullRecord; - this.isFullBuffer = isFullBuffer; - } - - /** - * Whether the full record was serialized and completely written to - * a target buffer. - * - * @return true if the complete record was written - */ - public boolean isFullRecord() { - return this.isFullRecord; - } - - /** - * Whether the target buffer is full after the serialization process. - * - * @return true if the target buffer is full - */ - public boolean isFullBuffer() { - return this.isFullBuffer; - } - } - - /** - * Starts serializing the given record to an intermediate data buffer. - * - * @param record the record to serialize - */ - void serializeRecord(T record) throws IOException; - - /** - * Copies the intermediate data serialization buffer to the given target buffer. - * - * @param bufferBuilder the new target buffer to use - * @return how much information was written to the target buffer and - * whether this buffer is full - */ - SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder); - - /** - * Supports copying an intermediate data serialization buffer to multiple target buffers - * by resetting its initial position before each copying. - */ - void reset(); - - /** - * @return true if has some serialized data pending copying to the result {@link BufferBuilder}. 
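For context on the interface deleted here: callers drove it in a loop keyed off SerializationResult. A minimal sketch of that (now removed) protocol, where requestNewBufferBuilder() stands in for the writer logic that fetched fresh target buffers, and serializer/record are a RecordSerializer and a record of its type:

    // Sketch of the removed copy protocol (hedged, not verbatim caller code).
    serializer.serializeRecord(record);                // fill the intermediate buffer
    serializer.reset();                                // rewind before copying
    BufferBuilder buffer = requestNewBufferBuilder();  // stand-in for the writer's request logic
    SerializationResult result = serializer.copyToBufferBuilder(buffer);
    while (result.isFullBuffer()) {                    // the target segment filled up
        buffer.finish();                               // seal the full buffer
        if (result.isFullRecord()) {
            break;                                     // record complete, stop here
        }
        buffer = requestNewBufferBuilder();            // the record spans another segment
        result = serializer.copyToBufferBuilder(buffer);
    }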
- */ - boolean hasSerializedData(); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java deleted file mode 100644 index 1bb9ec85db560..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.serialization; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataOutputSerializer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; - -import java.io.IOException; -import java.nio.ByteBuffer; - -/** - * Record serializer which serializes the complete record to an intermediate - * data serialization buffer and copies this buffer to target buffers - * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}. - * - * @param The type of the records that are serialized. - */ -public class SpanningRecordSerializer implements RecordSerializer { - - /** Flag to enable/disable checks, if buffer not set/full or pending serialization. */ - private static final boolean CHECKED = false; - - /** Intermediate data serialization. */ - private final DataOutputSerializer serializationBuffer; - - /** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */ - private ByteBuffer dataBuffer; - - public SpanningRecordSerializer() { - serializationBuffer = new DataOutputSerializer(128); - - // ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic) - dataBuffer = serializationBuffer.wrapAsByteBuffer(); - } - - /** - * Serializes the complete record to an intermediate data serialization buffer. - * - * @param record the record to serialize - */ - @Override - public void serializeRecord(T record) throws IOException { - if (CHECKED) { - if (dataBuffer.hasRemaining()) { - throw new IllegalStateException("Pending serialization of previous record."); - } - } - - serializationBuffer.clear(); - // the initial capacity of the serialization buffer should be no less than 4 - serializationBuffer.skipBytesToWrite(4); - - // write data and length - record.write(serializationBuffer); - - int len = serializationBuffer.length() - 4; - serializationBuffer.setPosition(0); - serializationBuffer.writeInt(len); - serializationBuffer.skipBytesToWrite(len); - - dataBuffer = serializationBuffer.wrapAsByteBuffer(); - } - - /** - * Copies an intermediate data serialization buffer into the target BufferBuilder. 
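The reset() contract described above is what let a single serialization pass feed several target buffers. Roughly, as a hedged sketch in which builders stands in for the per-channel BufferBuilders the old writers maintained:

    // Sketch: one serialization pass, copied to every channel via reset().
    serializer.serializeRecord(record);
    for (int channel = 0; channel < numberOfChannels; channel++) {
        serializer.reset();                            // back to position 0
        serializer.copyToBufferBuilder(builders[channel]);
    }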
- * - * @param targetBuffer the target BufferBuilder to copy to - * @return how much information was written to the target buffer and - * whether this buffer is full - */ - @Override - public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) { - targetBuffer.append(dataBuffer); - targetBuffer.commit(); - - return getSerializationResult(targetBuffer); - } - - private SerializationResult getSerializationResult(BufferBuilder targetBuffer) { - if (dataBuffer.hasRemaining()) { - return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL; - } - return !targetBuffer.isFull() - ? SerializationResult.FULL_RECORD - : SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL; - } - - @Override - public void reset() { - dataBuffer.position(0); - } - - @Override - public boolean hasSerializedData() { - return dataBuffer.hasRemaining(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java index b834738ef9c7c..ec800dcd8ed24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java @@ -18,40 +18,20 @@ package org.apache.flink.runtime.io.network.api.writer; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; - -import javax.annotation.Nullable; import java.io.IOException; -import static org.apache.flink.util.Preconditions.checkState; - /** * A special record-oriented runtime result writer only for broadcast mode. * - *
<p>
The BroadcastRecordWriter extends the {@link RecordWriter} and maintains a single {@link BufferBuilder} - * for all the channels. The serialization result then needs to be copied only once into this buffer, which is - * shared across all the channels for better efficiency. + *
<p>
The BroadcastRecordWriter extends the {@link RecordWriter} and emits records to all channels for + * regular {@link #emit(IOReadableWritable)}. * * @param the type of the record that can be emitted with this record writer */ public final class BroadcastRecordWriter extends RecordWriter { - /** The current buffer builder shared for all the channels. */ - @Nullable - private BufferBuilder bufferBuilder; - - /** - * The flag for judging whether {@link #requestNewBufferBuilder(int)} and {@link #flushTargetPartition(int)} - * is triggered by {@link #randomEmit(IOReadableWritable)} or not. - */ - private boolean randomTriggered; - - private BufferConsumer randomTriggeredConsumer; - BroadcastRecordWriter( ResultPartitionWriter writer, long timeout, @@ -60,113 +40,18 @@ public final class BroadcastRecordWriter extends R } @Override - public void emit(T record) throws IOException, InterruptedException { + public void emit(T record) throws IOException { broadcastEmit(record); } @Override - public void randomEmit(T record) throws IOException, InterruptedException { - randomEmit(record, rng.nextInt(numberOfChannels)); - } - - /** - * For non-broadcast emit, we try to finish the current {@link BufferBuilder} first, and then request - * a new {@link BufferBuilder} for the random channel. If this new {@link BufferBuilder} is not full, - * it can be shared for all the other channels via initializing readable position in created - * {@link BufferConsumer}. - */ - @VisibleForTesting - void randomEmit(T record, int targetChannelIndex) throws IOException, InterruptedException { - tryFinishCurrentBufferBuilder(targetChannelIndex); - - randomTriggered = true; - emit(record, targetChannelIndex); - randomTriggered = false; - - if (bufferBuilder != null) { - for (int index = 0; index < numberOfChannels; index++) { - if (index != targetChannelIndex) { - addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()), index); - } - } - } - } + public void broadcastEmit(T record) throws IOException { + checkErroneous(); - @Override - public void broadcastEmit(T record) throws IOException, InterruptedException { - // We could actually select any target channel here because all the channels - // are sharing the same BufferBuilder in broadcast mode. - emit(record, 0); - } + targetPartition.broadcastRecord(serializeRecord(serializer, record)); - /** - * The flush could be triggered by {@link #randomEmit(IOReadableWritable)}, {@link #emit(IOReadableWritable)} - * or {@link #broadcastEmit(IOReadableWritable)}. Only random emit should flush a single target channel, - * otherwise we should flush all the channels. - */ - @Override - public void flushTargetPartition(int targetChannel) { - if (randomTriggered) { - super.flushTargetPartition(targetChannel); - } else { + if (flushAlways) { flushAll(); } } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return bufferBuilder != null ? bufferBuilder : requestNewBufferBuilder(targetChannel); - } - - /** - * The request could be from broadcast or non-broadcast modes like {@link #randomEmit(IOReadableWritable)}. - * - *
<p>
For non-broadcast, the created {@link BufferConsumer} is only for the target channel. - * - *
<p>
For broadcast, all the channels share the same requested {@link BufferBuilder} and the created - * {@link BufferConsumer} is copied for every channel. - */ - @Override - public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { - checkState(bufferBuilder == null || bufferBuilder.isFinished()); - - BufferBuilder builder = super.requestNewBufferBuilder(targetChannel); - if (randomTriggered) { - addBufferConsumer(randomTriggeredConsumer = builder.createBufferConsumer(), targetChannel); - } else { - try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) { - for (int channel = 0; channel < numberOfChannels; channel++) { - addBufferConsumer(bufferConsumer.copy(), channel); - } - } - } - - bufferBuilder = builder; - return builder; - } - - @Override - public void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilder == null) { - return; - } - - BufferBuilder builder = bufferBuilder; - bufferBuilder = null; - - finishBufferBuilder(builder); - } - - @Override - public void emptyCurrentBufferBuilder(int targetChannel) { - bufferBuilder = null; - } - - @Override - public void closeBufferBuilders() { - if (bufferBuilder != null) { - bufferBuilder.finish(); - bufferBuilder = null; - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java index 5e320563575eb..b94207e39c881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java @@ -19,19 +19,17 @@ package org.apache.flink.runtime.io.network.api.writer; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import java.io.IOException; +import java.nio.ByteBuffer; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * A regular record-oriented runtime result writer. * - *
<p>
The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of - * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)} - * operation uses the {@link ChannelSelector} to select the target channel. + *
<p>
The ChannelSelectorRecordWriter extends the {@link RecordWriter} and emits records to the channel + * selected by the {@link ChannelSelector} for regular {@link #emit(IOReadableWritable)}. * * @param the type of the record that can be emitted with this record writer */ @@ -39,9 +37,6 @@ public final class ChannelSelectorRecordWriter ext private final ChannelSelector channelSelector; - /** Every subpartition maintains a separate buffer builder which might be null. */ - private final BufferBuilder[] bufferBuilders; - ChannelSelectorRecordWriter( ResultPartitionWriter writer, ChannelSelector channelSelector, @@ -51,77 +46,28 @@ public final class ChannelSelectorRecordWriter ext this.channelSelector = checkNotNull(channelSelector); this.channelSelector.setup(numberOfChannels); - - this.bufferBuilders = new BufferBuilder[numberOfChannels]; } @Override - public void emit(T record) throws IOException, InterruptedException { + public void emit(T record) throws IOException { emit(record, channelSelector.selectChannel(record)); } @Override - public void randomEmit(T record) throws IOException, InterruptedException { - emit(record, rng.nextInt(numberOfChannels)); - } - - /** - * The record is serialized into intermediate serialization buffer which is then copied - * into the target buffer for every channel. - */ - @Override - public void broadcastEmit(T record) throws IOException, InterruptedException { + public void broadcastEmit(T record) throws IOException { checkErroneous(); - serializer.serializeRecord(record); - - for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) { - copyFromSerializerToTargetChannel(targetChannel); - } - } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - if (bufferBuilders[targetChannel] != null) { - return bufferBuilders[targetChannel]; - } else { - return requestNewBufferBuilder(targetChannel); - } - } - - @Override - public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { - checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished()); - - BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel); - addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel); - bufferBuilders[targetChannel] = bufferBuilder; - return bufferBuilder; - } - - @Override - public void tryFinishCurrentBufferBuilder(int targetChannel) { - if (bufferBuilders[targetChannel] == null) { - return; + // Emitting to all channels in a for loop can be better than calling + // ResultPartitionWriter#broadcastRecord because the broadcastRecord + // method incurs extra overhead. 
+ ByteBuffer serializedRecord = serializeRecord(serializer, record); + for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) { + serializedRecord.rewind(); + emit(record, channelIndex); } - BufferBuilder bufferBuilder = bufferBuilders[targetChannel]; - bufferBuilders[targetChannel] = null; - finishBufferBuilder(bufferBuilder); - } - - @Override - public void emptyCurrentBufferBuilder(int targetChannel) { - bufferBuilders[targetChannel] = null; - } - - @Override - public void closeBufferBuilders() { - for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) { - if (bufferBuilders[targetChannel] != null) { - bufferBuilders[targetChannel].finish(); - bufferBuilders[targetChannel] = null; - } + if (flushAlways) { + flushAll(); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java index 995e670631623..b58ef38a13504 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java @@ -20,17 +20,9 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.metrics.Counter; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.AvailabilityProvider; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.XORShiftRandom; @@ -40,18 +32,17 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.CompletableFuture; -import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkState; /** * An abstract record-oriented runtime result writer. * *
<p>
The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of - * serializing records into buffers. + * channel selection and serializing records into bytes. * * @param the type of the record that can be emitted with this record writer */ @@ -63,21 +54,15 @@ public abstract class RecordWriter implements Avai private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class); - private final ResultPartitionWriter targetPartition; + protected final ResultPartitionWriter targetPartition; protected final int numberOfChannels; - protected final RecordSerializer serializer; + protected final DataOutputSerializer serializer; protected final Random rng = new XORShiftRandom(); - private Counter numBytesOut = new SimpleCounter(); - - private Counter numBuffersOut = new SimpleCounter(); - - protected Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter()); - - private final boolean flushAlways; + protected final boolean flushAlways; /** The thread that periodically flushes the output, to give an upper latency bound. */ @Nullable @@ -93,7 +78,7 @@ public abstract class RecordWriter implements Avai this.targetPartition = writer; this.numberOfChannels = writer.getNumberOfSubpartitions(); - this.serializer = new SpanningRecordSerializer(); + this.serializer = new DataOutputSerializer(128); checkArgument(timeout >= -1); this.flushAlways = (timeout == 0); @@ -109,89 +94,58 @@ public abstract class RecordWriter implements Avai } } - protected void emit(T record, int targetChannel) throws IOException, InterruptedException { + protected void emit(T record, int targetSubpartition) throws IOException { checkErroneous(); - serializer.serializeRecord(record); + targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); - // Make sure we don't hold onto the large intermediate serialization buffer for too long - copyFromSerializerToTargetChannel(targetChannel); + if (flushAlways) { + targetPartition.flush(targetSubpartition); + } } - /** - * @param targetChannel - * @return true if the intermediate serialization buffer should be pruned - */ - protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException { - // We should reset the initial position of the intermediate serialization buffer before - // copying, so the serialization results can be copied to multiple target buffers. - serializer.reset(); - - boolean pruneTriggered = false; - BufferBuilder bufferBuilder = getBufferBuilder(targetChannel); - SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); - while (result.isFullBuffer()) { - finishBufferBuilder(bufferBuilder); - - // If this was a full record, we are done. Not breaking out of the loop at this point - // will lead to another buffer request before breaking out (that would not be a - // problem per se, but it can lead to stalls in the pipeline). 
- if (result.isFullRecord()) { - pruneTriggered = true; - emptyCurrentBufferBuilder(targetChannel); - break; - } + public void broadcastEvent(AbstractEvent event) throws IOException { + broadcastEvent(event, false); + } - bufferBuilder = requestNewBufferBuilder(targetChannel); - result = serializer.copyToBufferBuilder(bufferBuilder); - } - checkState(!serializer.hasSerializedData(), "All data should be written at once"); + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + targetPartition.broadcastEvent(event, isPriorityEvent); if (flushAlways) { - flushTargetPartition(targetChannel); + flushAll(); } - return pruneTriggered; } - public void broadcastEvent(AbstractEvent event) throws IOException { - broadcastEvent(event, false); - } + @VisibleForTesting + public static ByteBuffer serializeRecord( + DataOutputSerializer serializer, + IOReadableWritable record) throws IOException { + serializer.clear(); - public void broadcastEvent(AbstractEvent event, boolean hasPriority) throws IOException { - try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, hasPriority)) { - for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) { - tryFinishCurrentBufferBuilder(targetChannel); + // the initial capacity should be no less than 4 bytes + serializer.skipBytesToWrite(4); - // Retain the buffer so that it can be recycled by each channel of targetPartition - targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel); - } + // write data + record.write(serializer); - if (flushAlways) { - flushAll(); - } - } + // write length + int len = serializer.length() - 4; + serializer.setPosition(0); + serializer.writeInt(len); + serializer.skipBytesToWrite(len); + + return serializer.wrapAsByteBuffer(); } public void flushAll() { targetPartition.flushAll(); } - protected void flushTargetPartition(int targetChannel) { - targetPartition.flush(targetChannel); - } - /** * Sets the metric group for this RecordWriter. */ public void setMetricGroup(TaskIOMetricGroup metrics) { - numBytesOut = metrics.getNumBytesOutCounter(); - numBuffersOut = metrics.getNumBuffersOutCounter(); - idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond(); - } - - protected void finishBufferBuilder(BufferBuilder bufferBuilder) { - numBytesOut.inc(bufferBuilder.finish()); - numBuffersOut.inc(); + targetPartition.setMetricGroup(metrics); } @Override @@ -202,44 +156,27 @@ public CompletableFuture getAvailableFuture() { /** * This is used to send regular records. */ - public abstract void emit(T record) throws IOException, InterruptedException; + public abstract void emit(T record) throws IOException; /** * This is used to send LatencyMarks to a random target channel. */ - public abstract void randomEmit(T record) throws IOException, InterruptedException; - - /** - * This is used to broadcast streaming Watermarks in-band with records. - */ - public abstract void broadcastEmit(T record) throws IOException, InterruptedException; - - /** - * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need - * request a new one for this target channel. - */ - abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException; - - /** - * Marks the current {@link BufferBuilder} as finished if present and clears the state for next one. 
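The new static serializeRecord above writes a length-prefixed frame: four bytes of payload length followed by the payload. A hedged round-trip check, assuming record is any IOReadableWritable instance:

    // Sketch: the frame produced by serializeRecord is [int length][payload].
    DataOutputSerializer out = new DataOutputSerializer(128);
    ByteBuffer frame = RecordWriter.serializeRecord(out, record);

    int length = frame.getInt();            // big-endian 4-byte length prefix
    assert length == frame.remaining();     // the prefix counts only the payload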
- */ - abstract void tryFinishCurrentBufferBuilder(int targetChannel); + public void randomEmit(T record) throws IOException { + checkErroneous(); - /** - * Marks the current {@link BufferBuilder} as empty for the target channel. - */ - abstract void emptyCurrentBufferBuilder(int targetChannel); + int targetSubpartition = rng.nextInt(numberOfChannels); + emit(record, targetSubpartition); + } /** - * Marks the current {@link BufferBuilder}s as finished and releases the resources. + * This is used to broadcast streaming Watermarks in-band with records. */ - abstract void closeBufferBuilders(); + public abstract void broadcastEmit(T record) throws IOException; /** * Closes the writer. This stops the flushing thread (if there is one). */ public void close() { - closeBufferBuilders(); // make sure we terminate the thread in any case if (outputFlusher != null) { outputFlusher.terminate(); @@ -277,28 +214,6 @@ protected void checkErroneous() throws IOException { } } - protected void addBufferConsumer(BufferConsumer consumer, int targetChannel) throws IOException { - targetPartition.addBufferConsumer(consumer, targetChannel); - } - - /** - * Requests a new {@link BufferBuilder} for the target channel and returns it. - */ - public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException { - BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel); - if (builder == null) { - long start = System.currentTimeMillis(); - builder = targetPartition.getBufferBuilder(targetChannel); - idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); - } - return builder; - } - - @VisibleForTesting - public Meter getIdleTimeMsPerSecond() { - return idleTimeMsPerSecond; - } - // ------------------------------------------------------------------------ /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java index 838850e17b76f..a90b0ac09992f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java @@ -18,19 +18,20 @@ package org.apache.flink.runtime.io.network.api.writer; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.AvailabilityProvider; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; /** - * A buffer-oriented runtime result writer API for producing results. + * A record-oriented runtime result writer API for producing results. * *
<p>
If {@link ResultPartitionWriter#close()} is called before {@link ResultPartitionWriter#fail(Throwable)} or * {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and cancellation of production. @@ -51,30 +52,27 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid int getNumTargetKeyGroups(); /** - * Requests a {@link BufferBuilder} from this partition for writing data. + * Writes the given serialized record to the target subpartition. */ - BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException; + void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException; + /** + * Writes the given serialized record to all subpartitions. One can also achieve the same effect by emitting + * the same record to all subpartitions one by one, however, this method can have better performance for the + * underlying implementation can do some optimizations, for example coping the given serialized record only + * once to a shared channel which can be consumed by all subpartitions. + */ + void broadcastRecord(ByteBuffer record) throws IOException; /** - * Try to request a {@link BufferBuilder} from this partition for writing data. - * - *
<p>
Returns null if no buffer is available or the buffer provider has been destroyed. + * Writes the given {@link AbstractEvent} to all channels. */ - BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException; + void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException; /** - * Adds the bufferConsumer to the subpartition with the given index. - * - *
<p>
This method takes ownership of the passed {@code bufferConsumer} and thus is responsible for releasing - * its resources. - * - *
<p>
To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one - * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}. - * - * @return true if operation succeeded and bufferConsumer was enqueued for consumption. + * Sets the metric group for the {@link ResultPartitionWriter}. */ - boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException; + void setMetricGroup(TaskIOMetricGroup metrics); /** * Returns a reader for the subpartition with the given index. @@ -82,12 +80,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException; /** - * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in all subpartitions. + * Manually trigger the consumption of data from all subpartitions. */ void flushAll(); /** - * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition. + * Manually trigger the consumption of data from the given subpartitions. */ void flush(int subpartitionIndex); @@ -108,4 +106,19 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid *
<p>
Closing of partition is still needed afterwards. */ void finish() throws IOException; + + boolean isFinished(); + + /** + * Releases the partition writer which releases the produced data and no reader can consume the + * partition any more. + */ + void release(Throwable cause); + + boolean isReleased(); + + /** + * Closes the partition writer which releases the allocated resource, for example the buffer pool. + */ + void close() throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index 437d0a0b1b051..bd9e8ad614397 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -19,16 +19,23 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.util.function.FunctionWithException; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkElementIndex; @@ -49,6 +56,14 @@ public class BufferWritingResultPartition extends ResultPartition { /** The subpartitions of this partition. At least one. */ protected final ResultSubpartition[] subpartitions; + /** For non-broadcast mode, each subpartition maintains a separate BufferBuilder which might be null. */ + private final BufferBuilder[] subpartitionBufferBuilders; + + /** For broadcast mode, a single BufferBuilder is shared by all subpartitions. 
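The emitRecord/broadcastRecord loops above drain a serialized record across as many buffers as it needs. A hedged standalone sketch, assuming the BufferBuilderTestUtils.createBufferBuilder test helper and arbitrary sizes of 100 and 32 bytes:

    // Sketch of the spanning write: append what fits, finish full buffers,
    // repeat until the record is drained. A final, non-full builder stays
    // open (the real code caches it for the next record).
    ByteBuffer record = ByteBuffer.allocate(100);        // stand-in serialized record
    while (record.hasRemaining()) {
        BufferBuilder builder = createBufferBuilder(32); // 32-byte segments
        builder.appendAndCommit(record);                 // copies as much as fits
        if (builder.isFull()) {
            builder.finish();                            // seal; consumers can read it
        }
    }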
*/ + private BufferBuilder broadcastBufferBuilder; + + private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter()); + public BufferWritingResultPartition( String owningTaskName, int partitionIndex, @@ -72,6 +87,7 @@ public BufferWritingResultPartition( bufferPoolFactory); this.subpartitions = checkNotNull(subpartitions); + this.subpartitionBufferBuilders = new BufferBuilder[subpartitions.length]; } public int getNumberOfQueuedBuffers() { @@ -90,46 +106,58 @@ public int getNumberOfQueuedBuffers(int targetSubpartition) { } @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - checkInProduceState(); - - return bufferPool.requestBufferBuilderBlocking(targetChannel); + public void flushAll() { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.flush(); + } } @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return bufferPool.requestBufferBuilder(targetChannel); + public void flush(int targetSubpartition) { + subpartitions[targetSubpartition].flush(); } - @Override - public boolean addBufferConsumer( - BufferConsumer bufferConsumer, - int subpartitionIndex) throws IOException { - checkNotNull(bufferConsumer); + public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException { + do { + final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition); + bufferBuilder.appendAndCommit(record); - ResultSubpartition subpartition; - try { - checkInProduceState(); - subpartition = subpartitions[subpartitionIndex]; - } - catch (Exception ex) { - bufferConsumer.close(); - throw ex; - } + if (bufferBuilder.isFull()) { + finishSubpartitionBufferBuilder(targetSubpartition); + } + } while (record.hasRemaining()); + } + + @Override + public void broadcastRecord(ByteBuffer record) throws IOException { + do { + final BufferBuilder bufferBuilder = getBroadcastBufferBuilder(); + bufferBuilder.appendAndCommit(record); - return subpartition.add(bufferConsumer); + if (bufferBuilder.isFull()) { + finishBroadcastBufferBuilder(); + } + } while (record.hasRemaining()); } @Override - public void flushAll() { - for (ResultSubpartition subpartition : subpartitions) { - subpartition.flush(); + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + checkInProduceState(); + finishBroadcastBufferBuilder(); + finishSubpartitionBufferBuilders(); + + try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) { + for (ResultSubpartition subpartition : subpartitions) { + // Retain the buffer so that it can be recycled by each channel of targetPartition + subpartition.add(eventBufferConsumer.copy()); + } } } @Override - public void flush(int targetSubpartition) { - subpartitions[targetSubpartition].flush(); + public void setMetricGroup(TaskIOMetricGroup metrics) { + super.setMetricGroup(metrics); + idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond(); } @Override @@ -149,9 +177,13 @@ public ResultSubpartitionView createSubpartitionView( @Override public void finish() throws IOException { + finishBroadcastBufferBuilder(); + finishSubpartitionBufferBuilders(); + for (ResultSubpartition subpartition : subpartitions) { subpartition.finish(); } + super.finish(); } @@ -169,6 +201,101 @@ protected void releaseInternal() { } } + private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException { + final BufferBuilder bufferBuilder = 
subpartitionBufferBuilders[targetSubpartition]; + if (bufferBuilder != null) { + return bufferBuilder; + } + + return getNewSubpartitionBufferBuilder(targetSubpartition); + } + + private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException { + checkInProduceState(); + ensureUnicastMode(); + + final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition); + subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer()); + subpartitionBufferBuilders[targetSubpartition] = bufferBuilder; + return bufferBuilder; + } + + private BufferBuilder getBroadcastBufferBuilder() throws IOException { + if (broadcastBufferBuilder != null) { + return broadcastBufferBuilder; + } + + return getNewBroadcastBufferBuilder(); + } + + private BufferBuilder getNewBroadcastBufferBuilder() throws IOException { + checkInProduceState(); + ensureBroadcastMode(); + + final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(0); + broadcastBufferBuilder = bufferBuilder; + + try (final BufferConsumer consumer = bufferBuilder.createBufferConsumer()) { + for (ResultSubpartition subpartition : subpartitions) { + subpartition.add(consumer.copy()); + } + } + + return bufferBuilder; + } + + private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition) throws IOException { + BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition); + if (bufferBuilder != null) { + return bufferBuilder; + } + + final long start = System.currentTimeMillis(); + try { + bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition); + idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); + return bufferBuilder; + } catch (InterruptedException e) { + throw new IOException("Interrupted while waiting for buffer"); + } + } + + private void finishSubpartitionBufferBuilder(int targetSubpartition) { + final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition]; + if (bufferBuilder != null) { + numBytesOut.inc(bufferBuilder.finish()); + numBuffersOut.inc(); + subpartitionBufferBuilders[targetSubpartition] = null; + } + } + + private void finishSubpartitionBufferBuilders() { + for (int channelIndex = 0; channelIndex < numSubpartitions; channelIndex++) { + finishSubpartitionBufferBuilder(channelIndex); + } + } + + private void finishBroadcastBufferBuilder() { + if (broadcastBufferBuilder != null) { + numBytesOut.inc(broadcastBufferBuilder.finish() * numSubpartitions); + numBuffersOut.inc(numSubpartitions); + broadcastBufferBuilder = null; + } + } + + private void ensureUnicastMode() { + finishBroadcastBufferBuilder(); + } + + private void ensureBroadcastMode() { + finishSubpartitionBufferBuilders(); + } + + @VisibleForTesting + public Meter getIdleTimeMsPerSecond() { + return idleTimeMsPerSecond; + } + @VisibleForTesting public ResultSubpartition[] getAllPartitions() { return subpartitions; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java index 02c91c5c3cbb0..bd81c19a45e6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.metrics.Counter; +import 
org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; @@ -27,6 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionWithException; @@ -103,6 +106,10 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo @Nullable protected final BufferCompressor bufferCompressor; + protected Counter numBytesOut = new SimpleCounter(); + + protected Counter numBuffersOut = new SimpleCounter(); + public ResultPartition( String owningTaskName, int partitionIndex, @@ -202,13 +209,15 @@ public void finish() throws IOException { isFinished = true; } + @Override + public boolean isFinished() { + return isFinished; + } + public void release() { release(null); } - /** - * Releases the result partition. - */ public void release(Throwable cause) { if (isReleased.compareAndSet(false, true)) { LOG.debug("{}: Releasing {}.", owningTaskName, this); @@ -248,6 +257,12 @@ public int getNumTargetKeyGroups() { return numTargetKeyGroups; } + @Override + public void setMetricGroup(TaskIOMetricGroup metrics) { + numBytesOut = metrics.getNumBytesOutCounter(); + numBuffersOut = metrics.getNumBuffersOutCounter(); + } + /** * Releases buffers held by this result partition. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java index d8cd9ec226542..a2dd4c9532936 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java @@ -68,9 +68,6 @@ public void collect(T record) { catch (IOException e) { throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e); } - catch (InterruptedException e) { - throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e); - } } else { throw new NullPointerException("The system does not support records that are null. 
" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java index 3451939a80af9..3eca089dff389 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java @@ -21,17 +21,18 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition; import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -68,16 +69,6 @@ public ConsumableNotifyingResultPartitionWriterDecorator( this.partitionConsumableNotifier = checkNotNull(partitionConsumableNotifier); } - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return partitionWriter.getBufferBuilder(targetChannel); - } - - @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return partitionWriter.tryGetBufferBuilder(targetChannel); - } - @Override public ResultPartitionID getPartitionId() { return partitionWriter.getPartitionId(); @@ -99,18 +90,34 @@ public void setup() throws IOException { } @Override - public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException { - return partitionWriter.createSubpartitionView(index, availabilityListener); + public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException { + partitionWriter.emitRecord(record, targetSubpartition); + + notifyPipelinedConsumers(); } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { - boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex); - if (success) { - notifyPipelinedConsumers(); - } + public void broadcastRecord(ByteBuffer record) throws IOException { + partitionWriter.broadcastRecord(record); + + notifyPipelinedConsumers(); + } + + @Override + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + partitionWriter.broadcastEvent(event, isPriorityEvent); + + notifyPipelinedConsumers(); + } - return success; + @Override + public void setMetricGroup(TaskIOMetricGroup metrics) { + partitionWriter.setMetricGroup(metrics); + } + + @Override + public 
ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException { + return partitionWriter.createSubpartitionView(index, availabilityListener); } @Override @@ -130,6 +137,21 @@ public void finish() throws IOException { notifyPipelinedConsumers(); } + @Override + public boolean isFinished() { + return partitionWriter.isFinished(); + } + + @Override + public void release(Throwable cause) { + partitionWriter.release(cause); + } + + @Override + public boolean isReleased() { + return partitionWriter.isReleased(); + } + @Override public void fail(Throwable throwable) { partitionWriter.fail(throwable); @@ -152,7 +174,7 @@ public void close() throws Exception { * this will trigger the deployment of consuming tasks after the first buffer has been added. */ private void notifyPipelinedConsumers() { - if (!hasNotifiedPipelinedConsumers) { + if (!hasNotifiedPipelinedConsumers && !partitionWriter.isReleased()) { partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionWriter.getPartitionId(), taskActions); hasNotifiedPipelinedConsumers = true; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java index e35b3111cc8ed..9c748dbfa15a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.io.network.api.serialization; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -116,17 +118,16 @@ public void testHandleMixedLargeRecords() throws Exception { // ----------------------------------------------------------------------------------------------------------------- private void testSerializationRoundTrip(Iterable records, int segmentSize) throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer<>(); RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{ tempFolder.getRoot().getAbsolutePath() }); - testSerializationRoundTrip(records, segmentSize, serializer, deserializer); + testSerializationRoundTrip(records, segmentSize, deserializer); } /** - * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link RecordDeserializer} - * interact as expected. + * Iterates over the provided records and tests whether {@link RecordWriter#serializeRecord} and + * {@link RecordDeserializer} interact as expected. * *
<p>
Only a single {@link MemorySegment} will be allocated. * @@ -136,14 +137,14 @@ private void testSerializationRoundTrip(Iterable records, private static void testSerializationRoundTrip( Iterable records, int segmentSize, - RecordSerializer serializer, RecordDeserializer deserializer) throws Exception { + final DataOutputSerializer serializer = new DataOutputSerializer(128); final ArrayDeque serializedRecords = new ArrayDeque<>(); // ------------------------------------------------------------------------------------------------------------- - BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize); + BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer.wrapAsByteBuffer(), segmentSize); int numRecords = 0; for (SerializationTestType record : records) { @@ -153,18 +154,21 @@ private static void testSerializationRoundTrip( numRecords++; // serialize record - serializer.serializeRecord(record); - if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) { + serializer.clear(); + ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record); + serializationResult.getBufferBuilder().appendAndCommit(serializedRecord); + if (serializationResult.getBufferBuilder().isFull()) { // buffer is full => start deserializing deserializer.setNextBuffer(serializationResult.buildBuffer()); numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer); // move buffers as long as necessary (for long records) - while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) { + while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) { deserializer.setNextBuffer(serializationResult.buildBuffer()); } } + Assert.assertFalse(serializedRecord.hasRemaining()); } // deserialize left over records @@ -183,18 +187,16 @@ private static void testSerializationRoundTrip( // assert that all records have been serialized and deserialized Assert.assertEquals(0, numRecords); - Assert.assertFalse(serializer.hasSerializedData()); Assert.assertFalse(deserializer.hasUnfinishedData()); } @Test public void testSmallRecordUnconsumedBuffer() throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer<>(); RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{tempFolder.getRoot().getAbsolutePath()}); - testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024); + testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024); } /** @@ -202,33 +204,29 @@ public void testSmallRecordUnconsumedBuffer() throws Exception { */ @Test public void testSpanningRecordUnconsumedBuffer() throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer<>(); RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{tempFolder.getRoot().getAbsolutePath()}); - testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1); + testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1); } @Test public void testLargeSpanningRecordUnconsumedBuffer() throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer<>(); RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{tempFolder.getRoot().getAbsolutePath()}); - 
testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1); + testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1); } @Test public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception { - RecordSerializer serializer = new SpanningRecordSerializer<>(); RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{tempFolder.getRoot().getAbsolutePath()}); testUnconsumedBuffer( - serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1, @@ -237,7 +235,6 @@ public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Ex deserializer.clear(); testUnconsumedBuffer( - serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1, @@ -245,17 +242,18 @@ public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Ex } public void testUnconsumedBuffer( - RecordSerializer serializer, RecordDeserializer deserializer, SerializationTestType record, int segmentSize, byte... leftOverBytes) throws Exception { try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream()) { - serializer.serializeRecord(record); + DataOutputSerializer serializer = new DataOutputSerializer(128); + ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record); - BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize); + BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize); - if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) { + serializationResult.getBufferBuilder().appendAndCommit(serializedRecord); + if (serializationResult.getBufferBuilder().isFull()) { // buffer is full => start deserializing Buffer buffer = serializationResult.buildBuffer(); writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes); @@ -265,7 +263,7 @@ public void testUnconsumedBuffer( deserializer.getNextRecord(record.getClass().newInstance()); // move buffers as long as necessary (for long records) - while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) { + while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) { buffer = serializationResult.buildBuffer(); if (serializationResult.isFullRecord()) { @@ -310,7 +308,7 @@ private static void writeBuffer(ByteBuffer buffer, OutputStream stream) throws I } private static BufferAndSerializerResult setNextBufferForSerializer( - RecordSerializer serializer, + ByteBuffer serializedRecord, int segmentSize) throws IOException { // create a bufferBuilder with some random starting offset to properly test handling buffer slices in the // deserialization code. 
@@ -319,24 +317,29 @@ private static BufferAndSerializerResult setNextBufferForSerializer( BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); bufferConsumer.build().recycleBuffer(); + bufferBuilder.appendAndCommit(serializedRecord); return new BufferAndSerializerResult( bufferBuilder, bufferConsumer, - serializer.copyToBufferBuilder(bufferBuilder)); + bufferBuilder.isFull(), + !serializedRecord.hasRemaining()); } private static class BufferAndSerializerResult { private final BufferBuilder bufferBuilder; private final BufferConsumer bufferConsumer; - private final RecordSerializer.SerializationResult serializationResult; + private final boolean isFullBuffer; + private final boolean isFullRecord; public BufferAndSerializerResult( BufferBuilder bufferBuilder, BufferConsumer bufferConsumer, - RecordSerializer.SerializationResult serializationResult) { + boolean isFullBuffer, + boolean isFullRecord) { this.bufferBuilder = bufferBuilder; this.bufferConsumer = bufferConsumer; - this.serializationResult = serializationResult; + this.isFullBuffer = isFullBuffer; + this.isFullRecord = isFullRecord; } public BufferBuilder getBufferBuilder() { @@ -348,11 +351,11 @@ public Buffer buildBuffer() { } public boolean isFullBuffer() { - return serializationResult.isFullBuffer(); + return isFullBuffer; } public boolean isFullRecord() { - return serializationResult.isFullRecord(); + return isFullRecord; } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java deleted file mode 100644 index e5f5dfcd1028c..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api.serialization; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.testutils.serialization.types.SerializationTestType; -import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; -import org.apache.flink.testutils.serialization.types.Util; - -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Random; - -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; - -/** - * Tests for the {@link SpanningRecordSerializer}. 
- */ -public class SpanningRecordSerializerTest { - - @Test - public void testHasSerializedData() throws IOException { - final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>(); - final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT); - - Assert.assertFalse(serializer.hasSerializedData()); - - serializer.serializeRecord(randomIntRecord); - Assert.assertTrue(serializer.hasSerializedData()); - - final BufferBuilder bufferBuilder1 = createBufferBuilder(16); - serializer.copyToBufferBuilder(bufferBuilder1); - Assert.assertFalse(serializer.hasSerializedData()); - - final BufferBuilder bufferBuilder2 = createBufferBuilder(8); - serializer.reset(); - serializer.copyToBufferBuilder(bufferBuilder2); - Assert.assertFalse(serializer.hasSerializedData()); - - serializer.reset(); - serializer.copyToBufferBuilder(bufferBuilder2); - // Buffer builder full! - Assert.assertTrue(serializer.hasSerializedData()); - } - - @Test - public void testEmptyRecords() throws IOException { - final int segmentSize = 11; - - final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>(); - final BufferBuilder bufferBuilder1 = createBufferBuilder(segmentSize); - - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, - serializer.copyToBufferBuilder(bufferBuilder1)); - - SerializationTestType emptyRecord = new SerializationTestType() { - @Override - public SerializationTestType getRandom(Random rnd) { - throw new UnsupportedOperationException(); - } - - @Override - public int length() { - throw new UnsupportedOperationException(); - } - - @Override - public void write(DataOutputView out) {} - - @Override - public void read(DataInputView in) {} - - @Override - public int hashCode() { - throw new UnsupportedOperationException(); - } - - @Override - public boolean equals(Object obj) { - throw new UnsupportedOperationException(); - } - }; - - serializer.serializeRecord(emptyRecord); - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1)); - - serializer.reset(); - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1)); - - serializer.reset(); - Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, - serializer.copyToBufferBuilder(bufferBuilder1)); - - final BufferBuilder bufferBuilder2 = createBufferBuilder(segmentSize); - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, - serializer.copyToBufferBuilder(bufferBuilder2)); - } - - @Test - public void testIntRecordsSpanningMultipleSegments() throws Exception { - final int segmentSize = 1; - final int numValues = 10; - - test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - } - - @Test - public void testIntRecordsWithAlignedSegments() throws Exception { - final int segmentSize = 64; - final int numValues = 64; - - test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - } - - @Test - public void testIntRecordsWithUnalignedSegments() throws Exception { - final int segmentSize = 31; - final int numValues = 248; // least common multiple => last record should align - - test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize); - } - - @Test - public void testRandomRecords() throws Exception { - final int segmentSize = 127; - final int numValues = 100000; - - test(Util.randomRecords(numValues), segmentSize); - } - - // 
----------------------------------------------------------------------------------------------------------------- - - /** - * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected - * {@link RecordSerializer.SerializationResult} values. - * - *
Only a single {@link MemorySegment} will be allocated. - * - * @param records records to test - * @param segmentSize size for the {@link MemorySegment} - */ - private void test(Util.MockRecords records, int segmentSize) throws Exception { - final int serializationOverhead = 4; // length encoding - - final SpanningRecordSerializer serializer = new SpanningRecordSerializer<>(); - - // ------------------------------------------------------------------------------------------------------------- - - BufferBuilder bufferBuilder = createBufferBuilder(segmentSize); - int numBytes = 0; - for (SerializationTestType record : records) { - serializer.serializeRecord(record); - RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder); - numBytes += record.length() + serializationOverhead; - - if (numBytes < segmentSize) { - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result); - } else if (numBytes == segmentSize) { - Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result); - bufferBuilder = createBufferBuilder(segmentSize); - numBytes = 0; - } else { - Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result); - - while (result.isFullBuffer()) { - numBytes -= segmentSize; - bufferBuilder = createBufferBuilder(segmentSize); - result = serializer.copyToBufferBuilder(bufferBuilder); - } - - Assert.assertTrue(result.isFullRecord()); - } - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java index 84aa17e0765ec..66eb8f2736266 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java @@ -18,79 +18,45 @@ package org.apache.flink.runtime.io.network.api.writer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; import javax.annotation.concurrent.ThreadSafe; import java.io.IOException; -import java.util.ArrayDeque; +import java.nio.ByteBuffer; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; +import static org.apache.flink.util.Preconditions.checkArgument; /** * {@link ResultPartitionWriter} that collects output on the List. 
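* Records handed in via {@code emitRecord(ByteBuffer, int)} or {@code broadcastRecord(ByteBuffer)} * are wrapped into a heap-backed {@link NetworkBuffer} and forwarded to {@code deserializeBuffer(Buffer)}.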
*/ @ThreadSafe public abstract class AbstractCollectingResultPartitionWriter extends MockResultPartitionWriter { - private final BufferProvider bufferProvider; - private final ArrayDeque bufferConsumers = new ArrayDeque<>(); - - public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) { - this.bufferProvider = checkNotNull(bufferProvider); - } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return bufferProvider.requestBufferBuilderBlocking(targetChannel); - } @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return bufferProvider.requestBufferBuilder(targetChannel); + public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException { + checkArgument(targetSubpartition < getNumberOfSubpartitions()); + deserializeRecord(record); } @Override - public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { - checkState(targetChannel < getNumberOfSubpartitions()); - bufferConsumers.add(bufferConsumer); - processBufferConsumers(); - return true; + public void broadcastRecord(ByteBuffer record) throws IOException { + deserializeRecord(record); } - private void processBufferConsumers() throws IOException { - while (!bufferConsumers.isEmpty()) { - BufferConsumer bufferConsumer = bufferConsumers.peek(); - Buffer buffer = bufferConsumer.build(); - try { - deserializeBuffer(buffer); - if (!bufferConsumer.isFinished()) { - break; - } - bufferConsumers.pop().close(); - } - finally { - buffer.recycleBuffer(); - } - } - } + private void deserializeRecord(ByteBuffer serializedRecord) throws IOException { + checkArgument(serializedRecord.hasArray()); - @Override - public synchronized void flushAll() { - try { - processBufferConsumers(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void flush(int subpartitionIndex) { - flushAll(); + MemorySegment segment = MemorySegmentFactory.wrap(serializedRecord.array()); + NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE); + buffer.setSize(serializedRecord.remaining()); + deserializeBuffer(buffer); + buffer.recycleBuffer(); } protected abstract void deserializeBuffer(Buffer buffer) throws IOException; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java index dccfd1d6a6ad6..d318961ce5acd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java @@ -22,8 +22,10 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import 
org.apache.flink.testutils.serialization.types.IntType; import org.apache.flink.testutils.serialization.types.SerializationTestType; import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory; @@ -31,12 +33,12 @@ import org.junit.Test; +import java.io.IOException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Queue; import static org.junit.Assert.assertEquals; @@ -56,19 +58,12 @@ public BroadcastRecordWriterTest() { */ @Test public void testBroadcastMixedRandomEmitRecord() throws Exception { - final int numberOfChannels = 4; + final int numberOfChannels = 8; final int numberOfRecords = 8; final int bufferSize = 32; - @SuppressWarnings("unchecked") - final Queue[] queues = new Queue[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); - final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - final BroadcastRecordWriter writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test"); + final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels); + final BroadcastRecordWriter writer = new BroadcastRecordWriter<>(partition, -1, "test"); final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{ tempFolder.getRoot().getAbsolutePath() }); @@ -84,7 +79,7 @@ public void testBroadcastMixedRandomEmitRecord() throws Exception { int index = 0; for (SerializationTestType record : records) { int randomChannel = index++ % numberOfChannels; - writer.randomEmit(record, randomChannel); + writer.emit(record, randomChannel); serializedRecords.get(randomChannel).add(record); writer.broadcastEmit(record); @@ -93,23 +88,23 @@ public void testBroadcastMixedRandomEmitRecord() throws Exception { } } - final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers(); + final int numberOfCreatedBuffers = partition.getBufferPool().bestEffortGetNumOfUsedBuffers(); // verify the expected number of requested buffers, and it would always request a new buffer while random emitting - assertEquals(numberOfRecords, numberOfCreatedBuffers); + assertEquals(2 * numberOfRecords, numberOfCreatedBuffers); for (int i = 0; i < numberOfChannels; i++) { // every channel would queue the number of above created buffers - assertEquals(numberOfRecords, queues[i].size()); + assertEquals(numberOfRecords + 1, partition.getNumberOfQueuedBuffers(i)); final int excessRandomRecords = i < numberOfRecords % numberOfChannels ? 
1 : 0; final int numberOfRandomRecords = numberOfRecords / numberOfChannels + excessRandomRecords; final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords; // verify the data correctness in every channel queue verifyDeserializationResults( - queues[i], + partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()), deserializer, serializedRecords.get(i), - numberOfCreatedBuffers, + numberOfRecords + 1, numberOfTotalRecords); } } @@ -121,45 +116,41 @@ public void testBroadcastMixedRandomEmitRecord() throws Exception { @Test public void testRandomEmitAndBufferRecycling() throws Exception { int recordSize = 8; + int numberOfChannels = 2; - final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(2, 2 * recordSize); - final KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider) { - @Override - public int getNumberOfSubpartitions() { - return 2; - } - }; - final BroadcastRecordWriter writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test"); + ResultPartition partition = createResultPartition(2 * recordSize, numberOfChannels); + BufferPool bufferPool = partition.getBufferPool(); + BroadcastRecordWriter writer = new BroadcastRecordWriter<>(partition, -1, "test"); // force materialization of both buffers for easier availability tests - List buffers = Arrays.asList(bufferProvider.requestBuffer(), bufferProvider.requestBuffer()); + List buffers = Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer()); buffers.forEach(Buffer::recycleBuffer); - assertEquals(2, bufferProvider.getNumberOfAvailableBuffers()); + assertEquals(2, bufferPool.getNumberOfAvailableMemorySegments()); // fill first buffer - writer.randomEmit(new IntType(1), 0); + writer.broadcastEmit(new IntType(1)); writer.broadcastEmit(new IntType(2)); - assertEquals(1, bufferProvider.getNumberOfAvailableBuffers()); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); // simulate consumption of first buffer consumer; this should not free buffers - assertEquals(1, partitionWriter.getAddedBufferConsumers(0).size()); - closeConsumer(partitionWriter, 0, 2 * recordSize); - assertEquals(1, bufferProvider.getNumberOfAvailableBuffers()); + assertEquals(1, partition.getNumberOfQueuedBuffers(0)); + ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + closeConsumer(view0, 2 * recordSize); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); // use second buffer - writer.broadcastEmit(new IntType(3)); - assertEquals(0, bufferProvider.getNumberOfAvailableBuffers()); + writer.emit(new IntType(3), 0); + assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments()); // fully free first buffer - assertEquals(2, partitionWriter.getAddedBufferConsumers(1).size()); - closeConsumer(partitionWriter, 1, recordSize); - assertEquals(1, bufferProvider.getNumberOfAvailableBuffers()); + assertEquals(1, partition.getNumberOfQueuedBuffers(1)); + ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener()); + closeConsumer(view1, 2 * recordSize); + assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments()); } - public void closeConsumer(KeepingPartitionWriter partitionWriter, int subpartitionIndex, int expectedSize) { - BufferConsumer bufferConsumer = partitionWriter.getAddedBufferConsumers(subpartitionIndex).get(0); - Buffer buffer = bufferConsumer.build(); - bufferConsumer.close(); + public void closeConsumer(ResultSubpartitionView view, 
int expectedSize) throws IOException { + Buffer buffer = view.getNextBuffer().buffer(); assertEquals(expectedSize, buffer.getSize()); buffer.recycleBuffer(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java index 1285c4e60c02a..4a9f7eeca3e42 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java @@ -21,7 +21,6 @@ import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.types.Record; import java.io.IOException; @@ -39,8 +38,7 @@ public class RecordCollectingResultPartitionWriter extends AbstractCollectingRes private final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{System.getProperty("java.io.tmpdir")}); - public RecordCollectingResultPartitionWriter(List output, BufferProvider bufferProvider) { - super(bufferProvider); + public RecordCollectingResultPartitionWriter(List output) { this.output = checkNotNull(output); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java index 3d3073ba09096..5540a53ea4349 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java @@ -20,11 +20,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.event.AbstractEvent; -import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; @@ -44,35 +42,32 @@ public class RecordOrEventCollectingResultPartitionWriter extends AbstractCol public RecordOrEventCollectingResultPartitionWriter( Collection output, - BufferProvider bufferProvider, TypeSerializer serializer) { - super(bufferProvider); this.output = checkNotNull(output); this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer)); } + @Override + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { + output.add(event); + } + @Override protected void deserializeBuffer(Buffer buffer) throws IOException { - if (buffer.isBuffer()) { - deserializer.setNextBuffer(buffer); + deserializer.setNextBuffer(buffer); - while (deserializer.hasUnfinishedData()) { - 
RecordDeserializer.DeserializationResult result = - deserializer.getNextRecord(delegate); + while (deserializer.hasUnfinishedData()) { + RecordDeserializer.DeserializationResult result = + deserializer.getNextRecord(delegate); - if (result.isFullRecord()) { - output.add(delegate.getInstance()); - } + if (result.isFullRecord()) { + output.add(delegate.getInstance()); + } - if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER - || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { - break; - } + if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER + || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) { + break; } - } else { - // is event - AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); - output.add(event); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java index adb059c858140..5a0ea8f4a408d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java @@ -20,28 +20,25 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; +import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; +import org.apache.flink.types.IntValue; import org.apache.flink.util.TestLogger; import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -51,16 +48,17 @@ */ public class RecordWriterDelegateTest extends TestLogger { + private static final int recordSize = 8; + private static final int numberOfBuffers = 10; private static final int memorySegmentSize = 128; - private static final int numberOfSegmentsToRequest = 2; - private NetworkBufferPool globalPool; @Before public void setup() { + assertEquals("Illegal memory segment size,", 0, memorySegmentSize % recordSize); globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize); } @@ -103,11 +101,11 @@ public void testMultipleRecordWritersAvailability() throws Exception { @SuppressWarnings("unchecked") public void testSingleRecordWriterBroadcastEvent() throws Exception { // setup - final ArrayDeque[] queues = new ArrayDeque[] 
{ new ArrayDeque(), new ArrayDeque() }; - final RecordWriter recordWriter = createRecordWriter(queues); + final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2); + final RecordWriter recordWriter = new RecordWriterBuilder<>().build(partition); final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter); - verifyBroadcastEvent(writerDelegate, queues, 1); + verifyBroadcastEvent(writerDelegate, Collections.singletonList(partition)); } @Test @@ -116,14 +114,16 @@ public void testMultipleRecordWritersBroadcastEvent() throws Exception { // setup final int numRecordWriters = 2; final List recordWriters = new ArrayList<>(numRecordWriters); - final ArrayDeque[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() }; + final List partitions = new ArrayList<>(numRecordWriters); for (int i = 0; i < numRecordWriters; i++) { - recordWriters.add(createRecordWriter(queues)); + final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2); + partitions.add(partition); + recordWriters.add(new RecordWriterBuilder<>().build(partition)); } final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters); - verifyBroadcastEvent(writerDelegate, queues, numRecordWriters); + verifyBroadcastEvent(writerDelegate, partitions); } private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception { @@ -136,14 +136,6 @@ private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exc return new RecordWriterBuilder().build(partition); } - private RecordWriter createRecordWriter(ArrayDeque[] queues) { - final ResultPartitionWriter partition = new RecordWriterTest.CollectingPartitionWriter( - queues, - new TestPooledBufferProvider(1)); - - return new RecordWriterBuilder().build(partition); - } - private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception { // writer is available at the beginning assertTrue(writerDelegate.isAvailable()); @@ -151,13 +143,14 @@ private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exce // request one buffer from the local pool to make it unavailable RecordWriter recordWriter = writerDelegate.getRecordWriter(0); - final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0)); + for (int i = 0; i < memorySegmentSize / recordSize; ++i) { + recordWriter.emit(new IntValue(i)); + } assertFalse(writerDelegate.isAvailable()); CompletableFuture future = writerDelegate.getAvailableFuture(); assertFalse(future.isDone()); // recycle the buffer to make the local pool available again - BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish(); ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener()); Buffer buffer = readView.getNextBuffer().buffer(); @@ -169,18 +162,18 @@ private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exce private void verifyBroadcastEvent( RecordWriterDelegate writerDelegate, - ArrayDeque[] queues, - int numRecordWriters) throws Exception { + List partitions) throws Exception { final CancelCheckpointMarker message = new CancelCheckpointMarker(1); writerDelegate.broadcastEvent(message); // verify the added messages in all the queues - for (int i = 0; i < queues.length; i++) { - assertEquals(numRecordWriters, queues[i].size()); + for (ResultPartition partition : partitions) { + for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) { + 
assertEquals(1, partition.getNumberOfQueuedBuffers(i)); - for (int j = 0; j < numRecordWriters; j++) { - BufferOrEvent boe = RecordWriterTest.parseBuffer(queues[i].remove(), i); + ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); + BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i); assertTrue(boe.isEvent()); assertEquals(message, boe.getEvent()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 6cad1d3af1c82..a0956ec23d837 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -27,34 +27,33 @@ import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.AbstractEvent; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.BufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; -import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition; import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition; import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView; +import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionTest; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.apache.flink.runtime.io.network.util.DeserializationUtils; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import 
org.apache.flink.runtime.operators.shipping.OutputEmitter; import org.apache.flink.runtime.operators.shipping.ShipStrategyType; import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator; @@ -65,7 +64,6 @@ import org.apache.flink.types.IntValue; import org.apache.flink.util.XORShiftRandom; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -74,19 +72,13 @@ import java.io.IOException; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Queue; import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer; +import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; /** @@ -119,35 +111,27 @@ public void testBroadcastEventNoRecords() throws Exception { int numberOfChannels = 4; int bufferSize = 32; - @SuppressWarnings("unchecked") - Queue[] queues = new Queue[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); - - ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - RecordWriter writer = createRecordWriter(partitionWriter); + ResultPartition partition = createResultPartition(bufferSize, numberOfChannels); + RecordWriter writer = createRecordWriter(partition); CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation()); // No records emitted yet, broadcast should not request a buffer writer.broadcastEvent(barrier); - assertEquals(0, bufferProvider.getNumberOfCreatedBuffers()); + assertEquals(0, partition.getBufferPool().bestEffortGetNumOfUsedBuffers()); for (int i = 0; i < numberOfChannels; i++) { - assertEquals(1, queues[i].size()); - BufferOrEvent boe = parseBuffer(queues[i].remove(), i); + assertEquals(1, partition.getNumberOfQueuedBuffers(i)); + ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); + BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i); assertTrue(boe.isEvent()); assertEquals(barrier, boe.getEvent()); - assertEquals(0, queues[i].size()); + assertFalse(view.isAvailable(Integer.MAX_VALUE)); } } /** - * Tests broadcasting events when records have been emitted. The emitted - * records cover all three {@link SerializationResult} types. + * Tests broadcasting events when records have been emitted. 
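+ * In the non-broadcast case the channels end up with different numbers of queued + * data buffers (1, 2, 1 and 0) ahead of the event, which the assertions below verify.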
*/ @Test public void testBroadcastEventMixedRecords() throws Exception { @@ -156,16 +140,8 @@ public void testBroadcastEventMixedRecords() throws Exception { int bufferSize = 32; int lenBytes = 4; // serialized length - @SuppressWarnings("unchecked") - Queue[] queues = new Queue[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); - - ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - RecordWriter writer = createRecordWriter(partitionWriter); + ResultPartition partition = createResultPartition(bufferSize, numberOfChannels); + RecordWriter writer = createRecordWriter(partition); CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation()); // Emit records on some channels first (requesting buffers), then @@ -194,34 +170,43 @@ public void testBroadcastEventMixedRecords() throws Exception { writer.broadcastEvent(barrier); if (isBroadcastWriter) { - assertEquals(3, bufferProvider.getNumberOfCreatedBuffers()); + assertEquals(3, partition.getBufferPool().bestEffortGetNumOfUsedBuffers()); for (int i = 0; i < numberOfChannels; i++) { - assertEquals(4, queues[i].size()); // 3 buffer + 1 event + assertEquals(4, partition.getNumberOfQueuedBuffers(i)); // 3 buffer + 1 event + ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); for (int j = 0; j < 3; j++) { - assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer()); + assertTrue(parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer()); } - BufferOrEvent boe = parseBuffer(queues[i].remove(), i); + BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i); assertTrue(boe.isEvent()); assertEquals(barrier, boe.getEvent()); } } else { - assertEquals(4, bufferProvider.getNumberOfCreatedBuffers()); + assertEquals(4, partition.getBufferPool().bestEffortGetNumOfUsedBuffers()); + ResultSubpartitionView[] views = new ResultSubpartitionView[4]; + + assertEquals(2, partition.getNumberOfQueuedBuffers(0)); // 1 buffer + 1 event + views[0] = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + assertTrue(parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer()); - assertEquals(2, queues[0].size()); // 1 buffer + 1 event - assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer()); - assertEquals(3, queues[1].size()); // 2 buffers + 1 event - assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer()); - assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer()); - assertEquals(2, queues[2].size()); // 1 buffer + 1 event - assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer()); - assertEquals(1, queues[3].size()); // 0 buffers + 1 event + assertEquals(3, partition.getNumberOfQueuedBuffers(1)); // 2 buffers + 1 event + views[1] = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener()); + assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer()); + assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer()); + + assertEquals(2, partition.getNumberOfQueuedBuffers(2)); // 1 buffer + 1 event + views[2] = partition.createSubpartitionView(2, new NoOpBufferAvailablityListener()); + assertTrue(parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer()); + + views[3] = partition.createSubpartitionView(3, new NoOpBufferAvailablityListener()); + 
assertEquals(1, partition.getNumberOfQueuedBuffers(3)); // 0 buffers + 1 event // every queue's last element should be the event for (int i = 0; i < numberOfChannels; i++) { - BufferOrEvent boe = parseBuffer(queues[i].remove(), i); + BufferOrEvent boe = parseBuffer(views[i].getNextBuffer().buffer(), i); assertTrue(boe.isEvent()); assertEquals(barrier, boe.getEvent()); } @@ -234,31 +219,28 @@ public void testBroadcastEventMixedRecords() throws Exception { */ @Test public void testBroadcastEventBufferReferenceCounting() throws Exception { + int bufferSize = 32 * 1024; + int numSubpartitions = 2; - @SuppressWarnings("unchecked") - ArrayDeque[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() }; - - ResultPartitionWriter partition = - new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE)); + ResultPartition partition = createResultPartition(bufferSize, numSubpartitions); RecordWriter writer = createRecordWriter(partition); writer.broadcastEvent(EndOfPartitionEvent.INSTANCE); - // Verify added to all queues - assertEquals(1, queues[0].size()); - assertEquals(1, queues[1].size()); - // get references to buffer consumers (copies from the original event buffer consumer) - BufferConsumer bufferConsumer1 = queues[0].getFirst(); - BufferConsumer bufferConsumer2 = queues[1].getFirst(); + Buffer[] buffers = new Buffer[numSubpartitions]; // process all collected events (recycles the buffer) - for (int i = 0; i < queues.length; i++) { - assertTrue(parseBuffer(queues[i].remove(), i).isEvent()); + for (int i = 0; i < numSubpartitions; i++) { + assertEquals(1, partition.getNumberOfQueuedBuffers(i)); + ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); + buffers[i] = view.getNextBuffer().buffer(); + assertTrue(parseBuffer(buffers[i], i).isEvent()); } - assertTrue(bufferConsumer1.isRecycled()); - assertTrue(bufferConsumer2.isRecycled()); + for (int i = 0; i < numSubpartitions; ++i) { + assertTrue(buffers[i].isRecycled()); + } } /** @@ -289,15 +271,8 @@ public void testBroadcastEmitRecord() throws Exception { final int numValues = 8; final int serializationLength = 4; - @SuppressWarnings("unchecked") - final Queue[] queues = new Queue[numberOfChannels]; - for (int i = 0; i < numberOfChannels; i++) { - queues[i] = new ArrayDeque<>(); - } - - final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize); - final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider); - final RecordWriter writer = createRecordWriter(partitionWriter); + final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels); + final RecordWriter writer = createRecordWriter(partition); final RecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>( new String[]{ tempFolder.getRoot().getAbsolutePath() }); @@ -310,14 +285,15 @@ public void testBroadcastEmitRecord() throws Exception { final int numRequiredBuffers = numValues / (bufferSize / (4 + serializationLength)); if (isBroadcastWriter) { - assertEquals(numRequiredBuffers, bufferProvider.getNumberOfCreatedBuffers()); + assertEquals(numRequiredBuffers, partition.getBufferPool().bestEffortGetNumOfUsedBuffers()); } else { - assertEquals(numRequiredBuffers * numberOfChannels, bufferProvider.getNumberOfCreatedBuffers()); + assertEquals(numRequiredBuffers * numberOfChannels, partition.getBufferPool().bestEffortGetNumOfUsedBuffers()); } for (int i = 0; i < 
numberOfChannels; i++) { - assertEquals(numRequiredBuffers, queues[i].size()); - verifyDeserializationResults(queues[i], deserializer, serializedRecords.clone(), numRequiredBuffers, numValues); + assertEquals(numRequiredBuffers, partition.getNumberOfQueuedBuffers(i)); + ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()); + verifyDeserializationResults(view, deserializer, serializedRecords.clone(), numRequiredBuffers, numValues); } } @@ -345,7 +321,7 @@ public void testIsAvailableOrNot() throws Exception { assertTrue(recordWriter.getAvailableFuture().isDone()); // request one buffer from the local pool to make it unavailable afterwards - final BufferBuilder bufferBuilder = resultPartition.getBufferBuilder(0); + final BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0); assertNotNull(bufferBuilder); assertFalse(recordWriter.getAvailableFuture().isDone()); @@ -418,64 +394,8 @@ public void testEmitRecordWithPartitionStateRecovery() throws Exception { } } - @Test - public void testIdleTime() throws IOException, InterruptedException { - // setup - final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128); - final BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE); - final ResultPartitionWriter resultPartition = new ResultPartitionBuilder() - .setBufferPoolFactory(p -> localPool) - .build(); - resultPartition.setup(); - final ResultPartitionWriter partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator( - new NoOpTaskActions(), - new JobID(), - resultPartition, - new NoOpResultPartitionConsumableNotifier()); - final RecordWriter recordWriter = createRecordWriter(partitionWrapper); - BufferBuilder builder = recordWriter.requestNewBufferBuilder(0); - BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish(); - ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); - Buffer buffer = readView.getNextBuffer().buffer(); - - // idle time is zero when there is buffer available. - assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount()); - - CountDownLatch syncLock = new CountDownLatch(1); - AtomicReference asyncRequestResult = new AtomicReference<>(); - final Thread requestThread = new Thread(new Runnable() { - @Override - public void run() { - try { - // notify that the request thread start to run. - syncLock.countDown(); - // wait for buffer. - asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0)); - } catch (Exception e) { - } - } - }); - requestThread.start(); - - // wait until request thread start to run. 
- syncLock.await(); - - Thread.sleep(10); - - //recycle the buffer - buffer.recycleBuffer(); - requestThread.join(); - - assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L)); - assertNotNull(asyncRequestResult.get()); - } - private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception { - @SuppressWarnings("unchecked") - ArrayDeque[] queues = new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()}; - - ResultPartitionWriter partition = - new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE)); + ResultPartition partition = createResultPartition(4096, 2); RecordWriter writer = createRecordWriter(partition); if (broadcastEvent) { @@ -485,12 +405,15 @@ private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) th } // verify added to all queues - assertEquals(1, queues[0].size()); - assertEquals(1, queues[1].size()); + assertEquals(1, partition.getNumberOfQueuedBuffers(0)); + assertEquals(1, partition.getNumberOfQueuedBuffers(1)); + + ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener()); // these two buffers may share the memory but not the indices! - Buffer buffer1 = buildSingleBuffer(queues[0].remove()); - Buffer buffer2 = buildSingleBuffer(queues[1].remove()); + Buffer buffer1 = view0.getNextBuffer().buffer(); + Buffer buffer2 = view1.getNextBuffer().buffer(); assertEquals(0, buffer1.getReaderIndex()); assertEquals(0, buffer2.getReaderIndex()); buffer1.setReaderIndex(1); @@ -498,18 +421,19 @@ private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) th } protected void verifyDeserializationResults( - Queue queue, + ResultSubpartitionView view, RecordDeserializer deserializer, ArrayDeque expectedRecords, int numRequiredBuffers, int numValues) throws Exception { int assertRecords = 0; for (int j = 0; j < numRequiredBuffers; j++) { - Buffer buffer = buildSingleBuffer(queue.remove()); + Buffer buffer = view.getNextBuffer().buffer(); deserializer.setNextBuffer(buffer); assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer); } + assertFalse(view.isAvailable(Integer.MAX_VALUE)); Assert.assertEquals(numValues, assertRecords); } @@ -526,51 +450,18 @@ private RecordWriter createRecordWriter(ResultPartitionWriter writer) { } } + public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException { + NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build(); + ResultPartition partition = createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions); + partition.setup(); + return partition; + } + // --------------------------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------------------------- - /** - * Partition writer that collects the added buffers/events in multiple queue. - */ - static class CollectingPartitionWriter extends MockResultPartitionWriter { - private final Queue[] queues; - private final BufferProvider bufferProvider; - - /** - * Create the partition writer. 
- * - * @param queues one queue per outgoing channel - * @param bufferProvider buffer provider - */ - CollectingPartitionWriter(Queue[] queues, BufferProvider bufferProvider) { - this.queues = queues; - this.bufferProvider = bufferProvider; - } - - @Override - public int getNumberOfSubpartitions() { - return queues.length; - } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return bufferProvider.requestBufferBuilderBlocking(targetChannel); - } - - @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return bufferProvider.requestBufferBuilder(targetChannel); - } - - @Override - public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) { - return queues[targetChannel].add(buffer); - } - } - - static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException { - Buffer buffer = buildSingleBuffer(bufferConsumer); + static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException { if (buffer.isBuffer()) { return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel)); } else { @@ -581,68 +472,6 @@ static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChanne } } - /** - * Partition writer that recycles all received buffers and does no further processing. - */ - private static class RecyclingPartitionWriter extends MockResultPartitionWriter { - private final BufferProvider bufferProvider; - - private RecyclingPartitionWriter(BufferProvider bufferProvider) { - this.bufferProvider = bufferProvider; - } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return bufferProvider.requestBufferBuilderBlocking(targetChannel); - } - - @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return bufferProvider.requestBufferBuilder(targetChannel); - } - } - - static class KeepingPartitionWriter extends MockResultPartitionWriter { - private final BufferProvider bufferProvider; - private Map> produced = new HashMap<>(); - - KeepingPartitionWriter(BufferProvider bufferProvider) { - this.bufferProvider = bufferProvider; - } - - @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - return bufferProvider.requestBufferBuilderBlocking(targetChannel); - } - - @Override - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - return bufferProvider.requestBufferBuilder(targetChannel); - } - - @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) { - // keep the buffer occupied. 
- produced.putIfAbsent(targetChannel, new ArrayList<>()); - produced.get(targetChannel).add(bufferConsumer); - return true; - } - - public List getAddedBufferConsumers(int subpartitionIndex) { - return produced.get(subpartitionIndex); - } - - @Override - public void close() { - for (List bufferConsumers : produced.values()) { - for (BufferConsumer bufferConsumer : bufferConsumers) { - bufferConsumer.close(); - } - } - produced.clear(); - } - } - private static class ByteArrayIO implements IOReadableWritable { private final byte[] bytes; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java index 3b6a2f5452092..f9616bd7097d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java @@ -21,22 +21,21 @@ import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; +import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.NetworkSequenceViewReader; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener; import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView; -import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition; import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest; import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView; import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.ResultSubpartition; import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; @@ -55,9 +54,11 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer; +import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; @@ -478,14 +479,12 @@ private void testCancelPartitionRequest(boolean isAvailableView) throws Exceptio } private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception 
{ - final ResultPartition partition = new ResultPartitionBuilder() - .setResultPartitionType(ResultPartitionType.BLOCKING) - .setFileChannelManager(fileChannelManager) - .setResultPartitionManager(partitionManager) - .build(); - - partitionManager.registerResultPartition(partition); - PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE); + NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(partitionManager).build(); + ResultPartition partition = createPartition(environment, fileChannelManager, ResultPartitionType.BLOCKING, 1); + + partition.setup(); + partition.emitRecord(ByteBuffer.allocate(BUFFER_SIZE), 0); + partition.finish(); return partition; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java index ad21bf4e6358e..220225e527718 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; /** @@ -54,14 +55,15 @@ public int getNumTargetKeyGroups() { } @Override - public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException { - bufferConsumer.close(); - return true; + public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException { } @Override - public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException { - throw new UnsupportedOperationException(); + public void broadcastRecord(ByteBuffer record) throws IOException { + } + + @Override + public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException { } @Override @@ -69,8 +71,8 @@ public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabili throw new UnsupportedOperationException(); } - public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException { - throw new UnsupportedOperationException(); + @Override + public void setMetricGroup(TaskIOMetricGroup metrics) { } @Override @@ -89,6 +91,20 @@ public void fail(@Nullable Throwable throwable) { public void finish() { } + @Override + public boolean isFinished() { + return false; + } + + @Override + public void release(Throwable cause) { + } + + @Override + public boolean isReleased() { + return false; + } + @Override public CompletableFuture getAvailableFuture() { return AVAILABLE; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java index 462cc4bc0362b..1ee8dd7ae77be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java @@ -24,7 +24,6 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -39,6 +38,8 @@ import org.junit.ClassRule; import org.junit.Test; +import java.nio.ByteBuffer; + /** * Test for consuming a pipelined result only partially. */ @@ -118,10 +119,8 @@ public void invoke() throws Exception { final ResultPartitionWriter writer = getEnvironment().getWriter(0); for (int i = 0; i < 8; i++) { - final BufferBuilder bufferBuilder = writer.getBufferBuilder(0); - writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0); + writer.emitRecord(ByteBuffer.allocate(1024), 0); Thread.sleep(50); - bufferBuilder.finish(); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index a545b737df823..c6d7819521820 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -21,7 +21,6 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; -import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; @@ -31,7 +30,6 @@ import java.io.IOException; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -131,14 +129,4 @@ public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescr 1, true); } - - public static void writeBuffers( - ResultPartitionWriter partition, - int numberOfBuffers, - int bufferSize) throws IOException { - for (int i = 0; i < numberOfBuffers; i++) { - partition.addBufferConsumer(createFilledFinishedBufferConsumer(bufferSize), 0); - } - partition.finish(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java index 95aee0079ccf8..c806a197a8596 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java @@ -23,12 +23,9 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; 
import org.apache.flink.runtime.io.network.buffer.BufferConsumer; -import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.util.TestConsumerCallback; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.io.network.util.TestProducerSource; import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer; import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer; @@ -39,7 +36,6 @@ import org.junit.Assume; import org.junit.Test; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -47,7 +43,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer; -import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -141,41 +136,32 @@ public void testConcurrentSlowProduceAndSlowConsume() throws Exception { private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception { // Config - final int producerBufferPoolSize = 8; final int producerNumberOfBuffersToProduce = 128; + final int bufferSize = 32 * 1024; // Producer behaviour final TestProducerSource producerSource = new TestProducerSource() { - private BufferProvider bufferProvider = new TestPooledBufferProvider(producerBufferPoolSize); - private int numberOfBuffers; @Override - public BufferConsumerAndChannel getNextBufferConsumer() throws Exception { + public BufferAndChannel getNextBuffer() throws Exception { if (numberOfBuffers == producerNumberOfBuffersToProduce) { return null; } - final BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking(); - final BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - int segmentSize = bufferBuilder.getMaxCapacity(); - - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize); + MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize); - int next = numberOfBuffers * (segmentSize / Integer.BYTES); + int next = numberOfBuffers * (bufferSize / Integer.BYTES); - for (int i = 0; i < segmentSize; i += 4) { + for (int i = 0; i < bufferSize; i += 4) { segment.putInt(i, next); next++; } - checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) == segmentSize); - bufferBuilder.finish(); - numberOfBuffers++; - return new BufferConsumerAndChannel(bufferConsumer, 0); + return new BufferAndChannel(segment.getArray(), 0); } }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index e29342219aa98..d221e99ef1566 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -26,49 +26,46 @@ import org.apache.flink.runtime.io.disk.FileChannelManagerImpl; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import 
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator; import org.apache.flink.runtime.taskmanager.NoOpTaskActions; import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.concurrent.FutureConsumerWithException; +import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; /** * Tests for {@link ResultPartition}. 
@@ -79,6 +76,8 @@ public class ResultPartitionTest { private static FileChannelManager fileChannelManager; + private final int bufferSize = 1024; + @BeforeClass public static void setUp() { fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing"); @@ -115,32 +114,45 @@ public void testResultSubpartitionInfo() { */ @Test public void testSendScheduleOrUpdateConsumersMessage() throws Exception { + FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] { + writer -> ((ResultPartitionWriter) writer).finish(), + writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0), + writer -> ((ResultPartitionWriter) writer).broadcastEvent(EndOfPartitionEvent.INSTANCE, false), + writer -> ((ResultPartitionWriter) writer).broadcastRecord(ByteBuffer.allocate(bufferSize)) + }; + + for (FutureConsumerWithException notificationCall: notificationCalls) { + testSendScheduleOrUpdateConsumersMessage(notificationCall); + } + } + + private void testSendScheduleOrUpdateConsumersMessage( + FutureConsumerWithException notificationCall) throws Exception { JobID jobId = new JobID(); TaskActions taskActions = new NoOpTaskActions(); { // Pipelined, send message => notify - ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier(); ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter( ResultPartitionType.PIPELINED, taskActions, jobId, notifier); - consumableNotifyingPartitionWriter.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0); - verify(notifier, times(1)) - .notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions)); + notificationCall.accept(consumableNotifyingPartitionWriter); + notifier.check(jobId, consumableNotifyingPartitionWriter.getPartitionId(), taskActions, 1); } { // Blocking, send message => don't notify - ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier(); ResultPartitionWriter partition = createConsumableNotifyingResultPartitionWriter( ResultPartitionType.BLOCKING, taskActions, jobId, notifier); - partition.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0); - verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions)); + notificationCall.accept(partition); + notifier.check(null, null, null, 0); } } @@ -181,38 +193,32 @@ public void testBlockingPartitionIsConsumableMultipleTimesIfNotReleasedOnConsump } /** - * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished. + * Tests {@link ResultPartition#emitRecord} on a partition which has already finished. 
* * @param partitionType the result partition type to set up */ private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception { - BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE); - ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); - JobID jobId = new JobID(); - TaskActions taskActions = new NoOpTaskActions(); - ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter( - partitionType, - taskActions, - jobId, - notifier); + TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier(); + BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType); + ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate( + Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), + new ResultPartitionWriter[]{bufferWritingResultPartition}, + new NoOpTaskActions(), + new JobID(), + notifier)[0]; try { - consumableNotifyingPartitionWriter.finish(); - reset(notifier); - // partition.add() should fail - consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0); - Assert.fail("exception expected"); + partitionWriter.finish(); + notifier.reset(); + // partitionWriter.emitRecord() should fail + partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); } catch (IllegalStateException e) { // expected => ignored } finally { - if (!bufferConsumer.isRecycled()) { - bufferConsumer.close(); - Assert.fail("bufferConsumer not recycled"); - } + assertEquals(0, bufferWritingResultPartition.numBuffersOut.getCount()); + assertEquals(0, bufferWritingResultPartition.numBytesOut.getCount()); + assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()); // should not have notified either - verify(notifier, never()).notifyPartitionConsumable( - eq(jobId), - eq(consumableNotifyingPartitionWriter.getPartitionId()), - eq(taskActions)); + notifier.check(null, null, null, 0); } } @@ -227,35 +233,30 @@ public void testAddOnReleasedBlockingPartition() throws Exception { } /** - * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released. + * Tests {@link ResultPartition#emitRecord} on a partition which has already been released. * * @param partitionType the result partition type to set up */ - private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception { - BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE); - ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); - JobID jobId = new JobID(); - TaskActions taskActions = new NoOpTaskActions(); - ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ? 
- createPartition(partitionType, fileChannelManager) : createPartition(partitionType); - ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate( + private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception { + TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier(); + BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType); + ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate( Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), - new ResultPartitionWriter[] {partition}, - taskActions, - jobId, + new ResultPartitionWriter[]{bufferWritingResultPartition}, + new NoOpTaskActions(), + new JobID(), notifier)[0]; try { - partition.release(); - // partition.add() silently drops the bufferConsumer but recycles it - consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0); - assertTrue(partition.isReleased()); + partitionWriter.release(null); + // partitionWriter.emitRecord() should silently drop the given record + partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); } finally { - if (!bufferConsumer.isRecycled()) { - bufferConsumer.close(); - Assert.fail("bufferConsumer not recycled"); - } + assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount()); + assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount()); + // the buffer should be recycled because the result partition has already been released + assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()); // should not have notified either - verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions)); + notifier.check(null, null, null, 0); } } @@ -289,32 +290,31 @@ public void testCreateSubpartitionOnFailingPartition() throws Exception { } /** - * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition. + * Tests {@link ResultPartition#emitRecord} on a working partition.
* * @param partitionType the result partition type to set up */ private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception { - ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class); + TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier(); JobID jobId = new JobID(); TaskActions taskActions = new NoOpTaskActions(); - ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter( - partitionType, + BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType); + ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate( + Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), + new ResultPartitionWriter[]{bufferWritingResultPartition}, taskActions, jobId, - notifier); - BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE); + notifier)[0]; try { - // partition.add() adds the bufferConsumer without recycling it (if not spilling) - consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0); - assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled()); + // partitionWriter.emitRecord() will allocate a new buffer and copy the record to it + partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0); } finally { - if (!bufferConsumer.isRecycled()) { - bufferConsumer.close(); - } + assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount()); + assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount()); + assertEquals(1, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers()); // should have been notified for pipelined partitions if (partitionType.isPipelined()) { - verify(notifier, times(1)) - .notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions)); + notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1); } } } @@ -332,15 +332,14 @@ public void testReleaseMemoryOnPipelinedPartition() throws Exception { private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception { final int numAllBuffers = 10; final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() - .setNumNetworkBuffers(numAllBuffers).build(); + .setNumNetworkBuffers(numAllBuffers).setBufferSize(bufferSize).build(); final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1); try { resultPartition.setup(); // take all buffers (more than the minimum required) for (int i = 0; i < numAllBuffers; ++i) { - BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking(); - resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0); + resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); } resultPartition.finish(); @@ -366,10 +365,12 @@ private void testReleaseMemory(final ResultPartitionType resultPartitionType) th * Tests {@link ResultPartition#getAvailableFuture()}.
*/ @Test - public void testIsAvailableOrNot() throws IOException, InterruptedException { + public void testIsAvailableOrNot() throws IOException { final int numAllBuffers = 10; + final int bufferSize = 1024; final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() - .setNumNetworkBuffers(numAllBuffers).build(); + .setNumNetworkBuffers(numAllBuffers) + .setBufferSize(bufferSize).build(); final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1); try { @@ -379,8 +380,8 @@ public void testIsAvailableOrNot() throws IOException, InterruptedException { assertTrue(resultPartition.getAvailableFuture().isDone()); - resultPartition.getBufferBuilder(0); - resultPartition.getBufferBuilder(0); + resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); + resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); assertFalse(resultPartition.getAvailableFuture().isDone()); } finally { resultPartition.release(); @@ -434,17 +435,25 @@ private ResultPartitionWriter createConsumableNotifyingResultPartitionWriter( ResultPartitionType partitionType, TaskActions taskActions, JobID jobId, - ResultPartitionConsumableNotifier notifier) { - ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ? - createPartition(partitionType, fileChannelManager) : createPartition(partitionType); + ResultPartitionConsumableNotifier notifier) throws IOException { + ResultPartition partition = createResultPartition(partitionType); return ConsumableNotifyingResultPartitionWriterDecorator.decorate( Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)), - new ResultPartitionWriter[] {partition}, + new ResultPartitionWriter[]{partition}, taskActions, jobId, notifier)[0]; } + private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException { + NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder() + .setNumNetworkBuffers(10) + .setBufferSize(bufferSize).build(); + ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1); + resultPartition.setup(); + return (BufferWritingResultPartition) resultPartition; + } + @Test public void testInitializeEmptyState() throws Exception { final int totalBuffers = 2; @@ -558,6 +567,50 @@ public void testReadRecoveredStateWithException() throws Exception { } } + @Test + public void testIdleTime() throws IOException, InterruptedException { + // setup + int bufferSize = 1024; + NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize); + BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE); + BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder() + .setBufferPoolFactory(p -> localPool) + .build(); + resultPartition.setup(); + + resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); + ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener()); + Buffer buffer = readView.getNextBuffer().buffer(); + assertNotNull(buffer); + + // idle time is zero when there is a buffer available. + assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount()); + + CountDownLatch syncLock = new CountDownLatch(1); + final Thread requestThread = new Thread(() -> { + try { + // notify that the request thread has started to run. + syncLock.countDown(); + // wait for a buffer.
+ resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0); + } catch (Exception e) { + } + }); + requestThread.start(); + + // wait until the request thread starts to run. + syncLock.await(); + + Thread.sleep(100); + + // recycle the buffer + buffer.recycleBuffer(); + requestThread.join(); + + Assert.assertThat(resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L)); + assertNotNull(readView.getNextBuffer().buffer()); + } + /** * The {@link ChannelStateReader} instance for restoring the specific number of states. */ @@ -631,4 +684,33 @@ public ReadResult readOutputData(ResultSubpartitionInfo info, BufferBuilder buff public void close() { } } + + private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier { + private JobID jobID; + private ResultPartitionID partitionID; + private TaskActions taskActions; + private int numNotification; + + @Override + public void notifyPartitionConsumable(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions) { + ++numNotification; + this.jobID = jobID; + this.partitionID = partitionID; + this.taskActions = taskActions; + } + + private void check(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions, int numNotification) { + assertEquals(jobID, this.jobID); + assertEquals(partitionID, this.partitionID); + assertEquals(taskActions, this.taskActions); + assertEquals(numNotification, this.numNotification); + } + + private void reset() { + jobID = null; + partitionID = null; + taskActions = null; + numNotification = 0; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java index 4a5f445262b86..a4985cb58ee1a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java @@ -19,10 +19,10 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -32,6 +32,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.util.MutableObjectIterator; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Optional; import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder; @@ -49,7 +50,7 @@ public class IteratorWrappingTestSingleInputGate e private MutableObjectIterator inputIterator; - private RecordSerializer serializer; + private DataOutputSerializer serializer; private final T reuse; @@ -68,7 +69,7 @@ public IteratorWrappingTestSingleInputGate( private IteratorWrappingTestSingleInputGate wrapIterator(MutableObjectIterator iterator) throws IOException,
InterruptedException { inputIterator = iterator; - serializer = new SpanningRecordSerializer(); + serializer = new DataOutputSerializer(128); // The input iterator can produce an infinite stream. That's why we have to serialize each // record on demand and cannot do it upfront. @@ -79,10 +80,10 @@ private IteratorWrappingTestSingleInputGate wrapIterator(MutableObjectIterato @Override public Optional getBufferAvailability() throws IOException { if (hasData) { - serializer.serializeRecord(reuse); + ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, reuse); BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - serializer.copyToBufferBuilder(bufferBuilder); + bufferBuilder.appendAndCommit(serializedRecord); hasData = inputIterator.next(reuse) != null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java index 6e65ad74553c1..90cbc6faf79a9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java @@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; -import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.BufferProvider; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition; @@ -60,7 +59,6 @@ import org.mockito.stubbing.Answer; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -135,8 +133,8 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { .setNumberOfSubpartitions(parallelism) .setNumTargetKeyGroups(parallelism) .setResultPartitionManager(partitionManager) - .setBufferPoolFactory(p -> - networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize)) + .setBufferPoolFactory(p -> networkBuffers.createBufferPool( + producerBufferPoolSize, producerBufferPoolSize, null, parallelism, Integer.MAX_VALUE)) .build(); // Create a buffer pool for this partition @@ -144,11 +142,11 @@ public void testConcurrentConsumeMultiplePartitions() throws Exception { // Create the producer partitionProducers[i] = new TestPartitionProducer( - partition, + (BufferWritingResultPartition) partition, false, new TestPartitionProducerBufferSource( parallelism, - partition.getBufferPool(), + TestBufferFactory.BUFFER_SIZE, numberOfBuffersPerChannel) ); } @@ -519,16 +517,16 @@ private static ResultSubpartitionView 
createResultSubpartitionView(boolean addBu */ private static class TestPartitionProducerBufferSource implements TestProducerSource { - private final BufferProvider bufferProvider; + private final int bufferSize; private final List channelIndexes; public TestPartitionProducerBufferSource( int parallelism, - BufferProvider bufferProvider, + int bufferSize, int numberOfBuffersToProduce) { - this.bufferProvider = bufferProvider; + this.bufferSize = bufferSize; this.channelIndexes = Lists.newArrayListWithCapacity( parallelism * numberOfBuffersToProduce); @@ -544,14 +542,10 @@ public TestPartitionProducerBufferSource( } @Override - public BufferConsumerAndChannel getNextBufferConsumer() throws Exception { + public BufferAndChannel getNextBuffer() throws Exception { if (channelIndexes.size() > 0) { final int channelIndex = channelIndexes.remove(0); - BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking(); - BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4])); - bufferBuilder.finish(); - return new BufferConsumerAndChannel(bufferConsumer, channelIndex); + return new BufferAndChannel(new byte[bufferSize], channelIndex); } return null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 3787627ddf02f..d16ee7f6b33cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.io.network.TestingConnectionManager; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest; -import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; +import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition; import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils; import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView; import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; @@ -753,7 +753,7 @@ public void testUpdateUnknownInputChannel() throws Exception { public void testQueuedBuffers() throws Exception { final NettyShuffleEnvironment network = createNettyShuffleEnvironment(); - final ResultPartition resultPartition = new ResultPartitionBuilder() + final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder() .setResultPartitionManager(network.getResultPartitionManager()) .setupBufferPoolFactoryFromNettyShuffleEnvironment(network) .build(); @@ -784,7 +784,7 @@ public void testQueuedBuffers() throws Exception { remoteInputChannel.onBuffer(createBuffer(1), 0, 0); assertEquals(1, inputGate.getNumberOfQueuedBuffers()); - 
resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1), 0); + resultPartition.emitRecord(ByteBuffer.allocate(1), 0); assertEquals(2, inputGate.getNumberOfQueuedBuffers()); } finally { resultPartition.release(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java index 2dfb4c9e788d5..9d944028384f1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java @@ -18,9 +18,10 @@ package org.apache.flink.runtime.io.network.util; -import org.apache.flink.runtime.io.network.partition.ResultPartition; -import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel; +import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition; +import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel; +import java.nio.ByteBuffer; import java.util.Random; import java.util.concurrent.Callable; @@ -38,7 +39,7 @@ public class TestPartitionProducer implements Callable { public static final int MAX_SLEEP_TIME_MS = 20; /** The partition to add data to. */ - private final ResultPartition partition; + private final BufferWritingResultPartition partition; /** * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random @@ -53,7 +54,7 @@ public class TestPartitionProducer implements Callable { private final Random random; public TestPartitionProducer( - ResultPartition partition, + BufferWritingResultPartition partition, boolean isSlowProducer, TestProducerSource source) { @@ -69,10 +70,11 @@ public Boolean call() throws Exception { boolean success = false; try { - BufferConsumerAndChannel consumerAndChannel; + BufferAndChannel bufferAndChannel; - while ((consumerAndChannel = source.getNextBufferConsumer()) != null) { - partition.addBufferConsumer(consumerAndChannel.getBufferConsumer(), consumerAndChannel.getTargetChannel()); + while ((bufferAndChannel = source.getNextBuffer()) != null) { + ByteBuffer record = ByteBuffer.wrap(bufferAndChannel.getBuffer()); + partition.emitRecord(record, bufferAndChannel.getTargetChannel()); // Check for interrupted flag after adding data to prevent resource leaks if (Thread.interrupted()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java index f5d97f55b0f90..99006e54fe8e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java @@ -29,19 +29,19 @@ public interface TestProducerSource { * *

The channel index specifies the subpartition to add the data to. */ - BufferConsumerAndChannel getNextBufferConsumer() throws Exception; + BufferAndChannel getNextBuffer() throws Exception; - class BufferConsumerAndChannel { - private final BufferConsumer bufferConsumer; + class BufferAndChannel { + private final byte[] buffer; private final int targetChannel; - public BufferConsumerAndChannel(BufferConsumer bufferConsumer, int targetChannel) { - this.bufferConsumer = checkNotNull(bufferConsumer); + public BufferAndChannel(byte[] buffer, int targetChannel) { + this.buffer = checkNotNull(buffer); this.targetChannel = targetChannel; } - public BufferConsumer getBufferConsumer() { - return bufferConsumer; + public byte[] getBuffer() { + return buffer; } public int getTargetChannel() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java index 60eee340fb0a3..a14cdba76cba0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java @@ -18,8 +18,12 @@ package org.apache.flink.runtime.io.network.util; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferConsumer; import org.apache.flink.runtime.io.network.partition.ResultSubpartition; -import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel; +import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel; import java.util.Random; import java.util.concurrent.Callable; @@ -69,10 +73,11 @@ public Boolean call() throws Exception { boolean success = false; try { - BufferConsumerAndChannel consumerAndChannel; + BufferAndChannel bufferAndChannel; - while ((consumerAndChannel = source.getNextBufferConsumer()) != null) { - subpartition.add(consumerAndChannel.getBufferConsumer()); + while ((bufferAndChannel = source.getNextBuffer()) != null) { + MemorySegment segment = MemorySegmentFactory.wrap(bufferAndChannel.getBuffer()); + subpartition.add(new BufferConsumer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER)); // Check for interrupted flag after adding data to prevent resource leaks if (Thread.interrupted()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 28652044c19c5..a28a0b55bf221 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -202,7 +202,7 @@ public void addInputs(List gates) { public void addOutput(final List outputList) { try { - outputs.add(new RecordCollectingResultPartitionWriter(outputList, new TestPooledBufferProvider(Integer.MAX_VALUE))); + outputs.add(new RecordCollectingResultPartitionWriter(outputList)); } catch (Throwable t) { t.printStackTrace(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java index 082ddc511f33f..a29746076c0fd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java @@ -21,11 +21,11 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -34,6 +34,7 @@ import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import java.nio.ByteBuffer; import java.util.Optional; import java.util.concurrent.ConcurrentLinkedQueue; @@ -81,7 +82,7 @@ private void setupInputChannels() { for (int i = 0; i < numInputChannels; i++) { final int channelIndex = i; - final RecordSerializer recordSerializer = new SpanningRecordSerializer(); + final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128); final SerializationDelegate delegate = (SerializationDelegate) (SerializationDelegate) new SerializationDelegate<>(new StreamElementSerializer(serializer)); @@ -107,10 +108,10 @@ private void setupInputChannels() { Object inputElement = input.getStreamRecord(); delegate.setInstance(inputElement); - recordSerializer.serializeRecord(delegate); + ByteBuffer serializedRecord = RecordWriter.serializeRecord(dataOutputSerializer, delegate); BufferBuilder bufferBuilder = createBufferBuilder(bufferSize); BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer(); - recordSerializer.copyToBufferBuilder(bufferBuilder); + bufferBuilder.appendAndCommit(serializedRecord); bufferBuilder.finish(); // Call getCurrentBuffer to ensure size is set diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java index 4c68ec3db935c..48f833db69ddd 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; @@ -27,9 +28,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import
org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; -import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer; import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.buffer.BufferBuilder; import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.apache.flink.runtime.io.network.buffer.BufferConsumer; @@ -53,6 +53,7 @@ import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -218,14 +219,15 @@ private StreamTaskNetworkInput createStreamTaskNetworkInput(List } private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException { - RecordSerializer serializer = new SpanningRecordSerializer<>(); + DataOutputSerializer serializer = new DataOutputSerializer(128); SerializationDelegate serializationDelegate = new SerializationDelegate<>( new StreamElementSerializer<>(LongSerializer.INSTANCE)); serializationDelegate.setInstance(new StreamRecord<>(value)); - serializer.serializeRecord(serializationDelegate); + ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, serializationDelegate); + bufferBuilder.appendAndCommit(serializedRecord); - assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer()); + assertFalse(bufferBuilder.isFull()); } private static void assertHasNextElement(StreamTaskNetworkInput input, DataOutput output) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index bf2b79ce28ea4..447b9a0951f6a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; @@ -204,7 +203,6 @@ public void addOutput(final Collection outputList, final TypeSeriali try { outputs.add(new RecordOrEventCollectingResultPartitionWriter( outputList, - new TestPooledBufferProvider(Integer.MAX_VALUE), serializer)); } catch (Throwable t) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index 00ef9a16746d3..87142d29fa00f 100644 ---
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter; import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; -import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -252,11 +251,10 @@ public void testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throw .setEnvironment(mockEnvironment) .build(); - TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096); ArrayList recordOrEvents = new ArrayList<>(); StreamElementSerializer stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE); ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>( - recordOrEvents, bufferProvider, stringStreamElementSerializer); + recordOrEvents, stringStreamElementSerializer); mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter)); OneInputStreamTask task = testHarness.getTask();
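For context, the producer-side pattern that the test changes above converge on can be summarized as follows. This is a minimal sketch, assuming only the calls visible in this patch (setup(), emitRecord(ByteBuffer, int), finish(), release(), and the createPartition helper from PartitionTestUtils); the PIPELINED type and buffer size are illustrative, not prescribed by the patch:

import java.nio.ByteBuffer;

import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;

import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;

class EmitRecordSketch {

	static void produce() throws Exception {
		NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
		ResultPartition partition = createPartition(environment, ResultPartitionType.PIPELINED, 1);

		// setup() assigns the buffer pool; after that, records are handed over
		// as ByteBuffers and the partition requests network buffers and copies
		// the bytes internally, so tests no longer juggle
		// BufferBuilder/BufferConsumer pairs themselves.
		partition.setup();
		partition.emitRecord(ByteBuffer.allocate(1024), 0); // one record to subpartition 0
		partition.finish();
		partition.release();
	}
}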
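Likewise, the serialization path used by the reworked input-gate tests reduces to serializing once into a DataOutputSerializer and appending the resulting ByteBuffer to a BufferBuilder. A minimal sketch, assuming RecordWriter.serializeRecord and the BufferBuilder methods behave as they are used in this patch, and that the record fits into a single buffer:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;

import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;

class SerializeRecordSketch {

	static BufferConsumer serializeToBuffer(IOReadableWritable record, int bufferSize) throws IOException {
		DataOutputSerializer serializer = new DataOutputSerializer(128);

		// serialize the record once; the returned view can then be copied
		// into a target buffer (or several, for records spanning buffers)
		ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);

		BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
		BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
		bufferBuilder.appendAndCommit(serializedRecord); // assumes the record fits into one buffer
		bufferBuilder.finish();
		return bufferConsumer;
	}
}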