Skip to content

Commit

Permalink
[FLINK-16572] Clean up PubSub connector e2e test
Browse files Browse the repository at this point in the history
- execute as regular test to have proper logging
- document copied code
- fix typos
  • Loading branch information
rmetzger committed Jun 4, 2020
1 parent 9211cb5 commit 52861e3
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 15 deletions.
6 changes: 6 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,9 @@ ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUT
DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE
USE OR PERFORMANCE OF THIS SOFTWARE.

The Apache Flink project contains or reuses code that is licensed under the Apache 2.0 license from the following projects:
- Google Cloud Client Library for Java (https://github.com/googleapis/google-cloud-java) Copyright 2017 Google LLC

See: flink-end-to-end-tests/flink-connector-gcp-pubsub-emulator-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java

Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.12.4</version>
<configuration>
<skipTests>${skipTests}</skipTests>
<forkCount>1</forkCount> <!-- Enforce single test execution -->
</configuration>
<executions>
<execution>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ public class CheckPubSubEmulatorTest extends GCloudUnitTestBase {
private static final String TOPIC_NAME = "Topic";
private static final String SUBSCRIPTION_NAME = "Subscription";

private static PubsubHelper pubsubHelper = getPubsubHelper();
private static PubsubHelper pubsubHelper;

@BeforeClass
public static void setUp() throws Exception {
pubsubHelper = getPubsubHelper();
pubsubHelper.createTopic(PROJECT_NAME, TOPIC_NAME);
pubsubHelper.createSubscription(PROJECT_NAME, SUBSCRIPTION_NAME, PROJECT_NAME, TOPIC_NAME);
}
Expand All @@ -78,7 +79,7 @@ public void testPull() throws Exception {

//TODO this is just to test if we need to wait longer, or if something has gone wrong and the message will never arrive
if (receivedMessages.isEmpty()) {
LOG.error("Message did not arrive, gonna wait 60s and try to pull again.");
LOG.error("Message did not arrive, gonna wait 30s and try to pull again.");
Thread.sleep(30 * 1000);
receivedMessages = pubsubHelper.pullMessages(PROJECT_NAME, SUBSCRIPTION_NAME, 1);
}
Expand Down Expand Up @@ -108,30 +109,28 @@ public void testPub() throws Exception {

LOG.info("Waiting a while to receive the message...");

waitUntill(() -> receivedMessages.size() > 0);
waitUntil(() -> receivedMessages.size() > 0);

assertEquals(1, receivedMessages.size());
assertEquals("Hello World", receivedMessages.get(0).getData().toStringUtf8());

try {
subscriber.stopAsync().awaitTerminated(100, MILLISECONDS);
} catch (TimeoutException tme) {
// Yeah, whatever. Don't care about clean shutdown here.
LOG.info("Timeout during shutdown", tme);
}
publisher.shutdown();
}

/*
* Returns when predicate returns true or if 10 seconds have passed
*/
private void waitUntill(Supplier<Boolean> predicate) {
private void waitUntil(Supplier<Boolean> predicate) throws InterruptedException {
int retries = 0;

while (!predicate.get() && retries < 100) {
retries++;
try {
Thread.sleep(10);
} catch (InterruptedException e) { }
Thread.sleep(10);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.spotify.docker.client.DefaultDockerClient;
import com.spotify.docker.client.DockerClient;
import com.spotify.docker.client.LogStream;
import com.spotify.docker.client.exceptions.ContainerNotFoundException;
import com.spotify.docker.client.exceptions.DockerCertificateException;
import com.spotify.docker.client.exceptions.DockerException;
Expand Down Expand Up @@ -116,7 +117,7 @@ public static void launchDocker() throws DockerException, InterruptedException,
.hostConfig(hostConfig)
.exposedPorts(INTERNAL_PUBSUB_PORT)
.image(DOCKER_IMAGE_NAME)
.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
.cmd("sh", "-c", "mkdir -p /opt/data/pubsub ; gcloud beta emulators pubsub start --data-dir=/opt/data/pubsub --host-port=0.0.0.0:" + INTERNAL_PUBSUB_PORT)
.build();

final ContainerCreation creation = docker.createContainer(containerConfig, CONTAINER_NAME_JUNIT);
Expand Down Expand Up @@ -220,15 +221,24 @@ private static void terminateAndDiscardAnyExistingContainers(boolean warnAboutEx
containerInfo = docker.inspectContainer(CONTAINER_NAME_JUNIT);
// Already have this container running.

assertNotNull("We should either we get containerInfo or we get an exception", containerInfo);
assertNotNull("We should either get a containerInfo or we get an exception", containerInfo);

LOG.info("");
LOG.info("/===========================================");
if (warnAboutExisting) {
LOG.warn("| >>> FOUND OLD EMULATOR INSTANCE RUNNING <<< ");
LOG.warn("| Destroying that one to keep tests running smoothly.");
}
LOG.info("| Cleanup of GCloud Emulator");
LOG.info("| Cleanup of GCloud Emulator. Log output of container: ");

if (LOG.isInfoEnabled()) {
try (LogStream stream = docker.logs(
containerInfo.id(),
DockerClient.LogsParam.stdout(),
DockerClient.LogsParam.stderr())) {
LOG.info("| > {}", stream.readFully());
}
}

// We REQUIRE 100% accurate side effect free unit tests
// So we completely discard this one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.junit.BeforeClass;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudEmulatorManager.getDockerIpAddress;
Expand All @@ -44,6 +45,9 @@ public static void launchGCloudEmulator() throws Exception {

@AfterClass
public static void terminateGCloudEmulator() throws DockerException, InterruptedException {
channel.shutdownNow();
channel.awaitTermination(1, TimeUnit.MINUTES);
channel = null;
GCloudEmulatorManager.terminateDocker();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ public void deleteSubscription(ProjectSubscriptionName subscriptionName) throws
}
}

//
// Mostly copied from the example on https://cloud.google.com/pubsub/docs/pull
// Licensed under the Apache 2.0 License to "Google LLC" from https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/snippets/SubscriberSnippets.java.
//
public List<ReceivedMessage> pullMessages(String projectId, String subscriptionId, int maxNumberOfMessages) throws Exception {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
# limitations under the License.
################################################################################

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
# Set logging level to INFO: Tests are executed as a Bash e2e test.
rootLogger.level = INFO
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# limitations under the License.
################################################################################

# This test is a Java end to end test, but it requires Docker. Therefore, we run it from bash.

cd "${END_TO_END_DIR}/flink-connector-gcp-pubsub-emulator-tests"

run_mvn test -DskipTests=false

0 comments on commit 52861e3

Please sign in to comment.