Skip to content

Commit

Permalink
[hotfix] [tests] Move utility methods into correct test class.
Browse files Browse the repository at this point in the history
The methods were intended to be generic for tests across partitions and hence placed in the
SubpartitionTestBase.

However, the SubpartitionTestBase can only really test common contract behavior, like behavior on
disposal, finishing, and buffer reycling contracts in those cases. Producer / consumer behavior
is sufficiently different between both implementations that it does not make sense at this point
to try and share the tests.
  • Loading branch information
StephanEwen committed May 10, 2019
1 parent e5a6a94 commit 422f7b5
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,32 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -273,4 +278,86 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
assertEquals(4, availablityListener.getNumNotifications());
}

// ------------------------------------------------------------------------

static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
true,
null,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

static void assertNextEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
false,
expectedEventClass,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

private static void assertNextBufferOrEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsBuffer,
@Nullable Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
checkArgument(expectedEventClass == null || !expectedIsBuffer);

ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
assertNotNull(bufferAndBacklog);
try {
assertEquals("buffer size", expectedReadableBufferSize,
bufferAndBacklog.buffer().readableBytes());
assertEquals("buffer or event", expectedIsBuffer,
bufferAndBacklog.buffer().isBuffer());
if (expectedEventClass != null) {
Assert.assertThat(EventSerializer
.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
instanceOf(expectedEventClass));
}
assertEquals("more available", expectedIsMoreAvailable,
bufferAndBacklog.isMoreAvailable());
assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
assertEquals("next is event", expectedNextBufferIsEvent,
bufferAndBacklog.nextBufferIsEvent());
assertEquals("next is event", expectedNextBufferIsEvent,
readView.nextBufferIsEvent());

assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
} finally {
bufferAndBacklog.buffer().recycleBuffer();
}
assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
}

static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
assertNull(readView.getNextBuffer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,18 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import javax.annotation.Nullable;

import java.io.IOException;

import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;

/**
Expand Down Expand Up @@ -135,84 +126,4 @@ private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition)
// Verify that parent release is reflected at partition view
assertTrue(view.isReleased());
}

static void assertNextBuffer(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
true,
null,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

static void assertNextEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
assertNextBufferOrEvent(
readView,
expectedReadableBufferSize,
false,
expectedEventClass,
expectedIsMoreAvailable,
expectedBuffersInBacklog,
expectedNextBufferIsEvent,
expectedRecycledAfterRecycle);
}

private static void assertNextBufferOrEvent(
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsBuffer,
@Nullable Class<? extends AbstractEvent> expectedEventClass,
boolean expectedIsMoreAvailable,
int expectedBuffersInBacklog,
boolean expectedNextBufferIsEvent,
boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
checkArgument(expectedEventClass == null || !expectedIsBuffer);

ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
assertNotNull(bufferAndBacklog);
try {
assertEquals("buffer size", expectedReadableBufferSize,
bufferAndBacklog.buffer().readableBytes());
assertEquals("buffer or event", expectedIsBuffer,
bufferAndBacklog.buffer().isBuffer());
if (expectedEventClass != null) {
assertThat(EventSerializer
.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
instanceOf(expectedEventClass));
}
assertEquals("more available", expectedIsMoreAvailable,
bufferAndBacklog.isMoreAvailable());
assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
assertEquals("next is event", expectedNextBufferIsEvent,
bufferAndBacklog.nextBufferIsEvent());
assertEquals("next is event", expectedNextBufferIsEvent,
readView.nextBufferIsEvent());

assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
} finally {
bufferAndBacklog.buffer().recycleBuffer();
}
assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
}

static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
assertNull(readView.getNextBuffer());
}
}

0 comments on commit 422f7b5

Please sign in to comment.