diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index e80205a75b305..29defb49a5e59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reify; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.Values; @@ -73,6 +74,7 @@ import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.joda.time.Duration; @@ -1153,7 +1155,8 @@ public Void apply(T actual) { /** * A transform that gathers the contents of a {@link PCollection} into a single main input * iterable in the global window. This requires a runner to support {@link GroupByKey} in the - * global window, but not side inputs or other windowing or triggers. + * global window, but not side inputs or other windowing or triggers unless the input is + * non-trivially windowed or triggered. * *

If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing * a single empty iterable, even though in practice most runners will not produce any element. @@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) { public PCollection>> expand(PCollection input) { final int combinedKey = 42; + if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault()) + && rewindowingStrategy instanceof IntoGlobalWindow) { + // If we don't have to worry about complicated triggering semantics we can generate + // a much simpler pipeline. This is particularly useful for bootstrapping runners so that + // we can run subsets of the validates runner test suite requiring support of only the + // most basic primitives. + + // In order to ensure we actually get an (empty) iterable rather than an empty PCollection + // when the input is an empty PCollection, we flatten with a dummy PCollection containing + // an empty iterable before grouping on a singleton key and concatenating. + PCollection>> actual = + input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>())); + PCollection>> dummy = + input + .getPipeline() + .apply( + Create.>>of( + ImmutableList.of(ImmutableList.of())) + .withCoder(actual.getCoder())); + return PCollectionList.of(dummy) + .and(actual) + .apply(Flatten.pCollections()) + .apply( + // Default end-of-window trigger disallowed for unbounded PCollections. + input.isBounded() == PCollection.IsBounded.UNBOUNDED + ? Window.>>configure() + .triggering(Never.ever()) + .discardingFiredPanes() + : Window.>>configure()) + .apply(WithKeys.of(combinedKey)) + .apply(GroupByKey.create()) + .apply(Values.create()) + .apply(ParDo.of(new ConcatFn<>())); + } + // Remove the triggering on both PTransform< PCollection>>>, @@ -1223,9 +1261,16 @@ public PCollection>> expand(PCollection input } } + private static final class ToSingletonIterables extends DoFn> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(ImmutableList.of(c.element())); + } + } + private static final class ConcatFn extends DoFn>, Iterable> { @ProcessElement - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c) { c.output(Iterables.concat(c.element())); } }