Skip to content

Commit

Permalink
[hotfix] Speed up flink container creation in testcontainers
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Dec 4, 2020
1 parent 9365b5d commit d6dc8b0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -90,9 +89,6 @@ public class SQLClientSchemaRegistryITCase {
private final KafkaContainerClient kafkaClient = new KafkaContainerClient(kafka);
private CachedSchemaRegistryClient registryClient;

public SQLClientSchemaRegistryITCase() throws IOException {
}

@Before
public void setUp() {
registryClient = new CachedSchemaRegistryClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.exception.NotFoundException;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -243,10 +245,9 @@ public FlinkContainerBuilder javaVersion(String javaVersion) {
return this;
}

public FlinkContainer build() throws IOException {
public FlinkContainer build() {
try {
Path flinkDist = FileUtils.findFlinkDist();
String flinkDistName = flinkDist.getFileName().toString();
temporaryFolder.create();
Path tmp = temporaryFolder.newFolder().toPath();
Path workersFile = tmp.resolve("workers");
Expand All @@ -256,28 +257,75 @@ public FlinkContainer build() throws IOException {
.mapToObj(i -> "localhost")
.collect(Collectors.toList()));

ImageFromDockerfile image = new ImageFromDockerfile("flink-dist", true)
.withDockerfileFromBuilder(
builder -> {
builder.from("openjdk:" + getJavaVersionSuffix())
.copy(flinkDistName, "flink")
.copy(flinkDistName + "/conf/workers", "workers")
.cmd(FLINK_BIN + "/start-cluster.sh && tail -f /dev/null")
.build();
}
)
.withFileFromPath("workers", workersFile)
.withFileFromPath(flinkDistName, flinkDist);
// Building the docker image is split into two stages:
// 1. build a base image with an immutable flink-dist
// 2. based on the base image add any mutable files such as e.g. workers files
//
// This lets us save some time for archiving and copying big, immutable files
// between tests runs.
String baseImage = buildBaseImage(flinkDist);
ImageFromDockerfile configuredImage = buildConfiguredImage(
workersFile,
baseImage);

Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
if (!logBackupDirectory.isPresent()) {
LOG.warn(
"Property {} not set, logs will not be backed up in case of test failures.",
DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
}
return new FlinkContainer(image, numTaskManagers, logBackupDirectory.orElse(null));
} finally {
return new FlinkContainer(
configuredImage,
numTaskManagers,
logBackupDirectory.orElse(null));
} catch (Exception e) {
temporaryFolder.delete();
throw new RuntimeException("Could not build the flink-dist image", e);
}
}

private ImageFromDockerfile buildConfiguredImage(
Path workersFile,
String baseImage) {
return new ImageFromDockerfile(
"flink-dist-configured")
.withDockerfileFromBuilder(
builder -> builder.from(baseImage)
.copy("workers", "flink/conf/workers")
.cmd(FLINK_BIN + "/start-cluster.sh && tail -f /dev/null")
.build()
)
.withFileFromPath("workers", workersFile);
}

@Nonnull
private String buildBaseImage(Path flinkDist)
throws java.util.concurrent.TimeoutException {
String baseImage = "flink-dist-base";
if (!imageExists(baseImage)) {
new ImageFromDockerfile(baseImage)
.withDockerfileFromBuilder(
builder -> builder
.from("openjdk:" + getJavaVersionSuffix())
.copy("flink", "flink")
.build()
)
.withFileFromPath("flink", flinkDist)
.get(1, TimeUnit.MINUTES);
}
return baseImage;
}

private boolean imageExists(String baseImage) {
try {
DockerClientFactory
.instance()
.client()
.inspectImageCmd(baseImage)
.exec();
return true;
} catch (NotFoundException e) {
return false;
}
}

Expand Down

0 comments on commit d6dc8b0

Please sign in to comment.