From 9efce9815b9f43bef1846c079e616325b5c20456 Mon Sep 17 00:00:00 2001 From: Piotr Nowojski Date: Thu, 16 May 2019 17:20:04 +0200 Subject: [PATCH] [FLINK-12535][test] Drop deprecated blocking CheckpointBarrierHandler#getNext method --- .../runtime/io/CheckpointBarrierHandler.java | 11 - .../io/BarrierBufferAlignmentLimitTest.java | 70 +-- .../io/BarrierBufferMassiveRandomTest.java | 2 +- .../runtime/io/BarrierBufferTestBase.java | 519 +++++++++--------- .../runtime/io/BarrierTrackerTest.java | 20 +- 5 files changed, 305 insertions(+), 317 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index 25b220551292f..44cd584491c84 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -25,8 +25,6 @@ import java.io.IOException; -import static org.apache.flink.util.Preconditions.checkState; - /** * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels. * Different implementations may either simply track barriers, or block certain inputs on @@ -34,15 +32,6 @@ */ @Internal public interface CheckpointBarrierHandler extends AsyncDataInput { - /** - * Blocking version of {@link #pollNext()}. - */ - @Deprecated - default BufferOrEvent getNextNonBlocked() throws Exception { - Optional bufferOrEvent = pollNext(); - checkState(bufferOrEvent.isPresent()); - return bufferOrEvent.get(); - } /** * Registers the task be notified once all checkpoint barriers have been received for a checkpoint. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java index c6ef261c8184f..2e4ba519df6ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java @@ -122,37 +122,37 @@ public void testBreakCheckpointAtAlignmentLimit() throws Exception { // validating the sequence of buffers - check(sequence[0], buffer.getNextNonBlocked()); - check(sequence[1], buffer.getNextNonBlocked()); - check(sequence[2], buffer.getNextNonBlocked()); - check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[0], buffer.pollNext().get()); + check(sequence[1], buffer.pollNext().get()); + check(sequence[2], buffer.pollNext().get()); + check(sequence[3], buffer.pollNext().get()); // start of checkpoint long startTs = System.nanoTime(); - check(sequence[6], buffer.getNextNonBlocked()); - check(sequence[8], buffer.getNextNonBlocked()); - check(sequence[10], buffer.getNextNonBlocked()); + check(sequence[6], buffer.pollNext().get()); + check(sequence[8], buffer.pollNext().get()); + check(sequence[10], buffer.pollNext().get()); // trying to pull the next makes the alignment overflow - so buffered buffers are replayed - check(sequence[5], buffer.getNextNonBlocked()); + check(sequence[5], buffer.pollNext().get()); validateAlignmentTime(startTs, buffer); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class)); // playing back buffered events - check(sequence[7], buffer.getNextNonBlocked()); - check(sequence[11], buffer.getNextNonBlocked()); - check(sequence[12], buffer.getNextNonBlocked()); - check(sequence[13], buffer.getNextNonBlocked()); - check(sequence[14], buffer.getNextNonBlocked()); + check(sequence[7], buffer.pollNext().get()); + check(sequence[11], buffer.pollNext().get()); + check(sequence[12], buffer.pollNext().get()); + check(sequence[13], buffer.pollNext().get()); + check(sequence[14], buffer.pollNext().get()); // the additional data - check(sequence[15], buffer.getNextNonBlocked()); - check(sequence[16], buffer.getNextNonBlocked()); - check(sequence[17], buffer.getNextNonBlocked()); + check(sequence[15], buffer.pollNext().get()); + check(sequence[16], buffer.pollNext().get()); + check(sequence[17], buffer.pollNext().get()); - check(sequence[19], buffer.getNextNonBlocked()); - check(sequence[20], buffer.getNextNonBlocked()); - check(sequence[21], buffer.getNextNonBlocked()); + check(sequence[19], buffer.pollNext().get()); + check(sequence[20], buffer.pollNext().get()); + check(sequence[21], buffer.pollNext().get()); // no call for a completed checkpoint must have happened verify(toNotify, times(0)).triggerCheckpointOnBarrier( @@ -217,44 +217,44 @@ public void testAlignmentLimitWithQueuedAlignments() throws Exception { // validating the sequence of buffers long startTs; - check(sequence[0], buffer.getNextNonBlocked()); - check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[0], buffer.pollNext().get()); + check(sequence[1], buffer.pollNext().get()); // start of checkpoint startTs = System.nanoTime(); - check(sequence[3], buffer.getNextNonBlocked()); - check(sequence[7], buffer.getNextNonBlocked()); + check(sequence[3], buffer.pollNext().get()); + check(sequence[7], buffer.pollNext().get()); // next checkpoint also in progress - check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[11], buffer.pollNext().get()); // checkpoint alignment aborted due to too much data - check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[4], buffer.pollNext().get()); validateAlignmentTime(startTs, buffer); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class)); // replay buffered data - in the middle, the alignment for checkpoint 4 starts - check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[6], buffer.pollNext().get()); startTs = System.nanoTime(); - check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[12], buffer.pollNext().get()); // only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success - check(sequence[17], buffer.getNextNonBlocked()); + check(sequence[17], buffer.pollNext().get()); // checkpoint 4 completed - check and validate buffered replay - check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[9], buffer.pollNext().get()); validateAlignmentTime(startTs, buffer); verify(toNotify, times(1)).triggerCheckpointOnBarrier( argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); - check(sequence[10], buffer.getNextNonBlocked()); - check(sequence[15], buffer.getNextNonBlocked()); - check(sequence[16], buffer.getNextNonBlocked()); + check(sequence[10], buffer.pollNext().get()); + check(sequence[15], buffer.pollNext().get()); + check(sequence[16], buffer.pollNext().get()); // trailing data - check(sequence[19], buffer.getNextNonBlocked()); - check(sequence[20], buffer.getNextNonBlocked()); - check(sequence[21], buffer.getNextNonBlocked()); + check(sequence[19], buffer.pollNext().get()); + check(sequence[20], buffer.pollNext().get()); + check(sequence[21], buffer.pollNext().get()); // only checkpoint 4 was successfully completed, not checkpoint 3 verify(toNotify, times(0)).triggerCheckpointOnBarrier( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java index 82d21c742ac17..7fc8a5d4d41cf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java @@ -65,7 +65,7 @@ public void testWithTwoChannelsAndRandomBarriers() { BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, new BufferSpiller(ioMan, myIG.getPageSize())); for (int i = 0; i < 2000000; i++) { - BufferOrEvent boe = barrierBuffer.getNextNonBlocked(); + BufferOrEvent boe = barrierBuffer.pollNext().get(); if (boe.isBuffer()) { boe.getBuffer().recycleBuffer(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index dfde947540991..6475bfc5b47d9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -49,7 +49,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Matchers.any; @@ -108,7 +107,7 @@ public void testSingleChannelNoBarriers() throws Exception { buffer = createBarrierBuffer(1, sequence); for (BufferOrEvent boe : sequence) { - assertEquals(boe, buffer.getNextNonBlocked()); + assertEquals(boe, buffer.pollNext().get()); } assertEquals(0L, buffer.getAlignmentDurationNanos()); @@ -129,7 +128,7 @@ public void testMultiChannelNoBarriers() throws Exception { buffer = createBarrierBuffer(4, sequence); for (BufferOrEvent boe : sequence) { - assertEquals(boe, buffer.getNextNonBlocked()); + assertEquals(boe, buffer.pollNext().get()); } assertEquals(0L, buffer.getAlignmentDurationNanos()); @@ -158,7 +157,7 @@ public void testSingleChannelWithBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, buffer.getNextNonBlocked()); + assertEquals(boe, buffer.pollNext().get()); } } } @@ -208,76 +207,76 @@ public void testMultiChannelWithBarriers() throws Exception { handler.setNextExpectedCheckpointId(1L); // pre checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, handler.getNextExpectedCheckpointId()); long startTs = System.nanoTime(); // blocking while aligning for checkpoint 1 - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, handler.getNextExpectedCheckpointId()); // checkpoint 1 done, returning buffered data - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, handler.getNextExpectedCheckpointId()); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[5], sequence[6]); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); // pre checkpoint 2 - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, handler.getNextExpectedCheckpointId()); // checkpoint 2 barriers come together startTs = System.nanoTime(); - check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); assertEquals(3L, handler.getNextExpectedCheckpointId()); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment()); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 starts, data buffered - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[20], sequence[21]); assertEquals(4L, handler.getNextExpectedCheckpointId()); - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 4 happens without extra data // pre checkpoint 5 - check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[27], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment()); assertEquals(5L, handler.getNextExpectedCheckpointId()); - check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[28], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[29], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 5 aligning - check(sequence[31], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[31], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[32], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[33], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[37], buffer.pollNext().get(), PAGE_SIZE); // buffered data from checkpoint 5 alignment - check(sequence[34], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[38], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[39], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[34], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[36], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[38], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[39], buffer.pollNext().get(), PAGE_SIZE); // remaining data - check(sequence[41], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[42], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[43], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[44], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[41], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[42], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[43], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[44], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[34], sequence[36], sequence[38], sequence[39]); @@ -302,31 +301,31 @@ public void testMultiChannelTrailingBlockedData() throws Exception { handler.setNextExpectedCheckpointId(1L); // pre-checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, handler.getNextExpectedCheckpointId()); // pre-checkpoint 2 - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, handler.getNextExpectedCheckpointId()); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 2 alignment long startTs = System.nanoTime(); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[14], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); // end of stream: remaining buffered contents - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); } /** @@ -378,64 +377,64 @@ public void testMultiChannelWithQueuedFutureBarriers() throws Exception{ handler.setNextExpectedCheckpointId(1L); // around checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, handler.getNextExpectedCheckpointId()); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); // alignment of checkpoint 2 - buffering also some barriers for // checkpoints 3 and 4 long startTs = System.nanoTime(); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[23], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[23], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 2 completed - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[25], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[25], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[27], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[30], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[32], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 completed (emit buffered) - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[28], buffer.pollNext().get(), PAGE_SIZE); // past checkpoint 3 - check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[38], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[36], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[38], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 4 completed (emit buffered) - check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[31], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[39], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[26], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[31], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[33], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[39], buffer.pollNext().get(), PAGE_SIZE); // past checkpoint 4, alignment for checkpoint 5 - check(sequence[42], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[45], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[46], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[42], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[45], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[46], buffer.pollNext().get(), PAGE_SIZE); // abort checkpoint 5 (end of partition) - check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[37], buffer.pollNext().get(), PAGE_SIZE); // start checkpoint 6 alignment - check(sequence[47], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[48], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[47], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[48], buffer.pollNext().get(), PAGE_SIZE); // end of input, emit remainder - check(sequence[43], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[44], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[43], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[44], buffer.pollNext().get(), PAGE_SIZE); } /** @@ -472,51 +471,51 @@ public void testMultiChannelSkippingCheckpoints() throws Exception { long startTs; // initial data - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); // align checkpoint 1 startTs = System.nanoTime(); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, buffer.getCurrentCheckpointId()); // checkpoint done - replay buffered - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); // alignment of checkpoint 2 startTs = System.nanoTime(); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 2 aborted, checkpoint 3 started - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); assertEquals(3L, buffer.getCurrentCheckpointId()); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify).abortCheckpointOnBarrier(eq(2L), isA(CheckpointDeclineSubsumedException.class)); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 alignment in progress - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 aborted (end of partition) - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify).abortCheckpointOnBarrier(eq(3L), isA(InputEndOfStreamException.class)); // replay buffered data from checkpoint 3 - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); // all the remaining messages - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[23], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[23], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); } /** @@ -556,45 +555,45 @@ public void testMultiChannelJumpingOverCheckpoint() throws Exception { handler.setNextExpectedCheckpointId(1L); // checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, buffer.getCurrentCheckpointId()); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); // alignment of checkpoint 2 - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, buffer.getCurrentCheckpointId()); - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); long startTs = System.nanoTime(); // checkpoint 2 aborted, checkpoint 4 started. replay buffered - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); assertEquals(4L, buffer.getCurrentCheckpointId()); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); // align checkpoint 4 remainder - check(sequence[25], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[25], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[26], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); // checkpoint 4 aborted (due to end of partition) - check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[27], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[28], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[29], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[30], buffer.pollNext().get(), PAGE_SIZE); } /** @@ -642,45 +641,45 @@ public void testMultiChannelSkippingCheckpointsViaBlockedInputs() throws Excepti buffer = createBarrierBuffer(3, sequence); // checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, buffer.getCurrentCheckpointId()); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); // alignment of checkpoint 2 - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, buffer.getCurrentCheckpointId()); // checkpoint 2 completed - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 skipped, alignment for 4 started - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); assertEquals(4L, buffer.getCurrentCheckpointId()); - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[26], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[30], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 4 completed - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); - - check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[34], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[35], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[28], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[29], buffer.pollNext().get(), PAGE_SIZE); + + check(sequence[32], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[33], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[34], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[35], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[36], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[37], buffer.pollNext().get(), PAGE_SIZE); } @Test @@ -702,29 +701,29 @@ public void testEarlyCleanup() throws Exception { handler.setNextExpectedCheckpointId(1L); // pre-checkpoint 1 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, handler.getNextExpectedCheckpointId()); // pre-checkpoint 2 - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, handler.getNextExpectedCheckpointId()); - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 2 alignment - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[14], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); // drain buffer - buffer.getNextNonBlocked(); - buffer.getNextNonBlocked(); - buffer.getNextNonBlocked(); - buffer.getNextNonBlocked(); - buffer.getNextNonBlocked(); + buffer.pollNext().get(); + buffer.pollNext().get(); + buffer.pollNext().get(); + buffer.pollNext().get(); + buffer.pollNext().get(); } @Test @@ -759,33 +758,33 @@ public void testStartAlignmentWithClosedChannels() throws Exception { buffer = createBarrierBuffer(4, sequence); // pre checkpoint 2 - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 alignment - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, buffer.getCurrentCheckpointId()); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 buffered - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); assertEquals(3L, buffer.getCurrentCheckpointId()); // after checkpoint 4 - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); assertEquals(4L, buffer.getCurrentCheckpointId()); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); assertEquals(5L, buffer.getCurrentCheckpointId()); - check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); } @Test @@ -811,22 +810,22 @@ public void testEndOfStreamWhileCheckpoint() throws Exception { buffer = createBarrierBuffer(3, sequence); // data after first checkpoint - check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); assertEquals(1L, buffer.getCurrentCheckpointId()); // alignment of second checkpoint - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); assertEquals(2L, buffer.getCurrentCheckpointId()); // first end-of-partition encountered: checkpoint will not be completed - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[14], buffer.pollNext().get(), PAGE_SIZE); } @Test @@ -847,19 +846,19 @@ public void testSingleChannelAbortCheckpoint() throws Exception { AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer.registerCheckpointEventHandler(toNotify); - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); assertEquals(5L, buffer.getCurrentCheckpointId()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); assertEquals(6L, buffer.getCurrentCheckpointId()); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); @@ -909,52 +908,52 @@ public void testMultiChannelAbortCheckpoint() throws Exception { long startTs; // successful first checkpoint, with some aligned buffers - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); startTs = System.nanoTime(); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); // canceled checkpoint on last barrier startTs = System.nanoTime(); - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); // one more successful checkpoint - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); startTs = System.nanoTime(); - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); // this checkpoint gets immediately canceled - check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); // some buffers - check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[26], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[27], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[28], buffer.pollNext().get(), PAGE_SIZE); // a simple successful checkpoint startTs = System.nanoTime(); - check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[32], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[33], buffer.pollNext().get(), PAGE_SIZE); - check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[37], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); } @@ -993,33 +992,33 @@ public void testAbortViaQueuedBarriers() throws Exception { long startTs; - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); // starting first checkpoint startTs = System.nanoTime(); - check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); // finished first checkpoint - check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); // re-read the queued cancellation barriers - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); assertEquals(0L, buffer.getAlignmentDurationNanos()); - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[14], buffer.pollNext().get(), PAGE_SIZE); - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); // no further alignment should have happened assertEquals(0L, buffer.getAlignmentDurationNanos()); @@ -1075,39 +1074,39 @@ public void testAbortWhileHavingQueuedBarriers() throws Exception { long startTs; - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); // starting first checkpoint startTs = System.nanoTime(); - check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); // cancelled by cancellation barrier - check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class)); // the next checkpoint alignment starts now startTs = System.nanoTime(); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); // checkpoint done - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); // queued data - check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[14], buffer.pollNext().get(), PAGE_SIZE); // trailing data - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[20], buffer.pollNext().get(), PAGE_SIZE); // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); @@ -1153,33 +1152,33 @@ public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { // validate the sequence - check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[0], buffer.pollNext().get(), PAGE_SIZE); // beginning of first checkpoint - check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); // future barrier aborts checkpoint startTs = System.nanoTime(); - check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); - check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); // alignment of next checkpoint - check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); // checkpoint finished - check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); - check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); // remaining data - check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); - check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); + check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); // check overall notifications verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index 22c6de4a850c9..398a95a42413f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -67,7 +67,7 @@ public void testSingleChannelNoBarriers() throws Exception { tracker = createBarrierTracker(1, sequence); for (BufferOrEvent boe : sequence) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } @@ -80,7 +80,7 @@ public void testMultiChannelNoBarriers() throws Exception { tracker = createBarrierTracker(4, sequence); for (BufferOrEvent boe : sequence) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } @@ -103,7 +103,7 @@ public void testSingleChannelWithBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -127,7 +127,7 @@ public void testSingleChannelWithSkippedBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -160,7 +160,7 @@ public void testMultiChannelWithBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -197,7 +197,7 @@ public void testMultiChannelSkippingCheckpoints() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -273,7 +273,7 @@ public void testCompleteCheckpointsOnLateBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -300,7 +300,7 @@ public void testSingleChannelAbortCheckpoint() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer()) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } assertTrue(tracker.isEmpty()); } @@ -351,7 +351,7 @@ public void testMultiChannelAbortCheckpoint() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer()) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } } } @@ -379,7 +379,7 @@ public void testInterleavedCancellationBarriers() throws Exception { for (BufferOrEvent boe : sequence) { if (boe.isBuffer() || (boe.getEvent().getClass() != CheckpointBarrier.class && boe.getEvent().getClass() != CancelCheckpointMarker.class)) { - assertEquals(boe, tracker.getNextNonBlocked()); + assertEquals(boe, tracker.pollNext().get()); } }