Skip to content

Commit

Permalink
This closes apache#2276
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed Mar 23, 2017
2 parents 890bc1a + 28a840d commit 87c8ef0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GroupedValues;
Expand Down Expand Up @@ -160,9 +161,6 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
/** Translator for this DataflowRunner, based on options. */
private final DataflowPipelineTranslator translator;

/** Custom transforms implementations. */
private final ImmutableMap<PTransformMatcher, PTransformOverrideFactory> overrides;

/** A set of user defined functions to invoke at different points in execution. */
private DataflowRunnerHooks hooks;

Expand Down Expand Up @@ -289,13 +287,15 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
this.translator = DataflowPipelineTranslator.fromOptions(options);
this.pcollectionsRequiringIndexedFormat = new HashSet<>();
this.ptransformViewsWithNonDeterministicKeyCoders = new HashSet<>();
}

private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides(boolean streaming) {
ImmutableMap.Builder<PTransformMatcher, PTransformOverrideFactory> ptoverrides =
ImmutableMap.builder();
// Create is implemented in terms of a Read, so it must precede the override to Read in
// streaming
ptoverrides.put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
if (options.isStreaming()) {
if (streaming) {
// In streaming mode must use either the custom Pubsub unbounded source/sink or
// defer to Windmill's built-in implementation.
for (Class<? extends DoFn> unsupported :
Expand Down Expand Up @@ -334,10 +334,6 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
PTransformMatchers.classEqualTo(unsupported),
UnsupportedOverrideFactory.withMessage(getUnsupportedMessage(unsupported, false)));
}
ptoverrides.put(
PTransformMatchers.classEqualTo(Read.Unbounded.class),
UnsupportedOverrideFactory.withMessage(
"The DataflowRunner in batch mode does not support Read.Unbounded"));
ptoverrides
// State and timer pardos are implemented by expansion to GBK-then-ParDo
.put(PTransformMatchers.stateOrTimerParDoMulti(),
Expand Down Expand Up @@ -372,7 +368,7 @@ public static DataflowRunner fromOptions(PipelineOptions options) {
PTransformMatchers.classEqualTo(Combine.GroupedValues.class),
new PrimitiveCombineGroupedValuesOverrideFactory())
.put(PTransformMatchers.classEqualTo(ParDo.Bound.class), new PrimitiveParDoSingleFactory());
overrides = ptoverrides.build();
return ptoverrides.build();
}

private String getUnsupportedMessage(Class<?> unsupported, boolean streaming) {
Expand Down Expand Up @@ -485,6 +481,9 @@ private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifie
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
if (containsUnboundedPCollection(pipeline)) {
options.setStreaming(true);
}
replaceTransforms(pipeline);

LOG.info("Executing pipeline on the Dataflow Service, which will have billing implications "
Expand Down Expand Up @@ -674,11 +673,29 @@ private static Map<String, Object> getEnvironmentVersion(DataflowPipelineOptions

@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override : overrides.entrySet()) {
boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
getOverrides(streaming).entrySet()) {
pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
}
}

/**
 * Reports whether the folded boundedness of all {@link PCollection}s in the pipeline is
 * {@link IsBounded#UNBOUNDED}.
 *
 * <p>The pipeline is traversed topologically. Every visited value that is a {@link PCollection}
 * has its boundedness combined into a running value (starting at {@link IsBounded#BOUNDED}) via
 * {@code IsBounded.and}; values of any other type are ignored.
 * NOTE(review): this presumably yields UNBOUNDED as soon as any single PCollection is unbounded --
 * confirm against the contract of {@code IsBounded.and}.
 *
 * @param p the pipeline to inspect
 * @return {@code true} if the combined boundedness is {@link IsBounded#UNBOUNDED}
 */
private boolean containsUnboundedPCollection(Pipeline p) {
  class BoundednessVisitor extends PipelineVisitor.Defaults {
    // Running result; stays BOUNDED when the pipeline contains no PCollections at all.
    IsBounded boundedness = IsBounded.BOUNDED;

    @Override
    public void visitValue(PValue value, Node producer) {
      if (value instanceof PCollection) {
        // Wildcard cast instead of a raw type: only isBounded() is needed here.
        boundedness = boundedness.and(((PCollection<?>) value).isBounded());
      }
    }
  }
  BoundednessVisitor visitor = new BoundednessVisitor();
  p.traverseTopologically(visitor);
  return visitor.boundedness == IsBounded.UNBOUNDED;
}

/**
* Returns the DataflowPipelineTranslator associated with this object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,12 @@
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand All @@ -82,7 +80,6 @@
import org.apache.beam.sdk.util.TestCredential;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.hamcrest.Description;
Expand Down Expand Up @@ -1049,35 +1046,6 @@ public void testToString() {
DataflowRunner.fromOptions(options).toString());
}

/**
 * Builds {@link DataflowPipelineOptions} for tests that run a pipeline on the
 * {@link DataflowRunner}.
 *
 * <p>Sets the runner, job name, project, temp location, a {@link TestCredential} and
 * {@link NoopPathValidator} as the path validator class.
 *
 * @param streaming value passed to {@code setStreaming}
 * @return the configured options
 */
private static PipelineOptions makeOptions(boolean streaming) {
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setStreaming(streaming);
  options.setJobName("TestJobName");
  options.setProject("test-project");
  // A GCS location uses the gs:// scheme; "gs:https://..." is not a well-formed GCS URI.
  options.setTempLocation("gs://test/temp/location");
  options.setGcpCredential(new TestCredential());
  options.setPathValidatorClass(NoopPathValidator.class);
  return options;
}

/**
 * Applies {@code source} to a fresh pipeline and runs it, expecting an
 * {@link UnsupportedOperationException} whose message names the execution mode and {@code name}.
 *
 * @param source the root transform to apply to the pipeline
 * @param name the transform name expected to appear in the exception message
 * @param streaming whether the options are built in streaming mode; also selects the mode word
 *     ("streaming" or "batch") expected in the message
 * @throws Exception if building or running the pipeline fails in another way
 */
private void testUnsupportedSource(PTransform<PBegin, ?> source, String name, boolean streaming)
    throws Exception {
  final String modeLabel;
  if (streaming) {
    modeLabel = "streaming";
  } else {
    modeLabel = "batch";
  }
  String expectedMessage =
      String.format("The DataflowRunner in %s mode does not support %s", modeLabel, name);

  // Expectations are registered before the pipeline is created.
  thrown.expect(UnsupportedOperationException.class);
  thrown.expectMessage(expectedMessage);

  Pipeline pipeline = Pipeline.create(makeOptions(streaming));
  pipeline.apply(source);
  pipeline.run();
}

/**
 * Checks that a {@code Read} from a {@link TestCountingSource} is reported as the unsupported
 * "Read.Unbounded" transform when the options are built with streaming disabled.
 */
@Test
public void testReadUnboundedUnsupportedInBatch() throws Exception {
  final boolean streaming = false;
  final String transformName = "Read.Unbounded";
  testUnsupportedSource(Read.from(new TestCountingSource(1)), transformName, streaming);
}

/**
* Tests that the {@link DataflowRunner} with {@code --templateLocation} returns normally
* when the runner is successfully run.
Expand Down

0 comments on commit 87c8ef0

Please sign in to comment.