Skip to content

Commit

Permalink
[BEAM-9577] Update Java Runners to handle dependency-based artifact s…
Browse files Browse the repository at this point in the history
…taging. (apache#11521)
  • Loading branch information
robertwb authored May 12, 2020
1 parent d8e8616 commit 3584015
Show file tree
Hide file tree
Showing 23 changed files with 356 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testJobServerDriver() throws Exception {
newErr.flush();
String output = baos.toString(Charsets.UTF_8.name());
if (output.contains("JobService started on localhost:")
&& output.contains("LegacyArtifactStagingService started on localhost:")
&& output.contains("ArtifactStagingService started on localhost:")
&& output.contains("ExpansionService started on localhost:")) {
success = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
package org.apache.beam.runners.fnexecution;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Server;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/**
* A {@link Server gRPC Server} which manages a single {@link FnService}. The lifetime of the
Expand All @@ -39,6 +43,58 @@ public static <ServiceT extends FnService> GrpcFnServer<ServiceT> allocatePortAn
return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
}

/**
* Create {@link GrpcFnServer}s for the provided {@link FnService}s running on an arbitrary port.
*/
public static List<GrpcFnServer<? extends FnService>> allocatePortAndCreateFor(
List<? extends FnService> services, ServerFactory factory) throws IOException {
ApiServiceDescriptor.Builder apiServiceDescriptor = ApiServiceDescriptor.newBuilder();
Server server =
factory.allocateAddressAndCreate(
Collections.unmodifiableList(services), apiServiceDescriptor);
AtomicInteger countdown = new AtomicInteger(services.size());
return Lists.transform(
services,
service ->
new SharedGrpcFnServer<>(server, service, apiServiceDescriptor.build(), countdown));
}

/**
* Create {@link GrpcFnServer}s for the provided {@link FnService}s running on a specified port.
*/
public static List<GrpcFnServer<? extends FnService>> create(
List<? extends FnService> services, ApiServiceDescriptor endpoint, ServerFactory factory)
throws IOException {
Server server = factory.create(Collections.unmodifiableList(services), endpoint);
AtomicInteger countdown = new AtomicInteger(services.size());
return Lists.transform(
services, service -> new SharedGrpcFnServer<>(server, service, endpoint, countdown));
}

private static class SharedGrpcFnServer<ServiceT extends FnService>
extends GrpcFnServer<ServiceT> {

private AtomicInteger countdown;

private SharedGrpcFnServer(
Server server,
ServiceT service,
ApiServiceDescriptor apiServiceDescriptor,
AtomicInteger countdown) {
super(server, service, apiServiceDescriptor);
this.countdown = countdown;
}

@Override
public void close() throws Exception {
if (countdown.addAndGet(-1) == 0) {
super.close();
} else {
getService().close();
}
}
}

/**
* Create a {@link GrpcFnServer} for the provided {@link FnService} which will run at the endpoint
* specified in the {@link ApiServiceDescriptor}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;
Expand All @@ -54,8 +56,11 @@
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.StatusException;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,9 +108,15 @@ public Map<String, List<RunnerApi.ArtifactInformation>> getStagedArtifacts(Strin
return staged.remove(stagingToken);
}

public void removeStagedArtifacts(String stagingToken) throws IOException {
destinationProvider.removeStagedArtifacts(stagingToken);
}

/** Provides a concrete location to which artifacts can be staged on retrieval. */
public interface ArtifactDestinationProvider {
ArtifactDestination getDestination(String stagingToken, String name) throws IOException;

void removeStagedArtifacts(String stagingToken) throws IOException;
}

/**
Expand Down Expand Up @@ -147,12 +158,40 @@ public static ArtifactDestination fromFile(String path, OutputStream out) {
* @param root the directory in which to place all artifacts
*/
public static ArtifactDestinationProvider beamFilesystemArtifactDestinationProvider(String root) {
return (stagingToken, name) -> {
ResourceId path =
FileSystems.matchNewResource(root, true)
.resolve(stagingToken, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
return ArtifactDestination.fromFile(path.toString());
return new ArtifactDestinationProvider() {
@Override
public ArtifactDestination getDestination(String stagingToken, String name)
throws IOException {
ResourceId path =
stagingDir(stagingToken)
.resolve(name, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
return ArtifactDestination.fromFile(path.toString());
}

@Override
public void removeStagedArtifacts(String stagingToken) throws IOException {
// TODO(robertwb): Consider adding recursive delete.
ResourceId stagingDir = stagingDir(stagingToken);
List<ResourceId> toDelete = new ArrayList<>();
for (MatchResult match :
FileSystems.matchResources(
ImmutableList.of(
stagingDir.resolve("*", ResolveOptions.StandardResolveOptions.RESOLVE_FILE)))) {
for (MatchResult.Metadata m : match.metadata()) {
toDelete.add(m.resourceId());
}
}
FileSystems.delete(toDelete, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
FileSystems.delete(
ImmutableList.of(stagingDir), MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}

private ResourceId stagingDir(String stagingToken) {
return FileSystems.matchNewResource(root, true)
.resolve(
Hashing.sha256().hashString(stagingToken, Charsets.UTF_8).toString(),
ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,6 +43,7 @@
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemLegacyArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ClassLoaderLegacyArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.LegacyArtifactRetrievalService;
Expand Down Expand Up @@ -234,6 +236,11 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception {
environmentFactoryProviderMap.get(environment.getUrn());
ServerFactory serverFactory = environmentFactoryProvider.getServerFactory();
ServerInfo serverInfo = serverInfoCreator.apply(serverFactory);
String workerId = stageIdGenerator.getId();
serverInfo
.getProvisioningServer()
.getService()
.registerEnvironment(workerId, environment);
EnvironmentFactory environmentFactory =
environmentFactoryProvider.createEnvironmentFactory(
serverInfo.getControlServer(),
Expand All @@ -243,7 +250,7 @@ public WrappedSdkHarnessClient load(Environment environment) throws Exception {
clientPool,
stageIdGenerator);
return WrappedSdkHarnessClient.wrapping(
environmentFactory.createEnvironment(environment), serverInfo);
environmentFactory.createEnvironment(environment, workerId), serverInfo);
}
});

Expand Down Expand Up @@ -613,6 +620,7 @@ public synchronized void close() {
// These will be closed in the reverse creation order:
try (AutoCloseable envCloser = environment;
AutoCloseable provisioningServer = serverInfo.getProvisioningServer();
AutoCloseable legacyRetrievalServer = serverInfo.getLegacyRetrievalServer();
AutoCloseable retrievalServer = serverInfo.getRetrievalServer();
AutoCloseable stateServer = serverInfo.getStateServer();
AutoCloseable dataServer = serverInfo.getDataServer();
Expand Down Expand Up @@ -653,12 +661,12 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
PortablePipelineOptions portableOptions =
PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
.as(PortablePipelineOptions.class);
LegacyArtifactRetrievalService artifactRetrievalService;
LegacyArtifactRetrievalService legacyArtifactRetrievalService;

if (portableOptions.getRetrievalServiceType() == RetrievalServiceType.CLASSLOADER) {
artifactRetrievalService = new ClassLoaderLegacyArtifactRetrievalService();
legacyArtifactRetrievalService = new ClassLoaderLegacyArtifactRetrievalService();
} else {
artifactRetrievalService = BeamFileSystemLegacyArtifactRetrievalService.create();
legacyArtifactRetrievalService = BeamFileSystemLegacyArtifactRetrievalService.create();
}

GrpcFnServer<FnApiControlClientPoolService> controlServer =
Expand All @@ -669,15 +677,23 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
GrpcFnServer<GrpcLoggingService> loggingServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
GrpcFnServer<LegacyArtifactRetrievalService> retrievalServer =
GrpcFnServer.allocatePortAndCreateFor(artifactRetrievalService, serverFactory);
List<GrpcFnServer<?>> artifactServers =
GrpcFnServer.allocatePortAndCreateFor(
ImmutableList.of(new ArtifactRetrievalService(), legacyArtifactRetrievalService),
serverFactory);
GrpcFnServer<ArtifactRetrievalService> retrievalServer =
(GrpcFnServer<ArtifactRetrievalService>) artifactServers.get(0);
GrpcFnServer<LegacyArtifactRetrievalService> legacyRetrievalServer =
(GrpcFnServer<LegacyArtifactRetrievalService>) artifactServers.get(1);
ProvisionApi.ProvisionInfo.Builder provisionInfo = jobInfo.toProvisionInfo().toBuilder();
provisionInfo.setLoggingEndpoint(loggingServer.getApiServiceDescriptor());
provisionInfo.setArtifactEndpoint(retrievalServer.getApiServiceDescriptor());
provisionInfo.setArtifactEndpoint(legacyRetrievalServer.getApiServiceDescriptor());
provisionInfo.setControlEndpoint(controlServer.getApiServiceDescriptor());
GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
GrpcFnServer.allocatePortAndCreateFor(
StaticGrpcProvisionService.create(provisionInfo.build()), serverFactory);
StaticGrpcProvisionService.create(
provisionInfo.build(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
serverFactory);
GrpcFnServer<GrpcDataService> dataServer =
GrpcFnServer.allocatePortAndCreateFor(
GrpcDataService.create(
Expand All @@ -691,6 +707,7 @@ private ServerInfo createServerInfo(JobInfo jobInfo, ServerFactory serverFactory
.setControlServer(controlServer)
.setLoggingServer(loggingServer)
.setRetrievalServer(retrievalServer)
.setLegacyRetrievalServer(legacyRetrievalServer)
.setProvisioningServer(provisioningServer)
.setDataServer(dataServer)
.setStateServer(stateServer)
Expand All @@ -705,7 +722,9 @@ public abstract static class ServerInfo {

abstract GrpcFnServer<GrpcLoggingService> getLoggingServer();

abstract GrpcFnServer<LegacyArtifactRetrievalService> getRetrievalServer();
abstract GrpcFnServer<ArtifactRetrievalService> getRetrievalServer();

abstract GrpcFnServer<LegacyArtifactRetrievalService> getLegacyRetrievalServer();

abstract GrpcFnServer<StaticGrpcProvisionService> getProvisioningServer();

Expand All @@ -721,7 +740,10 @@ abstract static class Builder {

abstract Builder setLoggingServer(GrpcFnServer<GrpcLoggingService> server);

abstract Builder setRetrievalServer(GrpcFnServer<LegacyArtifactRetrievalService> server);
abstract Builder setRetrievalServer(GrpcFnServer<ArtifactRetrievalService> server);

abstract Builder setLegacyRetrievalServer(
GrpcFnServer<LegacyArtifactRetrievalService> server);

abstract Builder setProvisioningServer(GrpcFnServer<StaticGrpcProvisionService> server);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private StageBundleFactory createBundleFactory(ExecutableStage stage) {
stage.getEnvironment(),
env -> {
try {
return environmentFactory.createEnvironment(env);
return environmentFactory.createEnvironment(env, idGenerator.getId());
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.LegacyArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
Expand Down Expand Up @@ -86,15 +86,15 @@ private DockerEnvironmentFactory(

/** Creates a new, active {@link RemoteEnvironment} backed by a local Docker container. */
@Override
public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
public RemoteEnvironment createEnvironment(Environment environment, String workerId)
throws Exception {
Preconditions.checkState(
environment
.getUrn()
.equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)),
"The passed environment does not contain a DockerPayload.");
final RunnerApi.DockerPayload dockerPayload =
RunnerApi.DockerPayload.parseFrom(environment.getPayload());
final String workerId = idGenerator.getId();

// Prepare docker invocation.
String containerImage = dockerPayload.getContainerImage();
Expand Down Expand Up @@ -243,7 +243,7 @@ public Provider(PipelineOptions options) {
public EnvironmentFactory createEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
GrpcFnServer<GrpcLoggingService> loggingServiceServer,
GrpcFnServer<LegacyArtifactRetrievalService> retrievalServiceServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
ControlClientPool clientPool,
IdGenerator idGenerator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.LegacyArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.ControlClientPool.Source;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
Expand Down Expand Up @@ -87,14 +87,15 @@ private EmbeddedEnvironmentFactory(

@Override
@SuppressWarnings("FutureReturnValueIgnored") // no need to monitor shutdown thread
public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
public RemoteEnvironment createEnvironment(Environment environment, String workerId)
throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> fnHarness =
executor.submit(
() -> {
try {
FnHarness.main(
"id",
workerId,
options,
loggingServer.getApiServiceDescriptor(),
controlServer.getApiServiceDescriptor(),
Expand Down Expand Up @@ -157,7 +158,7 @@ public Provider(PipelineOptions pipelineOptions) {
public EnvironmentFactory createEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> controlServer,
GrpcFnServer<GrpcLoggingService> loggingServer,
GrpcFnServer<LegacyArtifactRetrievalService> retrievalServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServer,
ControlClientPool clientPool,
IdGenerator idGenerator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.LegacyArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
Expand All @@ -32,7 +32,8 @@
/** Creates {@link Environment environments} which communicate to an {@link SdkHarnessClient}. */
public interface EnvironmentFactory {
/** Creates an active {@link Environment} and returns a handle to it. */
RemoteEnvironment createEnvironment(RunnerApi.Environment environment) throws Exception;
RemoteEnvironment createEnvironment(RunnerApi.Environment environment, String workerId)
throws Exception;

/** Provider for a {@link EnvironmentFactory} and {@link ServerFactory} for the environment. */
interface Provider {
Expand All @@ -41,7 +42,7 @@ interface Provider {
EnvironmentFactory createEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
GrpcFnServer<GrpcLoggingService> loggingServiceServer,
GrpcFnServer<LegacyArtifactRetrievalService> retrievalServiceServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
ControlClientPool clientPool,
IdGenerator idGenerator);
Expand Down
Loading

0 comments on commit 3584015

Please sign in to comment.