Skip to content

Commit

Permalink
[FLINK-11464][tests] Add DownloadCache
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 15, 2019
1 parent 751184d commit f0be162
Show file tree
Hide file tree
Showing 10 changed files with 648 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.cache;

import org.apache.flink.tests.util.AutoClosableProcess;
import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.stream.Stream;

/**
 * Base-class for {@link DownloadCache} implementations. This class handles the download and caching of files and
 * provides hooks for encoding/decoding a time-to-live into the file name.
 *
 * <p>All data is kept below the directory passed to the constructor: downloads are first fetched into a
 * {@code downloads} sub-directory and afterwards moved into a {@code cachefiles} sub-directory under the name
 * returned by {@link #generateCacheFileName(String, String)}.
 */
abstract class AbstractDownloadCache implements DownloadCache {

    protected final Logger log = LoggerFactory.getLogger(getClass());

    private final Path tmpDir;
    private final Path downloadsDir;
    private final Path cacheFilesDir;

    /**
     * Creates a cache that stores all of its data below the given directory.
     *
     * @param tmpDir root directory of this cache; created by {@link #before()} if it does not exist
     */
    AbstractDownloadCache(final Path tmpDir) {
        this.tmpDir = tmpDir;
        this.downloadsDir = tmpDir.resolve("downloads");
        this.cacheFilesDir = tmpDir.resolve("cachefiles");
    }

    /**
     * Creates the cache directories and removes every cache entry whose name is accepted by the cache file matcher
     * and for which {@link #exceedsTimeToLive(Matcher)} returns {@code true}.
     *
     * @throws IOException if a directory could not be created or listed, or an entry could not be deleted
     */
    @Override
    public void before() throws IOException {
        Files.createDirectories(tmpDir);
        Files.createDirectories(downloadsDir);
        Files.createDirectories(cacheFilesDir);

        try (Stream<Path> cacheFiles = Files.list(cacheFilesDir)) {
            // explicit iterator instead of forEach, because deleting an entry may throw an IOException
            final Iterator<Path> iterator = cacheFiles.iterator();
            while (iterator.hasNext()) {
                final Path cacheFile = iterator.next();
                final String cacheFileName = cacheFile.getFileName().toString();

                final Matcher matcher = createCacheFileMatcher(cacheFileName);

                if (matcher.matches() && exceedsTimeToLive(matcher)) {
                    log.info("Invalidating cache entry {}.", regenerateOriginalFileName(matcher));
                    deleteCacheEntry(cacheFile);
                }
            }
        }
    }

    @Override
    public void afterTestSuccess() {
    }

    /**
     * Returns a matcher for the given cache file name. Entries for which {@link Matcher#matches()} fails are ignored
     * by this class; the returned matcher is passed to the other hooks once it matched.
     */
    abstract Matcher createCacheFileMatcher(String cacheFileName);

    /**
     * Returns the name under which a file that was downloaded from the given url is stored in the cache.
     *
     * @param url url the file was downloaded from
     * @param fileName name of the downloaded file
     */
    abstract String generateCacheFileName(String url, String fileName);

    /** Returns the original (downloaded) file name for a matcher that accepted a cache file name. */
    abstract String regenerateOriginalFileName(Matcher matcher);

    /** Returns whether the cache entry described by the matcher must be deleted in {@link #before()}. */
    abstract boolean exceedsTimeToLive(Matcher matcher);

    /** Returns whether the cache entry described by the matcher may be used to serve the given url. */
    abstract boolean matchesCachedFile(Matcher matcher, String url);

