[FLINK-7068][blob] change BlobService sub-classes for permanent and transient BLOBs

[FLINK-7068][blob] start introducing a new BLOB storage abstraction

This is incomplete and may not compile and/or run tests successfully yet.

[FLINK-7068][blob] remove BlobView from TransientBlobCache

The transient BLOB cache is not supposed to work with the HA store since it only
serves non-HA files.

[FLINK-7068][blob] remove unnecessary use of BlobClient

[FLINK-7068][blob] implement TransientBlobCache#put methods

[FLINK-7068][blob] remove further unnecessary use of BlobClient and adapt to HA get/put methods

[FLINK-7068][blob] fix BlobServer#getFileInternal not being guarded by locks

[FLINK-7068][blob] add incoming file cleanup at BlobServer in cases of errors

[FLINK-7068] fix missing BlobServer#putHA() jobId propagation

[FLINK-7068][blob] remove BlobClient use from BlobServer{Get|Put}Test

[FLINK-7068][blob] make helper methods work with any BlobService

[FLINK-7068][blob] start adding a BlobCacheGetTest

[FLINK-7068][blob] verify get contents in separate threads

This gives the test a slight chance of catching an intermediate (partially written) file.

[FLINK-7068][blob] better locking granularity during file retrieval

This allows multiple parallel downloads from the HA store to the BlobServer's
local store, although only one of the downloaded staging files will actually
be used. In practice, this only happens during recovery and not in parallel
anyway.
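The finer locking granularity relies on a staging-file pattern. A minimal sketch (hypothetical names and simplified logic, not Flink's actual code) of how parallel downloads can race safely, with only one staging file being installed:

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of the staging-file pattern: several threads may download
// the same blob into distinct temporary files, but the final rename into place
// is atomic, so readers never observe a partially written target file.
public class StagingFileSketch {

    static Path fetchToStagingAndMove(Path targetDir, String key, byte[] data) throws IOException {
        Path target = targetDir.resolve(key);
        if (Files.exists(target)) {
            return target; // another thread already installed the file
        }
        // Each downloader writes to its own staging file.
        Path staging = Files.createTempFile(targetDir, key, ".staging");
        Files.write(staging, data);
        try {
            Files.move(staging, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (FileAlreadyExistsException e) {
            // Lost the race: another download finished first; discard our copy.
            Files.deleteIfExists(staging);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blobs");
        Path installed = fetchToStagingAndMove(dir, "blob-1", "hello".getBytes());
        System.out.println(new String(Files.readAllBytes(installed))); // hello
    }
}
```

The atomic move is what permits dropping the coarse lock around the whole download: correctness no longer depends on serializing the downloads themselves, only on the final installation step.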

[FLINK-7068][blob] share more code among BlobServer and BlobServerConnection

This also applies the better locking granularity of the previous commit to
BlobServerConnection.

[FLINK-7068][blob] properly cleanup temporary staging files in all cases

[FLINK-7068][blob] make PermanentBlobCache and TransientBlobCache thread-safe

[FLINK-7068][tests] improve various tests

[FLINK-7068][blob] change the signature of the delete calls to return success

Instead of throwing exceptions on failure, the delete calls now return whether
the operation was successful. Failure details remain accessible in the written
logs.
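An illustrative sketch of that contract (class and method names are made up, not the actual Flink code): failures are logged and reported through the return value rather than an exception.

```java
import java.io.File;
import java.io.IOException;

// Illustrative sketch (not the actual Flink code): delete reports success via
// its return value and logs failure details instead of throwing.
public class DeleteContractSketch {

    // Returns true iff the file no longer exists afterwards.
    static boolean deleteInternal(File blobFile) {
        try {
            if (!blobFile.exists()) {
                return true; // nothing to do
            }
            if (!blobFile.delete()) {
                System.err.println("Failed to delete " + blobFile);
                return false;
            }
            return true;
        } catch (SecurityException e) {
            System.err.println("Failed to delete " + blobFile + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("blob", ".tmp");
        System.out.println(deleteInternal(f)); // existing file removed -> true
        System.out.println(deleteInternal(f)); // already gone -> still true
    }
}
```

Note the idempotent semantics: deleting an already-absent file counts as success, which keeps retry logic in callers simple.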

[FLINK-7068][tests] extend and adapt BlobServerDeleteTest

[FLINK-7068][tests] adapt further BlobCache tests

[FLINK-7068][tests] adapt BlobClientTest

[FLINK-7068][blob] cleanup BlobClient methods

BlobClient is not supposed to be used by anything other than the
BlobServer/BlobCache classes. Most accessors were already package-private; now
remove the ones that only bloat the code.

[FLINK-7068] add a TODO to fix the currently failing tests

[FLINK-7068][tests] add a BlobCacheRecoveryTest

This currently fails due to TransientBlobCache#put also storing files in HA
store which it should not!

[FLINK-7068][tests] improve failure message

[FLINK-7068][blob] add permanent/transient BLOB modes to BlobClient

This allows better control over which BLOBs should end up in the HA store and
which should not. Also, during GET operations, we no longer check the HA store
unnecessarily.
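As an illustration of that mode distinction (hypothetical names; the real classes and stores differ), a GET for a transient blob can skip the HA lookup entirely, since transient blobs are never written there:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a blob-type flag decides whether a failed local GET
// should fall back to the HA store at all.
public class BlobTypeSketch {

    enum BlobType { PERMANENT, TRANSIENT }

    static String get(String key, BlobType type,
                      Map<String, String> local, Map<String, String> haStore) {
        String value = local.get(key);
        if (value != null) {
            return value;
        }
        // Only permanent blobs may live in the HA store; for transient blobs a
        // local miss is final, so we skip the (potentially remote) lookup.
        if (type == BlobType.PERMANENT) {
            return haStore.get(key);
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> local = new HashMap<>();
        Map<String, String> ha = Map.of("job-jar", "bytes");
        System.out.println(get("job-jar", BlobType.PERMANENT, local, ha)); // bytes
        System.out.println(get("job-jar", BlobType.TRANSIENT, local, ha)); // null
    }
}
```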

[FLINK-7068][tests] extend the Blob{Server|Cache}GetTest

This adds some failing GET operations and verifies that the files are cleaned
up accordingly.

[FLINK-7068][blob] remove "final" flag from BlobCache class

This re-enables mocking in various unit tests.

[FLINK-7068][tests] fix test relying on order of folder contents

[FLINK-7068][blob] some BlobServer cleanup

[FLINK-7068][hotfix] fix checkstyle errors

[FLINK-7068][tests] fix tests now requiring a more complete BlobCache mock

A suitable BlobCache mock should at least return a mock for a permanent and a
transient BLOB store, so mock(BlobCache.class) is not sufficient anymore.

[FLINK-7068] final wrap up

* remove a left-over TODO
* remove useless tests for the concurrency of the GET operations (we cannot
directly test that the file write is guarded by a lock; rely on the concurrent
checks in the individual threads instead)
* fix some log messages

[FLINK-7068][blob] remove Thread#start() call from BlobServer constructor

This is bad design and limits extensibility, e.g. in tests like the
BlobCacheRetriesTest where this caused a race condition with the sub-class.
Instead, the user must now call BlobServer#start() explicitly.
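The general pattern is sketched below (simplified, hypothetical names): the constructor only creates the thread, and nothing runs until the caller invokes start() explicitly, so a subclass can finish its own initialization first.

```java
// Sketch (not the actual Flink API) of moving Thread#start() out of the
// constructor: a thread started inside a constructor can observe a partially
// constructed subclass, which is exactly the race this change removes.
public class ExplicitStartSketch {

    static class Server {
        private final Thread serverThread;
        private volatile boolean running = false;

        Server() {
            // The constructor only *creates* the thread; it does not start it.
            this.serverThread = new Thread(() -> running = true);
        }

        void start() throws InterruptedException {
            serverThread.start();
            serverThread.join(); // joined here only to keep the demo deterministic
        }

        boolean isRunning() {
            return running;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Server s = new Server();
        // Nothing has run yet; a subclass constructor could still execute here.
        System.out.println(s.isRunning()); // false
        s.start();
        System.out.println(s.isRunning()); // true
    }
}
```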

[FLINK-7068][tests] remove unused imports

[FLINK-7068][tests] fix a typo

[FLINK-7068][tests] add some tests that verify behaviour with corrupted files

Also add corruption checks for HA-store downloads, which were not implemented yet.

[FLINK-7068][blob] ensure consistency in PermanentBlobCache even in cases of invalid use

During cleanup, no write lock was taken, yet the storage directory of an
(unused!) job was deleted. Normally, no process should still be accessing its
data, and no new process can jump in since the registration is locked. In case
of invalid use, i.e. using a job's data outside a register()/release() block,
this could lead to strange effects.
By guarding the cleanup with the write lock as well, we circumvent that.
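A simplified sketch of that guard (hypothetical names, in-memory ref-counting only): cleanup takes the same write lock as register()/release(), so it can never race with a late registration or delete data that is still referenced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified sketch (hypothetical names): cleanup acquires the same write lock
// as register()/release(), so even invalid use outside a register()/release()
// block cannot observe a half-deleted job entry.
public class CleanupLockSketch {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Map<String, Integer> refCounts = new HashMap<>();

    void register(String jobId) {
        lock.writeLock().lock();
        try {
            refCounts.merge(jobId, 1, Integer::sum);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void release(String jobId) {
        lock.writeLock().lock();
        try {
            // Drop the entry entirely once the count reaches zero.
            refCounts.computeIfPresent(jobId, (k, v) -> v > 1 ? v - 1 : null);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Returns true if the job was unreferenced and its data was cleaned up.
    boolean cleanupUnreferenced(String jobId) {
        lock.writeLock().lock(); // the fix: previously this path was unguarded
        try {
            if (!refCounts.containsKey(jobId)) {
                // ... delete the job's storage directory here ...
                return true;
            }
            return false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        CleanupLockSketch cache = new CleanupLockSketch();
        cache.register("job-1");
        System.out.println(cache.cleanupUnreferenced("job-1")); // false: in use
        cache.release("job-1");
        System.out.println(cache.cleanupUnreferenced("job-1")); // true: cleaned
    }
}
```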

[FLINK-7068][hotfix] remove an unused import
Nico Kruber authored and tillrohrmann committed Oct 5, 2017
1 parent 98f6dea commit 071e27f
Showing 63 changed files with 5,555 additions and 2,078 deletions.
@@ -18,6 +18,7 @@

package org.apache.flink.hdfstests;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
@@ -30,7 +31,10 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.blob.BlobRecoveryITCase;
import org.apache.flink.runtime.blob.BlobCacheCorruptionTest;
import org.apache.flink.runtime.blob.BlobCacheRecoveryTest;
import org.apache.flink.runtime.blob.BlobServerCorruptionTest;
import org.apache.flink.runtime.blob.BlobServerRecoveryTest;
import org.apache.flink.runtime.blob.BlobStoreService;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
@@ -51,6 +55,7 @@
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.File;
@@ -76,6 +81,9 @@ public class HDFSTest {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Rule
public final ExpectedException exception = ExpectedException.none();

@BeforeClass
public static void verifyOS() {
Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
@@ -240,7 +248,7 @@ public void testDeletePathIfEmpty() throws IOException {

/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
* participating BlobServer.
* participating BlobServer when talking to the {@link org.apache.flink.runtime.blob.BlobServer} directly.
*/
@Test
public void testBlobServerRecovery() throws Exception {
@@ -257,7 +265,88 @@ public void testBlobServerRecovery() throws Exception {
try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

BlobRecoveryITCase.testBlobServerRecovery(config, blobStoreService);
BlobServerRecoveryTest.testBlobServerRecovery(config, blobStoreService);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}

/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobServer}.
*/
@Test
public void testBlobServerCorruptedFile() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobStoreService blobStoreService = null;

try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

BlobServerCorruptionTest.testGetFailsFromCorruptFile(config, blobStoreService, exception);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}

/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed JARs are recoverable from any
* participating BlobServer when uploaded via a {@link org.apache.flink.runtime.blob.BlobCache}.
*/
@Test
public void testBlobCacheRecovery() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobStoreService blobStoreService = null;

try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

BlobCacheRecoveryTest.testBlobCacheRecovery(config, blobStoreService);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}

/**
* Tests that with {@link HighAvailabilityMode#ZOOKEEPER} distributed corrupted JARs are
* recognised during the download via a {@link org.apache.flink.runtime.blob.BlobCache}.
*/
@Test
public void testBlobCacheCorruptedFile() throws Exception {
org.apache.flink.configuration.Configuration
config = new org.apache.flink.configuration.Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "ZOOKEEPER");
config.setString(BlobServerOptions.STORAGE_DIRECTORY,
temporaryFolder.newFolder().getAbsolutePath());
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, hdfsURI);

BlobStoreService blobStoreService = null;

try {
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);

BlobCacheCorruptionTest.testGetFailsFromCorruptFile(new JobID(), true, true, config, blobStoreService, exception);
} finally {
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
@@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -156,7 +155,6 @@ public class WebRuntimeMonitor implements WebMonitor {
public WebRuntimeMonitor(
Configuration config,
LeaderRetrievalService leaderRetrievalService,
BlobView blobView,
LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
@@ -297,17 +295,15 @@ public WebRuntimeMonitor(
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.LOG,
config,
blobView));
config));
get(router,
new TaskManagerLogHandler(
retriever,
scheduledExecutor,
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
blobView));
config));
get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher));

router
@@ -23,7 +23,6 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
@@ -175,7 +174,6 @@ public void testRedirectToLeader() throws Exception {
webMonitor[i] = new WebRuntimeMonitor(
config,
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
highAvailabilityServices.createBlobStore(),
jobManagerRetrievers[i],
new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
TIMEOUT,
@@ -319,7 +317,6 @@ public void testLeaderNotAvailable() throws Exception {
webRuntimeMonitor = new WebRuntimeMonitor(
config,
mock(LeaderRetrievalService.class),
mock(BlobView.class),
new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
TIMEOUT,