Skip to content

Commit

Permalink
Makes the testPauseResume more deterministic with UDP
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Feb 22, 2023
1 parent cfa29b8 commit 6f2bcae
Showing 1 changed file with 29 additions and 19 deletions.
48 changes: 29 additions & 19 deletions src/test/java/io/vertx/core/datagram/DatagramTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.netty.TestLoggerFactory;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

Expand Down Expand Up @@ -177,33 +178,42 @@ public void testPauseResume() {
peer2 = vertx.createDatagramSocket(new DatagramSocketOptions());
peer2.exceptionHandler(t -> fail(t.getMessage()));
peer2.listen(1234, "127.0.0.1", ar -> {
Buffer buffer = TestUtils.randomBuffer(128);
AtomicBoolean received = new AtomicBoolean();
peer2.handler(packet -> received.set(true));
final AtomicBoolean suspendedReceive = new AtomicBoolean();
peer2.handler(packet -> suspendedReceive.set(true));
peer2.pause();
Buffer buffer = TestUtils.randomBuffer(128);
peer1.send(buffer, 1234, "127.0.0.1", ar2 -> {
assertTrue(ar2.succeeded());
});
vertx.setTimer(1000, l -> {
AtomicInteger count = new AtomicInteger();
final int MAX_FAILED_ATTEMPTS = 10;
vertx.setTimer(1000, ignore -> {
Assert.assertFalse(suspendedReceive.get());
AtomicBoolean resumedReceive = new AtomicBoolean();
peer2.handler(packet -> {
switch (count.getAndIncrement()) {
case 0:
assertEquals(buffer, packet.data());
peer1.send(buffer, 1234, "127.0.0.1", ar2 -> {
assertTrue(ar2.succeeded());
});
break;
case 1:
assertFalse(received.get());
assertEquals(buffer, packet.data());
testComplete();
break;
default:
fail();
Assert.assertEquals(buffer, packet.data());
if (resumedReceive.compareAndSet(false, true)) {
testComplete();
}
});
peer2.resume();
peer1.send(buffer, 1234, "127.0.0.1", ar2 -> {
assertTrue(ar2.succeeded());
});
AtomicInteger failedAttempts = new AtomicInteger();
vertx.setPeriodic(1000, l -> {
if (resumedReceive.get()) {
vertx.cancelTimer(l.longValue());
return;
}
if (failedAttempts.incrementAndGet() == MAX_FAILED_ATTEMPTS) {
vertx.cancelTimer(l.longValue());
fail("failed to receive any packet while resumed: retried " + MAX_FAILED_ATTEMPTS + " times");
return;
}
peer1.send(buffer, 1234, "127.0.0.1", ar2 -> {
assertTrue(ar2.succeeded());
});
});
});
});
await();
Expand Down

0 comments on commit 6f2bcae

Please sign in to comment.