Skip to content

Commit

Permalink
This closes apache#2297
Browse files Browse the repository at this point in the history
  • Loading branch information
dhalperi committed Mar 23, 2017
2 parents 75b6567 + b650897 commit 5e1be9f
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 58 deletions.
3 changes: 2 additions & 1 deletion runners/google-cloud-dataflow-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

<properties>
<dataflow.container_version>beam-master-20170314</dataflow.container_version>
<dataflow.environment_major_version>6</dataflow.environment_major_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
</properties>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ public Job translate(List<DataflowPackage> packages) {
workerPool.setNumWorkers(options.getNumWorkers());

if (options.isStreaming()
&& (options.getExperiments() == null
|| !options.getExperiments().contains("enable_windmill_service"))) {
&& !DataflowRunner.hasExperiment(options, "enable_windmill_service")) {
// Use separate data disk for streaming.
Disk disk = new Disk();
disk.setDiskType(options.getWorkerDiskType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -305,14 +304,12 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
PTransformMatchers.parDoWithFnType(unsupported),
UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, true)));
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_source")) {
if (!hasExperiment(options, "enable_custom_pubsub_source")) {
ptoverrides.put(
PTransformMatchers.classEqualTo(PubsubUnboundedSource.class),
new ReflectiveRootOverrideFactory(StreamingPubsubIORead.class, this));
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_pubsub_sink")) {
if (!hasExperiment(options, "enable_custom_pubsub_sink")) {
ptoverrides.put(
PTransformMatchers.classEqualTo(PubsubUnboundedSink.class),
new StreamingPubsubIOWriteOverrideFactory(this));
Expand Down Expand Up @@ -543,20 +540,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
workerPool.setWorkerHarnessContainerImage(workerHarnessContainerImage);
}

// Requirements about the service.
Map<String, Object> environmentVersion = new HashMap<>();
environmentVersion.put(
PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY,
DataflowRunnerInfo.getDataflowRunnerInfo().getEnvironmentMajorVersion());
newJob.getEnvironment().setVersion(environmentVersion);
// Default jobType is JAVA_BATCH_AUTOSCALING: A Java job with workers that the job can
// autoscale if specified.
String jobType = "JAVA_BATCH_AUTOSCALING";

if (options.isStreaming()) {
jobType = "STREAMING";
}
environmentVersion.put(PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType);
newJob.getEnvironment().setVersion(getEnvironmentVersion(options));

if (hooks != null) {
hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
Expand Down Expand Up @@ -664,6 +648,30 @@ public DataflowPipelineJob run(Pipeline pipeline) {
return dataflowPipelineJob;
}

/**
 * Returns {@code true} when the given experiment name appears in the options' experiment list.
 * A {@code null} experiments list is treated as empty, so no experiment is considered enabled.
 */
public static boolean hasExperiment(DataflowPipelineDebugOptions options, String experiment) {
  List<String> enabled = options.getExperiments();
  return enabled != null && enabled.contains(experiment);
}

/**
 * Helper to configure the Dataflow Job Environment based on the user's job options.
 *
 * <p>Selects the FnAPI major version and job types when the {@code beam_fn_api} experiment is
 * enabled, and the legacy major version and job types otherwise.
 */
private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions options) {
  DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo();
  boolean useFnApi = hasExperiment(options, "beam_fn_api");
  boolean streaming = options.isStreaming();
  String majorVersion =
      useFnApi ? info.getFnApiEnvironmentMajorVersion() : info.getLegacyEnvironmentMajorVersion();
  String jobType =
      useFnApi
          ? (streaming ? "FNAPI_STREAMING" : "FNAPI_BATCH")
          : (streaming ? "STREAMING" : "JAVA_BATCH_AUTOSCALING");
  return ImmutableMap.<String, Object>of(
      PropertyNames.ENVIRONMENT_VERSION_MAJOR_KEY, majorVersion,
      PropertyNames.ENVIRONMENT_VERSION_JOB_TYPE_KEY, jobType);
}

@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,32 +47,34 @@ public static DataflowRunnerInfo getDataflowRunnerInfo() {

private Properties properties;

private static final String ENVIRONMENT_MAJOR_VERSION_KEY = "environment.major.version";
private static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY = "worker.image.batch";
private static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY =
"worker.image.streaming";
private static final String FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY =
"fnapi.environment.major.version";
private static final String LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY =
"legacy.environment.major.version";
private static final String CONTAINER_VERSION_KEY = "container.version";

/** Provides the environment's major version number. */
public String getEnvironmentMajorVersion() {
/** Provides the legacy environment's major version number. */
public String getLegacyEnvironmentMajorVersion() {
checkState(
properties.containsKey(ENVIRONMENT_MAJOR_VERSION_KEY), "Unknown environment major version");
return properties.getProperty(ENVIRONMENT_MAJOR_VERSION_KEY);
properties.containsKey(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY),
"Unknown legacy environment major version");
return properties.getProperty(LEGACY_ENVIRONMENT_MAJOR_VERSION_KEY);
}

/** Provides the batch worker harness container image name. */
public String getBatchWorkerHarnessContainerImage() {
/** Provides the FnAPI environment's major version number. */
public String getFnApiEnvironmentMajorVersion() {
checkState(
properties.containsKey(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY),
"Unknown batch worker harness container image");
return properties.getProperty(BATCH_WORKER_HARNESS_CONTAINER_IMAGE_KEY);
properties.containsKey(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY),
"Unknown FnAPI environment major version");
return properties.getProperty(FNAPI_ENVIRONMENT_MAJOR_VERSION_KEY);
}

/** Provides the streaming worker harness container image name. */
public String getStreamingWorkerHarnessContainerImage() {
/** Provides the container version that will be used for constructing harness image paths. */
public String getContainerVersion() {
checkState(
properties.containsKey(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY),
"Unknown streaming worker harness container image");
return properties.getProperty(STREAMING_WORKER_HARNESS_CONTAINER_IMAGE_KEY);
properties.containsKey(CONTAINER_VERSION_KEY),
"Unknown container version");
return properties.getProperty(CONTAINER_VERSION_KEY);
}

private DataflowRunnerInfo(String resourcePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.services.dataflow.Dataflow;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.util.DataflowTransport;
import org.apache.beam.runners.dataflow.util.GcsStager;
import org.apache.beam.runners.dataflow.util.Stager;
Expand Down Expand Up @@ -53,6 +54,7 @@ public interface DataflowPipelineDebugOptions extends PipelineOptions {
+ "be enabled with this flag. Please sync with the Dataflow team before enabling any "
+ "experiments.")
@Experimental
@Nullable
List<String> getExperiments();
void setExperiments(List<String> value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.Default;
Expand Down Expand Up @@ -129,11 +130,14 @@ class WorkerHarnessContainerImageFactory
@Override
public String create(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (dataflowOptions.isStreaming()) {
return DataflowRunnerInfo.getDataflowRunnerInfo().getStreamingWorkerHarnessContainerImage();
String containerVersion = DataflowRunnerInfo.getDataflowRunnerInfo().getContainerVersion();
String containerType;
if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api")) {
containerType = "java";
} else {
return DataflowRunnerInfo.getDataflowRunnerInfo().getBatchWorkerHarnessContainerImage();
containerType = dataflowOptions.isStreaming() ? "beam-java-streaming" : "beam-java-batch";
}
return String.format("dataflow.gcr.io/v1beta3/%s:%s", containerType, containerVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
#
# Dataflow runtime properties

environment.major.version=${dataflow.environment_major_version}

worker.image.batch=dataflow.gcr.io/v1beta3/beam-java-batch:${dataflow.container_version}

worker.image.streaming=dataflow.gcr.io/v1beta3/beam-java-streaming:${dataflow.container_version}
legacy.environment.major.version=${dataflow.legacy_environment_major_version}
fnapi.environment.major.version=${dataflow.fnapi_environment_major_version}
container.version=${dataflow.container_version}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

Expand All @@ -32,20 +33,22 @@ public class DataflowRunnerInfoTest {
public void getDataflowRunnerInfo() throws Exception {
DataflowRunnerInfo info = DataflowRunnerInfo.getDataflowRunnerInfo();

String version = info.getEnvironmentMajorVersion();
String version = info.getLegacyEnvironmentMajorVersion();
// Validate major version is a number
assertTrue(
String.format("Environment major version number %s is not a number", version),
String.format("Legacy environment major version number %s is not a number", version),
version.matches("\\d+"));

// Validate container images contain gcr.io
assertThat(
"batch worker harness container image invalid",
info.getBatchWorkerHarnessContainerImage(),
containsString("gcr.io"));
version = info.getFnApiEnvironmentMajorVersion();
// Validate major version is a number
assertTrue(
String.format("FnAPI environment major version number %s is not a number", version),
version.matches("\\d+"));

// Validate container version does not contain a $ (indicating it was not filled in).
assertThat(
"streaming worker harness container image invalid",
info.getStreamingWorkerHarnessContainerImage(),
containsString("gcr.io"));
"container version invalid",
info.getContainerVersion(),
not(containsString("$")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -1118,4 +1119,20 @@ public void testTemplateRunnerLoggedErrorForFile() throws Exception {
thrown.expect(RuntimeException.class);
p.run();
}

@Test
public void testHasExperiment() {
  DataflowPipelineDebugOptions options =
      PipelineOptionsFactory.as(DataflowPipelineDebugOptions.class);

  // A null experiments list must be handled gracefully: nothing is enabled.
  options.setExperiments(null);
  assertFalse(DataflowRunner.hasExperiment(options, "foo"));

  // Only exact, case-sensitive matches count; prefixes and different casing do not.
  options.setExperiments(ImmutableList.of("foo", "bar"));
  for (String present : ImmutableList.of("foo", "bar")) {
    assertTrue(DataflowRunner.hasExperiment(options, present));
  }
  for (String absent : ImmutableList.of("baz", "ba", "BAR")) {
    assertFalse(DataflowRunner.hasExperiment(options, absent));
  }
}
}

0 comments on commit 5e1be9f

Please sign in to comment.