    /**
     * Returns a copy of the file/directory behind the given url, located in the given target directory. The cache is
     * consulted first; the url is only downloaded if no cache entry matches it.
     *
     * @param url file/directory to download
     * @param targetDir directory to place the copy in
     * @return path of the copy within the target directory
     * @throws IOException if the download or any file operation fails
     */
    @Override
    public Path getOrDownload(final String url, final Path targetDir) throws IOException {
        final Optional<Path> cachedFile = getCachedFile(url);

        final Path cacheFile;
        if (cachedFile.isPresent()) {
            log.info("Using cached version of {}.", url);
            cacheFile = cachedFile.get();
        } else {
            cacheFile = downloadIntoCache(url);
        }

        final String cacheFileName = cacheFile.getFileName().toString();

        final Matcher matcher = createCacheFileMatcher(cacheFileName);
        if (!matcher.matches()) {
            // this indicates an implementation error or corrupted cache
            throw new RuntimeException("Cache file matcher did not accept file retrieved from cache.");
        }
        final String originalFileName = regenerateOriginalFileName(matcher);

        return TestUtils.copyDirectory(cacheFile, targetDir.resolve(originalFileName));
    }

    /** Downloads the given url via wget and moves the result into the cache directory. */
    private Path downloadIntoCache(final String url) throws IOException {
        final Path scopedDownloadDir = downloadsDir.resolve(String.valueOf(url.hashCode()));
        Files.createDirectories(scopedDownloadDir);
        // An earlier attempt for the same url (or one with a colliding hash) may have left files behind, e.g. after
        // a failure or timeout. Remove them, since the download is identified below as "any file in this directory".
        FileUtils.cleanDirectory(scopedDownloadDir.toFile());

        log.info("Downloading {}.", url);
        AutoClosableProcess
            .create(
                CommandLineWrapper.wget(url)
                    .targetDir(scopedDownloadDir)
                    .build())
            .runBlocking(Duration.ofMinutes(2));

        final Path download;
        try (Stream<Path> files = Files.list(scopedDownloadDir)) {
            final Optional<Path> any = files.findAny();
            download = any.orElseThrow(() -> new IOException("Failed to download " + url + '.'));
        }

        final String cacheFileName = generateCacheFileName(url, download.getFileName().toString());

        final Path cacheFile = cacheFilesDir.resolve(cacheFileName);
        if (Files.isDirectory(download)) {
            FileUtils.moveDirectory(download.toFile(), cacheFile.toFile());
        } else {
            Files.move(download, cacheFile);
        }
        return cacheFile;
    }

    /**
     * Deletes a single cache entry. Entries may be directories (see the directory branch when moving a download into
     * the cache), which {@link Files#delete(Path)} cannot remove unless they are empty.
     */
    private static void deleteCacheEntry(final Path cacheFile) throws IOException {
        if (Files.isDirectory(cacheFile)) {
            FileUtils.deleteDirectory(cacheFile.toFile());
        } else {
            Files.delete(cacheFile);
        }
    }

