Skip to content

Commit

Permalink
[FLINK-16572][e2e][pubsub] Acknowledge message in previous test
Browse files Browse the repository at this point in the history
The test running before the failing test did not properly acknowledge the
reception of the message.
That's also the reason why this test always logged a timeout exception.

With this change, the test will fail with timeout exceptions, and maybe this
improves the overall test stability.
  • Loading branch information
rmetzger committed Jun 8, 2020
1 parent a08061b commit 0358292
Showing 1 changed file with 9 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.junit.Assert.assertEquals;

/**
Expand Down Expand Up @@ -77,12 +76,6 @@ public void testPull() throws Exception {

List<ReceivedMessage> receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);

//TODO this is just to test if we need to wait longer, or if something has gone wrong and the message will never arrive
if (receivedMessages.isEmpty()) {
LOG.error("Message did not arrive, gonna wait 30s and try to pull again.");
Thread.sleep(30 * 1000);
receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);
}
assertEquals(1, receivedMessages.size());
assertEquals("Hello World PULL", receivedMessages.get(0).getMessage().getData().toStringUtf8());

Expand All @@ -96,8 +89,12 @@ public void testPub() throws Exception {
subscribeToSubscription(
PROJECT_NAME,
SUBSCRIPTION_NAME,
(message, consumer) -> receivedMessages.add(message)
(message, consumer) -> {
receivedMessages.add(message);
consumer.ack();
}
);
subscriber.awaitRunning(5, MINUTES);

Publisher publisher = pubsubHelper.createPublisher(PROJECT_NAME, TOPIC_NAME);
publisher
Expand All @@ -114,11 +111,9 @@ public void testPub() throws Exception {
assertEquals(1, receivedMessages.size());
assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());

try {
subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
} catch (TimeoutException tme) {
LOG.info("Timeout during shutdown", tme);
}
LOG.info("Received message. Shutting down ...");

subscriber.stopAsync().awaitTerminated(5, MINUTES);
publisher.shutdown();
}

Expand Down

0 comments on commit 0358292

Please sign in to comment.