Skip to content

Commit

Permalink
[FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests
Browse files Browse the repository at this point in the history
+ also improve the subpartition tests in general to reduce some duplication

This closes apache#5581
  • Loading branch information
Nico Kruber authored and StephanEwen committed Mar 9, 2018
1 parent 112c54f commit c19df9f
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException

parent.updateStatistics(current);
// if we are spilled (but still process a non-spilled nextBuffer), we don't know the
// state of nextBufferIsEvent...
// state of nextBufferIsEvent or whether more buffers are available
if (spilledView == null) {
return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void testAddNonEmptyNotFinishedBuffer() throws Exception {
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
subpartition.add(bufferBuilder.createBufferConsumer());

assertNextBuffer(readView, 1024, false, 1);
// note that since the buffer builder is not finished, there is still a retained instance!
assertNextBuffer(readView, 1024, false, 1, false, false);
assertEquals(1, subpartition.getBuffersInBacklog());
} finally {
readView.releaseAllResources();
Expand All @@ -157,7 +158,7 @@ public void testUnfinishedBufferBehindFinished() throws Exception {
subpartition.add(createFilledBufferConsumer(1025)); // finished
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished

assertNextBuffer(readView, 1025, false, 1);
assertNextBuffer(readView, 1025, false, 1, false, true);
} finally {
subpartition.release();
}
Expand All @@ -178,8 +179,8 @@ public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
subpartition.flush();

assertNextBuffer(readView, 1025, true, 1);
assertNextBuffer(readView, 1024, false, 1);
assertNextBuffer(readView, 1025, true, 1, false, true);
assertNextBuffer(readView, 1024, false, 1, false, false);
} finally {
subpartition.release();
}
Expand Down Expand Up @@ -208,7 +209,7 @@ public void testMultipleEmptyBuffers() throws Exception {
subpartition.add(createFilledBufferConsumer(1024));
assertEquals(2, availablityListener.getNumNotifications());

assertNextBuffer(readView, 1024, false, 0);
assertNextBuffer(readView, 1024, false, 0, false, true);
} finally {
readView.releaseAllResources();
subpartition.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;

import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -52,7 +52,6 @@
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
Expand Down Expand Up @@ -190,10 +189,13 @@ public void testConsumeSpilledPartition() throws Exception {
SpillableSubpartition partition = createSubpartition();

BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
BufferConsumer eventBufferConsumer =
EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
final int eventSize = eventBufferConsumer.getWrittenBytes();

partition.add(bufferConsumer.copy());
partition.add(bufferConsumer.copy());
partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
partition.add(eventBufferConsumer);
partition.add(bufferConsumer);

assertEquals(4, partition.getTotalNumberOfBuffers());
Expand All @@ -207,73 +209,38 @@ public void testConsumeSpilledPartition() throws Exception {
// still same statistics
assertEquals(4, partition.getTotalNumberOfBuffers());
assertEquals(3, partition.getBuffersInBacklog());
assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes());
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes());

partition.finish();
// + one EndOfPartitionEvent
assertEquals(5, partition.getTotalNumberOfBuffers());
assertEquals(3, partition.getBuffersInBacklog());
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes());

AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);

assertEquals(1, listener.getNumNotifications());

assertFalse(reader.nextBufferIsEvent()); // buffer
BufferAndBacklog read = reader.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertFalse(read.nextBufferIsEvent());

assertFalse(reader.nextBufferIsEvent()); // buffer
read = reader.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertTrue(read.nextBufferIsEvent());

assertTrue(reader.nextBufferIsEvent()); // event
read = reader.getNextBuffer();
assertNotNull(read);
assertFalse(read.buffer().isBuffer());
assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(1, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
read.buffer().recycleBuffer();
assertFalse(read.nextBufferIsEvent());

assertFalse(reader.nextBufferIsEvent()); // buffer
read = reader.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
assertEquals(0, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertTrue(read.nextBufferIsEvent());

assertTrue(reader.nextBufferIsEvent()); // end of partition event
read = reader.getNextBuffer();
assertNotNull(read);
assertFalse(read.buffer().isBuffer());
assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(0, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertEquals(EndOfPartitionEvent.class,
EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertFalse(read.nextBufferIsEvent());

// finally check that the bufferConsumer has been freed after a successful (or failed) write
final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
Expand All @@ -292,10 +259,13 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
SpillableSubpartition partition = createSubpartition();

BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
BufferConsumer eventBufferConsumer =
EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
final int eventSize = eventBufferConsumer.getWrittenBytes();

partition.add(bufferConsumer.copy());
partition.add(bufferConsumer.copy());
partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
partition.add(eventBufferConsumer);
partition.add(bufferConsumer);
partition.finish();

Expand All @@ -311,17 +281,12 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertFalse(bufferConsumer.isRecycled());

assertFalse(reader.nextBufferIsEvent());
BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled)
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
// first buffer (non-spilled)
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
assertEquals(2, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
read.buffer().recycleBuffer();
assertTrue(read.isMoreAvailable());
assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification
assertFalse(bufferConsumer.isRecycled());
assertFalse(read.nextBufferIsEvent());

// Spill now
assertEquals(3, partition.releaseMemory());
Expand All @@ -330,59 +295,44 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertEquals(5, partition.getTotalNumberOfBuffers());
assertEquals(2, partition.getBuffersInBacklog());
// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes());

// wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
listener.awaitNotifications(2, 30_000);
// Spiller finished
assertEquals(2, listener.getNumNotifications());

// after consuming and releasing the next buffer, the bufferConsumer may be freed,
// depending on the timing of the last write operation
// -> retain once so that we can check below
Buffer buffer = bufferConsumer.build();
buffer.retainBuffer();

assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
read = reader.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
assertEquals(1, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
read.buffer().recycleBuffer();
// now the bufferConsumer may be freed, depending on the timing of the write operation
// -> let's do this check at the end of the test (to save some time)
assertTrue(read.nextBufferIsEvent());

bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)

assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
read = reader.getNextBuffer();
assertNotNull(read);
assertFalse(read.buffer().isBuffer());
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(1, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
read.buffer().recycleBuffer();
assertFalse(read.nextBufferIsEvent());

assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
read = reader.getNextBuffer();
assertNotNull(read);
assertTrue(read.buffer().isBuffer());
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertTrue(read.nextBufferIsEvent());

buffer.recycleBuffer();
assertTrue(buffer.isRecycled());

// End of partition
assertTrue(reader.nextBufferIsEvent());
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertEquals(EndOfPartitionEvent.class,
EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
assertFalse(read.buffer().isRecycled());
read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
assertFalse(read.nextBufferIsEvent());

// finally check that the bufferConsumer has been freed after a successful (or failed) write
final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -138,11 +145,74 @@ static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog) throws IOException, InterruptedException {
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
true,
null,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

static void assertNextEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
false,
expectedEventClass,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

private static void assertNextBufferOrEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsBuffer,
@Nullable Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
checkArgument(expectedEventClass == null || !expectedIsBuffer);

ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
assertNotNull(bufferAndBacklog);
try {
assertEquals("buffer size", expectedReadableBufferSize,
bufferAndBacklog.buffer().readableBytes());
assertEquals("buffer or event", expectedIsBuffer,
bufferAndBacklog.buffer().isBuffer());
if (expectedEventClass != null) {
assertThat(EventSerializer
.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
instanceOf(expectedEventClass));
}
assertEquals("more available", expectedIsMoreAvailable,
bufferAndBacklog.isMoreAvailable());
assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
assertEquals("next is event", expectedNextBufferIsEvent,
bufferAndBacklog.nextBufferIsEvent());

assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
} finally {
bufferAndBacklog.buffer().recycleBuffer();
}
assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
}

protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
Expand Down

0 comments on commit c19df9f

Please sign in to comment.