    /** Returns any cache entry that is accepted by the matcher and matches the given url. */
    private Optional<Path> getCachedFile(final String url) throws IOException {
        try (Stream<Path> cacheFiles = Files.list(cacheFilesDir)) {
            return cacheFiles
                .filter(cacheFile -> {
                    final String cacheFileName = cacheFile.getFileName().toString();
                    final Matcher matcher = createCacheFileMatcher(cacheFileName);
                    return matcher.matches() && matchesCachedFile(matcher, url);
                })
                .findAny();
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.cache;

import org.apache.flink.tests.util.util.FactoryUtils;
import org.apache.flink.util.ExternalResource;

import java.io.IOException;
import java.nio.file.Path;

/**
* A {@link DownloadCache} allows tests to download files and/or directories and optionally caches them.
*
* <p>Whether, how, and for how long files are cached is implementation-dependent.
*/
public interface DownloadCache extends ExternalResource {

/**
* Returns either a cached or newly downloaded version of the given file. The returned file path is guaranteed to be
* located in the given target directory.
*
* @param url url of the file/directory to download
* @param targetDir directory to place the file/directory in
* @return path to the downloaded or cached file within the target directory
* @throws IOException if any IO operation fails
*/
Path getOrDownload(String url, Path targetDir) throws IOException;

/**
* Returns the configured DownloadCache implementation, or a {@link LolCache} if none is configured.
*
* @return configured DownloadCache, or a {@link LolCache} if none is configured
*/
static DownloadCache get() {
// NOTE(review): LolCacheFactory is presumably only used as the fallback when no DownloadCacheFactory
// yields an instance; confirm against FactoryUtils#loadAndInvokeFactory.
return FactoryUtils.loadAndInvokeFactory(
DownloadCacheFactory.class,
DownloadCacheFactory::create,
LolCacheFactory::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.cache;

import java.util.Optional;

/**
* A factory for {@link DownloadCache} implementations.
*
* <p>{@link DownloadCache#get()} hands this interface and its {@link #create()} method to the factory loading
* utility in order to obtain the cache instance used by a test.
*/
@FunctionalInterface
public interface DownloadCacheFactory {

/**
* Returns a {@link DownloadCache} instance. If the instance could not be instantiated (for example, because a
* mandatory parameter was missing), then an empty {@link Optional} should be returned.
*
* @return DownloadCache instance, or an empty Optional if the instance could not be instantiated
*/
Optional<DownloadCache> create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.cache;

import org.junit.rules.TemporaryFolder;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * {@link DownloadCache} implementation that does not cache anything: every existing entry is reported as expired
 * and no entry is ever reported as a match for a requested url.
 *
 * @see LolCacheFactory
 */
public final class LolCache extends AbstractDownloadCache {

    /** Cache file names are the plain downloaded file names, so (practically) any name is accepted. */
    private static final Pattern ANY_FILE_NAME = Pattern.compile(".*");

    /** Temporary folder backing this cache; removed again in {@link #afterTestSuccess()}. */
    private final TemporaryFolder temporaryFolder;

    /**
     * Creates a cache that keeps its files in the root of the given temporary folder.
     *
     * @param folder temporary folder to store downloads in
     */
    public LolCache(TemporaryFolder folder) {
        super(folder.getRoot().toPath());
        temporaryFolder = folder;
    }

    /** Deletes the backing temporary folder. */
    @Override
    public void afterTestSuccess() {
        temporaryFolder.delete();
    }

    @Override
    Matcher createCacheFileMatcher(String cacheFileName) {
        final Matcher anyNameMatcher = ANY_FILE_NAME.matcher(cacheFileName);
        return anyNameMatcher;
    }

    /** The downloaded file name is used as-is; the url is not encoded into it. */
    @Override
    String generateCacheFileName(String url, String fileName) {
        return fileName;
    }

    /** The whole match is the original file name, since nothing was added to it. */
    @Override
    String regenerateOriginalFileName(Matcher matcher) {
        return matcher.group();
    }

    /** Every entry is considered expired. */
    @Override
    boolean exceedsTimeToLive(Matcher matcher) {
        return true;
    }

    /** No entry is ever considered a hit for a url. */
    @Override
    boolean matchesCachedFile(Matcher matcher, String url) {
        return false;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.tests.util.cache;

import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Optional;

/**
 * A {@link DownloadCacheFactory} for the {@link LolCache}. Each created cache is backed by its own, freshly created
 * temporary folder.
 */
public final class LolCacheFactory implements DownloadCacheFactory {

    private static final Logger LOG = LoggerFactory.getLogger(LolCacheFactory.class);

    /**
     * Creates a {@link LolCache} on top of a new temporary folder.
     *
     * @return a non-empty Optional containing the created cache
     * @throws RuntimeException if the temporary folder could not be created
     */
    @Override
    public Optional<DownloadCache> create() {
        final TemporaryFolder temporaryFolder = newTemporaryFolder();
        LOG.info("Created {}.", LolCache.class.getSimpleName());
        final DownloadCache cache = new LolCache(temporaryFolder);
        return Optional.of(cache);
    }

    /** Creates and initializes a temporary folder, converting an IO failure into an unchecked exception. */
    private static TemporaryFolder newTemporaryFolder() {
        final TemporaryFolder temporaryFolder = new TemporaryFolder();
        try {
            temporaryFolder.create();
            return temporaryFolder;
        } catch (IOException cause) {
            throw new RuntimeException("Could not initialize temporary directory.", cause);
        }
    }
}
Loading

0 comments on commit f0be162

Please sign in to comment.