Skip to content

Commit

Permalink
[FLINK-12535][test] Drop deprecated blocking CheckpointBarrierHandler…
Browse files Browse the repository at this point in the history
…#getNext method
  • Loading branch information
pnowojski committed Jun 5, 2019
1 parent 8d281f8 commit 9efce98
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 317 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,13 @@

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkState;

/**
* The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels.
* Different implementations may either simply track barriers, or block certain inputs on
* barriers.
*/
@Internal
public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> {
/**
* Blocking version of {@link #pollNext()}.
*/
@Deprecated
default BufferOrEvent getNextNonBlocked() throws Exception {
Optional<BufferOrEvent> bufferOrEvent = pollNext();
checkState(bufferOrEvent.isPresent());
return bufferOrEvent.get();
}

/**
* Registers the task be notified once all checkpoint barriers have been received for a checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,37 @@ public void testBreakCheckpointAtAlignmentLimit() throws Exception {

// validating the sequence of buffers

check(sequence[0], buffer.getNextNonBlocked());
check(sequence[1], buffer.getNextNonBlocked());
check(sequence[2], buffer.getNextNonBlocked());
check(sequence[3], buffer.getNextNonBlocked());
check(sequence[0], buffer.pollNext().get());
check(sequence[1], buffer.pollNext().get());
check(sequence[2], buffer.pollNext().get());
check(sequence[3], buffer.pollNext().get());

// start of checkpoint
long startTs = System.nanoTime();
check(sequence[6], buffer.getNextNonBlocked());
check(sequence[8], buffer.getNextNonBlocked());
check(sequence[10], buffer.getNextNonBlocked());
check(sequence[6], buffer.pollNext().get());
check(sequence[8], buffer.pollNext().get());
check(sequence[10], buffer.pollNext().get());

// trying to pull the next makes the alignment overflow - so buffered buffers are replayed
check(sequence[5], buffer.getNextNonBlocked());
check(sequence[5], buffer.pollNext().get());
validateAlignmentTime(startTs, buffer);
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(7L), any(AlignmentLimitExceededException.class));

// playing back buffered events
check(sequence[7], buffer.getNextNonBlocked());
check(sequence[11], buffer.getNextNonBlocked());
check(sequence[12], buffer.getNextNonBlocked());
check(sequence[13], buffer.getNextNonBlocked());
check(sequence[14], buffer.getNextNonBlocked());
check(sequence[7], buffer.pollNext().get());
check(sequence[11], buffer.pollNext().get());
check(sequence[12], buffer.pollNext().get());
check(sequence[13], buffer.pollNext().get());
check(sequence[14], buffer.pollNext().get());

// the additional data
check(sequence[15], buffer.getNextNonBlocked());
check(sequence[16], buffer.getNextNonBlocked());
check(sequence[17], buffer.getNextNonBlocked());
check(sequence[15], buffer.pollNext().get());
check(sequence[16], buffer.pollNext().get());
check(sequence[17], buffer.pollNext().get());

check(sequence[19], buffer.getNextNonBlocked());
check(sequence[20], buffer.getNextNonBlocked());
check(sequence[21], buffer.getNextNonBlocked());
check(sequence[19], buffer.pollNext().get());
check(sequence[20], buffer.pollNext().get());
check(sequence[21], buffer.pollNext().get());

// no call for a completed checkpoint must have happened
verify(toNotify, times(0)).triggerCheckpointOnBarrier(
Expand Down Expand Up @@ -217,44 +217,44 @@ public void testAlignmentLimitWithQueuedAlignments() throws Exception {
// validating the sequence of buffers
long startTs;

check(sequence[0], buffer.getNextNonBlocked());
check(sequence[1], buffer.getNextNonBlocked());
check(sequence[0], buffer.pollNext().get());
check(sequence[1], buffer.pollNext().get());

// start of checkpoint
startTs = System.nanoTime();
check(sequence[3], buffer.getNextNonBlocked());
check(sequence[7], buffer.getNextNonBlocked());
check(sequence[3], buffer.pollNext().get());
check(sequence[7], buffer.pollNext().get());

// next checkpoint also in progress
check(sequence[11], buffer.getNextNonBlocked());
check(sequence[11], buffer.pollNext().get());

// checkpoint alignment aborted due to too much data
check(sequence[4], buffer.getNextNonBlocked());
check(sequence[4], buffer.pollNext().get());
validateAlignmentTime(startTs, buffer);
verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(AlignmentLimitExceededException.class));

// replay buffered data - in the middle, the alignment for checkpoint 4 starts
check(sequence[6], buffer.getNextNonBlocked());
check(sequence[6], buffer.pollNext().get());
startTs = System.nanoTime();
check(sequence[12], buffer.getNextNonBlocked());
check(sequence[12], buffer.pollNext().get());

// only checkpoint 4 is pending now - the last checkpoint 3 barrier will not trigger success
check(sequence[17], buffer.getNextNonBlocked());
check(sequence[17], buffer.pollNext().get());

// checkpoint 4 completed - check and validate buffered replay
check(sequence[9], buffer.getNextNonBlocked());
check(sequence[9], buffer.pollNext().get());
validateAlignmentTime(startTs, buffer);
verify(toNotify, times(1)).triggerCheckpointOnBarrier(
argThat(new CheckpointMatcher(4L)), any(CheckpointOptions.class), any(CheckpointMetrics.class));

check(sequence[10], buffer.getNextNonBlocked());
check(sequence[15], buffer.getNextNonBlocked());
check(sequence[16], buffer.getNextNonBlocked());
check(sequence[10], buffer.pollNext().get());
check(sequence[15], buffer.pollNext().get());
check(sequence[16], buffer.pollNext().get());

// trailing data
check(sequence[19], buffer.getNextNonBlocked());
check(sequence[20], buffer.getNextNonBlocked());
check(sequence[21], buffer.getNextNonBlocked());
check(sequence[19], buffer.pollNext().get());
check(sequence[20], buffer.pollNext().get());
check(sequence[21], buffer.pollNext().get());

// only checkpoint 4 was successfully completed, not checkpoint 3
verify(toNotify, times(0)).triggerCheckpointOnBarrier(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void testWithTwoChannelsAndRandomBarriers() {
BarrierBuffer barrierBuffer = new BarrierBuffer(myIG, new BufferSpiller(ioMan, myIG.getPageSize()));

for (int i = 0; i < 2000000; i++) {
BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
BufferOrEvent boe = barrierBuffer.pollNext().get();
if (boe.isBuffer()) {
boe.getBuffer().recycleBuffer();
}
Expand Down
Loading

0 comments on commit 9efce98

Please sign in to comment.