diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index e80205a75b305..29defb49a5e59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -50,6 +50,7 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
@@ -73,6 +74,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
@@ -1153,7 +1155,8 @@ public Void apply(T actual) {
/**
* A transform that gathers the contents of a {@link PCollection} into a single main input
* iterable in the global window. This requires a runner to support {@link GroupByKey} in the
- * global window, but not side inputs or other windowing or triggers.
+ * global window, but not side inputs or other windowing or triggers unless the input is
+ * non-trivially windowed or triggered.
*
*
If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
* a single empty iterable, even though in practice most runners will not produce any element.
@@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) {
public PCollection>> expand(PCollection input) {
final int combinedKey = 42;
+ if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())
+ && rewindowingStrategy instanceof IntoGlobalWindow) {
+ // If we don't have to worry about complicated triggering semantics we can generate
+ // a much simpler pipeline. This is particularly useful for bootstrapping runners so that
+ // we can run subsets of the validates runner test suite requiring support of only the
+ // most basic primitives.
+
+ // In order to ensure we actually get an (empty) iterable rather than an empty PCollection
+ // when the input is an empty PCollection, we flatten with a dummy PCollection containing
+ // an empty iterable before grouping on a singleton key and concatenating.
+ PCollection>> actual =
+ input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>()));
+ PCollection>> dummy =
+ input
+ .getPipeline()
+ .apply(
+ Create.>>of(
+ ImmutableList.of(ImmutableList.of()))
+ .withCoder(actual.getCoder()));
+ return PCollectionList.of(dummy)
+ .and(actual)
+ .apply(Flatten.pCollections())
+ .apply(
+ // Default end-of-window trigger disallowed for unbounded PCollections.
+ input.isBounded() == PCollection.IsBounded.UNBOUNDED
+ ? Window.>>configure()
+ .triggering(Never.ever())
+ .discardingFiredPanes()
+ : Window.>>configure())
+ .apply(WithKeys.of(combinedKey))
+ .apply(GroupByKey.create())
+ .apply(Values.create())
+ .apply(ParDo.of(new ConcatFn<>()));
+ }
+
// Remove the triggering on both
PTransform<
PCollection>>>,
@@ -1223,9 +1261,16 @@ public PCollection>> expand(PCollection input
}
}
+ private static final class ToSingletonIterables extends DoFn> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(ImmutableList.of(c.element()));
+ }
+ }
+
private static final class ConcatFn extends DoFn>, Iterable> {
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
+ public void processElement(ProcessContext c) {
c.output(Iterables.concat(c.element()));
}
}