Skip to content

Commit

Permalink
[FLINK-17344][test] Fix unstability on IdleTime metric test.
Browse files Browse the repository at this point in the history
Previously issue was that a thread could finish before the main thread reached .wait() call
  • Loading branch information
wenlong.lwl authored and pnowojski committed Apr 24, 2020
1 parent eeaf08f commit 948a42f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -519,16 +519,14 @@ public void testIdleTime() throws IOException, InterruptedException {
// idle time is zero when there is buffer available.
assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());

final Object runningLock = new Object();
CountDownLatch syncLock = new CountDownLatch(1);
AtomicReference<BufferBuilder> asyncRequestResult = new AtomicReference<>();
final Thread requestThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// notify that the request thread start to run.
synchronized (runningLock) {
runningLock.notify();
}
syncLock.countDown();
// wait for buffer.
asyncRequestResult.set(recordWriter.getBufferBuilder());
} catch (Exception e) {
Expand All @@ -538,9 +536,8 @@ public void run() {
requestThread.start();

// wait until request thread start to run.
synchronized (runningLock) {
runningLock.wait();
}
syncLock.await();

Thread.sleep(10);
//recycle the buffer
final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(builder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -267,6 +268,7 @@ public void testIdleTime() throws InterruptedException {
final AtomicReference<MailboxDefaultAction.Suspension> suspendedActionRef = new AtomicReference<>();
final int totalSwitches = 2;

CountDownLatch syncLock = new CountDownLatch(1);
MailboxThread mailboxThread = new MailboxThread() {
int count = 0;

Expand All @@ -278,12 +280,14 @@ public void runDefaultAction(Controller controller) {
if (count == totalSwitches) {
controller.allActionsCompleted();
}
syncLock.countDown();
}
};
mailboxThread.start();
final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
mailboxThread.signalStart();

syncLock.await();
Thread.sleep(10);
mailboxProcessor.getMailboxExecutor(DEFAULT_PRIORITY).execute(suspendedActionRef.get()::resume, "resume");
mailboxThread.join();
Expand Down

0 comments on commit 948a42f

Please sign in to comment.