Skip to content

Commit

Permalink
[FLINK-7053][blob] Improve code quality in BlobServer related tests
Browse files Browse the repository at this point in the history
This lets BlobClientSslTest extend BlobClientTest as most of its implementation
came from there and was simply copied.

[FLINK-7053][blob] verify some of the buffers returned by GET

[FLINK-7053][blob] use TemporaryFolder for local BLOB dir in unit tests

This replaces the use of some temporary directory where it is not guaranteed
that it will be deleted after the test.

This closes apache#4234.
  • Loading branch information
Nico Kruber authored and tillrohrmann committed Aug 7, 2017
1 parent 3050986 commit c26c2e7
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.api.java.io.AvroOutputFormat;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
Expand All @@ -48,7 +49,9 @@
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.io.IOException;
Expand All @@ -70,6 +73,9 @@ public class HDFSTest {
private org.apache.hadoop.fs.Path hdPath;
protected org.apache.hadoop.fs.FileSystem hdfs;

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@BeforeClass
public static void verifyOS() {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
Expand Down Expand Up @@ -242,6 +248,8 @@ public void testBlobServerRecovery() throws Exception {
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobStoreService blobStoreService = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.blob;

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.junit.Rule;
Expand Down Expand Up @@ -45,6 +46,8 @@ public class BlobCacheRetriesTest {
@Test
public void testBlobFetchRetries() throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());

testBlobFetchRetries(config, new VoidBlobStore());
}
Expand All @@ -56,9 +59,11 @@ public void testBlobFetchRetries() throws IOException {
@Test
public void testBlobFetchRetriesHa() throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath());
temporaryFolder.newFolder().getPath());

BlobStoreService blobStoreService = null;

Expand Down Expand Up @@ -136,6 +141,8 @@ private void testBlobFetchRetries(final Configuration config, final BlobStore bl
@Test
public void testBlobFetchWithTooManyFailures() throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());

testBlobFetchWithTooManyFailures(config, new VoidBlobStore());
}
Expand All @@ -147,6 +154,8 @@ public void testBlobFetchWithTooManyFailures() throws IOException {
@Test
public void testBlobFetchWithTooManyFailuresHa() throws IOException {
final Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.flink.runtime.blob;

import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.util.Preconditions;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -52,6 +55,9 @@ public class BlobCacheSuccessTest {
@Test
public void testBlobCache() throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());

uploadFileGetTest(config, false, false);
}

Expand All @@ -63,27 +69,63 @@ public void testBlobCache() throws IOException {
@Test
public void testBlobCacheHa() throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath());
temporaryFolder.newFolder().getPath());
uploadFileGetTest(config, true, true);
}

/**
* BlobCache is configured in HA mode and the cache can download files from
* the file system directly and does not need to download BLOBs from the
* BlobServer.
*/
@Test
public void testBlobCacheHa2() throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.newFolder().getPath());
uploadFileGetTest(config, false, true);
}

/**
* BlobCache is configured in HA mode but the cache itself cannot access the
* file system and thus needs to download BLOBs from the BlobServer.
*/
@Test
public void testBlobCacheHaFallback() throws IOException {
Configuration config = new Configuration();
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath());
temporaryFolder.newFolder().getPath());
uploadFileGetTest(config, false, false);
}

private void uploadFileGetTest(final Configuration config, boolean cacheWorksWithoutServer,
boolean cacheHasAccessToFs) throws IOException {
/**
* Uploads two different BLOBs to the {@link BlobServer} via a {@link BlobClient} and verifies
* we can access the files from a {@link BlobCache}.
*
* @param config
* configuration to use for the server and cache (the final cache's configuration will
* actually get some modifications)
* @param shutdownServerAfterUpload
* whether the server should be shut down after uploading the BLOBs (only useful with HA mode)
* - this implies that the cache has access to the shared <tt>HA_STORAGE_PATH</tt>
* @param cacheHasAccessToFs
* whether the cache should have access to a shared <tt>HA_STORAGE_PATH</tt> (only useful with
* HA mode)
*/
private void uploadFileGetTest(final Configuration config, boolean shutdownServerAfterUpload,
boolean cacheHasAccessToFs) throws IOException {
Preconditions.checkArgument(!shutdownServerAfterUpload || cacheHasAccessToFs);

// First create two BLOBs and upload them to BLOB server
final byte[] buf = new byte[128];
final List<BlobKey> blobKeys = new ArrayList<BlobKey>(2);
Expand All @@ -92,15 +134,15 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit
BlobCache blobCache = null;
BlobStoreService blobStoreService = null;
try {
final Configuration cacheConfig;
if (cacheHasAccessToFs) {
cacheConfig = config;
} else {
// just in case parameters are still read from the server,
// create a separate configuration object for the cache
cacheConfig = new Configuration(config);
final Configuration cacheConfig = new Configuration(config);
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
if (!cacheHasAccessToFs) {
// make sure the cache cannot access the HA store directly
cacheConfig.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
cacheConfig.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath() + "/does-not-exist");
temporaryFolder.newFolder().getPath() + "/does-not-exist");
}

blobStoreService = BlobUtils.createBlobStoreFromConfig(cacheConfig);
Expand All @@ -124,7 +166,7 @@ private void uploadFileGetTest(final Configuration config, boolean cacheWorksWit
}
}

if (cacheWorksWithoutServer) {
if (shutdownServerAfterUpload) {
// Now, shut down the BLOB server, the BLOBs must still be accessible through the cache.
blobServer.close();
blobServer = null;
Expand Down
Loading

0 comments on commit c26c2e7

Please sign in to comment.