Skip to content

Commit

Permalink
[FLINK-8501] [flip6] Use single BlobCacheService per TaskExecutor
Browse files Browse the repository at this point in the history
Instead of creating a dedicated BlobCacheService for each new JobManagerConnection,
the TaskExecutor now uses a single BlobCacheService which it shares between the
different JobManagerConnections. The initial BlobServer address is passed by the
ResourceManager when the TaskExecutor registers with it. In order to avoid the
re-creation of BlobCacheServices, this commit changes the behaviour such that the
BlobServer address can be updated.

This closes apache#5350.
  • Loading branch information
tillrohrmann committed Feb 6, 2018
1 parent 9d0eb22 commit 22bf386
Show file tree
Hide file tree
Showing 49 changed files with 471 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand Down Expand Up @@ -132,6 +133,7 @@ protected ResourceManager<?> createResourceManager(
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
Expand All @@ -150,6 +152,7 @@ protected ResourceManager<?> createResourceManager(
rmRuntimeServices.getSlotManager(),
metricRegistry,
rmRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
configuration,
mesosServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand Down Expand Up @@ -122,6 +123,7 @@ protected ResourceManager<?> createResourceManager(
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
Expand All @@ -140,6 +142,7 @@ protected ResourceManager<?> createResourceManager(
rmRuntimeServices.getSlotManager(),
metricRegistry,
rmRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
configuration,
mesosServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
Expand Down Expand Up @@ -149,6 +150,7 @@ public MesosResourceManager(
SlotManager slotManager,
MetricRegistry metricRegistry,
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
// Mesos specifics
Configuration flinkConfig,
Expand All @@ -166,6 +168,7 @@ public MesosResourceManager(
slotManager,
metricRegistry,
jobLeaderIdService,
clusterInformation,
fatalErrorHandler);

this.mesosServices = Preconditions.checkNotNull(mesosServices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand Down Expand Up @@ -172,10 +173,23 @@ public TestingMesosResourceManager(
MesosConfiguration mesosConfig,
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec) {
super(rpcService, resourceManagerEndpointId, resourceId, resourceManagerConfiguration,
highAvailabilityServices, heartbeatServices, slotManager, metricRegistry,
jobLeaderIdService, fatalErrorHandler, flinkConfig, mesosServices, mesosConfig,
taskManagerParameters, taskManagerContainerSpec);
super(
rpcService,
resourceManagerEndpointId,
resourceId,
resourceManagerConfiguration,
highAvailabilityServices,
heartbeatServices,
slotManager,
metricRegistry,
jobLeaderIdService,
new ClusterInformation("localhost", 1234),
fatalErrorHandler,
flinkConfig,
mesosServices,
mesosConfig,
taskManagerParameters,
taskManagerContainerSpec);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public abstract class AbstractBlobCache implements Closeable {
*/
protected final AtomicLong tempFileCounter = new AtomicLong(0);

protected final InetSocketAddress serverAddress;

/**
* Root directory for local file storage.
*/
Expand Down Expand Up @@ -89,15 +87,16 @@ public abstract class AbstractBlobCache implements Closeable {
*/
protected final ReadWriteLock readWriteLock;

@Nullable
protected volatile InetSocketAddress serverAddress;

public AbstractBlobCache(
final InetSocketAddress serverAddress,
final Configuration blobClientConfig,
final BlobView blobView,
final Logger logger) throws IOException {
final Logger logger,
@Nullable final InetSocketAddress serverAddress) throws IOException {

this.log = checkNotNull(logger);

this.serverAddress = checkNotNull(serverAddress);
this.blobClientConfig = checkNotNull(blobClientConfig);
this.blobView = checkNotNull(blobView);
this.readWriteLock = new ReentrantReadWriteLock();
Expand All @@ -118,6 +117,8 @@ public AbstractBlobCache(

// Add shutdown hook to delete storage directory
shutdownHook = BlobUtils.addShutdownHook(this, log);

this.serverAddress = serverAddress;
}

/**
Expand Down Expand Up @@ -172,16 +173,22 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
log.info("Failed to copy from blob store. Downloading from BLOB server instead.", e);
}

// fallback: download from the BlobServer
BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, serverAddress, blobClientConfig, numFetchRetries);
final InetSocketAddress currentServerAddress = serverAddress;

readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
if (currentServerAddress != null) {
// fallback: download from the BlobServer
BlobClient.downloadFromBlobServer(
jobId, blobKey, incomingFile, currentServerAddress, blobClientConfig, numFetchRetries);

readWriteLock.writeLock().lock();
try {
BlobUtils.moveTempFileToStore(
incomingFile, jobId, blobKey, localFile, log, null);
} finally {
readWriteLock.writeLock().unlock();
}
} else {
throw new IOException("Cannot download from BlobServer, because the server address is unknown.");
}

return localFile;
Expand All @@ -197,10 +204,25 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO
/**
 * Returns the port the BLOB server is listening on.
 *
 * @return BLOB server port or {@code -1} if no server address is known yet
 */
public int getPort() {
	// Snapshot the volatile field once so the null check and the subsequent
	// read cannot observe two different values of the server address.
	final InetSocketAddress currentServerAddress = serverAddress;

	if (currentServerAddress != null) {
		return currentServerAddress.getPort();
	} else {
		// the address has not been set via setBlobServerAddress() yet
		return -1;
	}
}

/**
 * Updates the address of the {@link BlobServer} that this cache downloads from.
 *
 * @param blobServerAddress address of the {@link BlobServer}.
 */
public void setBlobServerAddress(InetSocketAddress blobServerAddress) {
	// volatile write; readers snapshot the field before use
	this.serverAddress = checkNotNull(blobServerAddress);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;

import java.io.IOException;
import java.net.InetSocketAddress;

Expand All @@ -39,23 +41,22 @@ public class BlobCacheService implements BlobService {
/**
 * Instantiates a new BLOB cache.
 *
 * @param blobClientConfig
 * 		global configuration
 * @param blobView
 * 		(distributed) blob store file system to retrieve files from first
 * @param serverAddress
 * 		address of the {@link BlobServer} to use for fetching files from or {@code null} if none yet
 * @throws IOException
 * 		thrown if the (local or distributed) file storage cannot be created or is not usable
 */
public BlobCacheService(
		final Configuration blobClientConfig,
		final BlobView blobView,
		@Nullable final InetSocketAddress serverAddress) throws IOException {

	// the server address may still be unknown at construction time; it can be
	// supplied later via setBlobServerAddress(...)
	this(new PermanentBlobCache(blobClientConfig, blobView, serverAddress),
		new TransientBlobCache(blobClientConfig, serverAddress));
}

/**
Expand All @@ -82,6 +83,16 @@ public TransientBlobCache getTransientBlobService() {
return transientBlobCache;
}

/**
 * Propagates the address of the {@link BlobServer} to the underlying caches.
 *
 * @param blobServerAddress address of the {@link BlobServer}.
 */
public void setBlobServerAddress(InetSocketAddress blobServerAddress) {
	// forward the new address to both caches so that either one can
	// start downloading from the server
	this.permanentBlobCache.setBlobServerAddress(blobServerAddress);
	this.transientBlobCache.setBlobServerAddress(blobServerAddress);
}

@Override
public void close() throws IOException {
permanentBlobCache.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -85,23 +87,22 @@ static class RefCount {
/**
* Instantiates a new cache for permanent BLOBs which are also available in an HA store.
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from
* @param blobClientConfig
* global configuration
* @param blobView
* (distributed) HA blob store file system to retrieve files from first
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from or {@code null} if none yet
* @throws IOException
* thrown if the (local or distributed) file storage cannot be created or is not usable
*/
public PermanentBlobCache(
final InetSocketAddress serverAddress,
final Configuration blobClientConfig,
final BlobView blobView) throws IOException {
final BlobView blobView,
@Nullable final InetSocketAddress serverAddress) throws IOException {

super(serverAddress, blobClientConfig, blobView,
LoggerFactory.getLogger(PermanentBlobCache.class));
super(blobClientConfig, blobView, LoggerFactory.getLogger(PermanentBlobCache.class), serverAddress
);

// Initializing the clean up task
this.cleanupTimer = new Timer(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,19 @@ public class TransientBlobCache extends AbstractBlobCache implements TransientBl
/**
* Instantiates a new BLOB cache.
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from
* @param blobClientConfig
* global configuration
*
* @param serverAddress
* address of the {@link BlobServer} to use for fetching files from or {@code null} if none yet
* @throws IOException
* thrown if the (local or distributed) file storage cannot be created or is not usable
*/
public TransientBlobCache(
final InetSocketAddress serverAddress,
final Configuration blobClientConfig) throws IOException {
final Configuration blobClientConfig,
@Nullable final InetSocketAddress serverAddress) throws IOException {

super(serverAddress, blobClientConfig, new VoidBlobStore(),
LoggerFactory.getLogger(TransientBlobCache.class));
super(blobClientConfig, new VoidBlobStore(), LoggerFactory.getLogger(TransientBlobCache.class), serverAddress
);

// Initializing the clean up task
this.cleanupTimer = new Timer(true);
Expand Down Expand Up @@ -122,30 +121,30 @@ protected File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IO

@Override
public TransientBlobKey putTransient(byte[] value) throws IOException {
try (BlobClient bc = new BlobClient(serverAddress, blobClientConfig)) {
try (BlobClient bc = createClient()) {
return (TransientBlobKey) bc.putBuffer(null, value, 0, value.length, TRANSIENT_BLOB);
}
}

@Override
public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException {
checkNotNull(jobId);
try (BlobClient bc = new BlobClient(serverAddress, blobClientConfig)) {
try (BlobClient bc = createClient()) {
return (TransientBlobKey) bc.putBuffer(jobId, value, 0, value.length, TRANSIENT_BLOB);
}
}

@Override
public TransientBlobKey putTransient(InputStream inputStream) throws IOException {
try (BlobClient bc = new BlobClient(serverAddress, blobClientConfig)) {
try (BlobClient bc = createClient()) {
return (TransientBlobKey) bc.putInputStream(null, inputStream, TRANSIENT_BLOB);
}
}

@Override
public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException {
checkNotNull(jobId);
try (BlobClient bc = new BlobClient(serverAddress, blobClientConfig)) {
try (BlobClient bc = createClient()) {
return (TransientBlobKey) bc.putInputStream(jobId, inputStream, TRANSIENT_BLOB);
}
}
Expand Down Expand Up @@ -222,7 +221,13 @@ public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOExce
}

/**
 * Creates a {@link BlobClient} connected to the currently known {@link BlobServer}.
 *
 * @return a new BLOB client for the current server address
 * @throws IOException if the BlobServer address has not been set yet or the
 *         connection could not be established
 */
private BlobClient createClient() throws IOException {
	// snapshot the volatile field once so the null check and the connection
	// attempt use the same address
	final InetSocketAddress currentServerAddress = serverAddress;

	if (currentServerAddress != null) {
		return new BlobClient(currentServerAddress, blobClientConfig);
	} else {
		throw new IOException("Could not create BlobClient because the BlobServer address is unknown.");
	}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public static ClassLoader retrieveClassLoader(
final PermanentBlobCache permanentBlobCache;
try {
// TODO: Fix lifecycle of PermanentBlobCache to properly close it upon usage
permanentBlobCache = new PermanentBlobCache(serverAddress, config, highAvailabilityServices.createBlobStore());
permanentBlobCache = new PermanentBlobCache(config, highAvailabilityServices.createBlobStore(), serverAddress);
} catch (IOException e) {
throw new JobRetrievalException(
jobID,
Expand Down
Loading

0 comments on commit 22bf386

Please sign in to comment.