diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index c2e132db960c7..e89016b6113c1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -348,10 +348,6 @@ public synchronized void close() throws IOException { allBarriersReceivedFuture.cancel(false); } - boolean isCheckpointPending() { - return numBarriersReceived > 0; - } - private synchronized void handleNewCheckpoint(CheckpointBarrier barrier) throws IOException { long barrierId = barrier.getId(); if (!allBarriersReceivedFuture.isDone() && isCheckpointPending()) { @@ -410,8 +406,14 @@ synchronized int getNumOpenChannels() { return numOpenChannels; } + @VisibleForTesting synchronized long getCurrentCheckpointId() { return currentReceivedCheckpointId; } + + @VisibleForTesting + boolean isCheckpointPending() { + return numBarriersReceived > 0; + } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index 1e2ccf3c6feaa..65fa098e365ec 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -185,7 +185,8 @@ public void close() throws IOException { * * @return The ID of the pending of completed checkpoint. */ - public long getLatestCheckpointId() { + @VisibleForTesting + long getLatestCheckpointId() { return barrierHandler.getLatestCheckpointId(); }