diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/ParallelFiniteTestSource.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/ParallelFiniteTestSource.java index 952ba75a43fa5..12c2e617f09dd 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/ParallelFiniteTestSource.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/ParallelFiniteTestSource.java @@ -34,7 +34,7 @@ public class ParallelFiniteTestSource extends RichSourceFunction implement private final Iterable elements; private transient volatile boolean running; - private transient int numCheckpointsComplete; + private transient volatile long currentCheckpointId; public ParallelFiniteTestSource(Iterable elements) { this.elements = elements; @@ -44,7 +44,7 @@ public ParallelFiniteTestSource(Iterable elements) { public void open(Configuration parameters) throws Exception { super.open(parameters); running = true; - numCheckpointsComplete = 0; + currentCheckpointId = 0; } public boolean isTaskMessage(int id) { @@ -58,21 +58,17 @@ public void run(SourceContext ctx) throws Exception { emitElementsAndWaitForCheckpoints(ctx, 2); // second round of the same - emitElementsAndWaitForCheckpoints(ctx, 2); + emitElementsAndWaitForCheckpoints(ctx, 4); } private void emitElementsAndWaitForCheckpoints( - SourceContext ctx, int checkpointsToWaitFor) throws InterruptedException { + SourceContext ctx, long checkpointIdToWaitFor) throws InterruptedException { final Object lock = ctx.getCheckpointLock(); - final int checkpointToAwait; synchronized (lock) { - checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor; emitRecords(ctx); - } - synchronized (lock) { - while (running && numCheckpointsComplete < checkpointToAwait) { + while (running && currentCheckpointId < checkpointIdToWaitFor) { lock.wait(1); } } @@ -97,6 +93,6 @@ public void cancel() { @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - numCheckpointsComplete++; + currentCheckpointId = checkpointId; } }