Skip to content

Commit

Permalink
[FLINK-20028][table-planner-blink] FileCompactionITCase is unstable
Browse files Browse the repository at this point in the history
This closes apache#14097
  • Loading branch information
JingsongLi committed Nov 18, 2020
1 parent cb850fd commit 2465b71
Showing 1 changed file with 6 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ParallelFiniteTestSource<T> extends RichSourceFunction<T> implement
private final Iterable<T> elements;

private transient volatile boolean running;
private transient int numCheckpointsComplete;
private transient volatile long currentCheckpointId;

public ParallelFiniteTestSource(Iterable<T> elements) {
this.elements = elements;
Expand All @@ -44,7 +44,7 @@ public ParallelFiniteTestSource(Iterable<T> elements) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
running = true;
numCheckpointsComplete = 0;
currentCheckpointId = 0;
}

public boolean isTaskMessage(int id) {
Expand All @@ -58,21 +58,17 @@ public void run(SourceContext<T> ctx) throws Exception {
emitElementsAndWaitForCheckpoints(ctx, 2);

// second round of the same
emitElementsAndWaitForCheckpoints(ctx, 2);
emitElementsAndWaitForCheckpoints(ctx, 4);
}

private void emitElementsAndWaitForCheckpoints(
SourceContext<T> ctx, int checkpointsToWaitFor) throws InterruptedException {
SourceContext<T> ctx, long checkpointIdToWaitFor) throws InterruptedException {
final Object lock = ctx.getCheckpointLock();

final int checkpointToAwait;
synchronized (lock) {
checkpointToAwait = numCheckpointsComplete + checkpointsToWaitFor;
emitRecords(ctx);
}

synchronized (lock) {
while (running && numCheckpointsComplete < checkpointToAwait) {
while (running && currentCheckpointId < checkpointIdToWaitFor) {
lock.wait(1);
}
}
Expand All @@ -97,6 +93,6 @@ public void cancel() {

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
numCheckpointsComplete++;
currentCheckpointId = checkpointId;
}
}

0 comments on commit 2465b71

Please sign in to comment.