Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-10444] Simpler PAssert for simpler cases. #12218

Merged
merged 3 commits into from
Jul 11, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
Expand All @@ -73,6 +74,7 @@
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
Expand Down Expand Up @@ -1153,7 +1155,8 @@ public Void apply(T actual) {
/**
* A transform that gathers the contents of a {@link PCollection} into a single main input
* iterable in the global window. This requires a runner to support {@link GroupByKey} in the
* global window, but not side inputs or other windowing or triggers unless the input is
* non-trivially windowed or triggered.
*
* <p>If the {@link PCollection} is empty, this transform returns a {@link PCollection} containing
* a single empty iterable, even though in practice most runners will not produce any element.
Expand All @@ -1171,6 +1174,41 @@ public GroupGlobally(AssertionWindows rewindowingStrategy) {
public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input) {
final int combinedKey = 42;

if (input.getWindowingStrategy().equals(WindowingStrategy.globalDefault())
&& rewindowingStrategy instanceof IntoGlobalWindow) {
// If we don't have to worry about complicated triggering semantics we can generate
// a much simpler pipeline. This is particularly useful for bootstrapping runners so that
// we can run subsets of the validates runner test suite requiring support of only the
// most basic primitives.

// In order to ensure we actually get an (empty) iterable rather than an empty PCollection
// when the input is an empty PCollection, we flatten with a dummy PCollection containing
// an empty iterable before grouping on a singleton key and concatenating.
PCollection<Iterable<ValueInSingleWindow<T>>> actual =
input.apply(Reify.windows()).apply(ParDo.of(new ToSingletonIterables<>()));
PCollection<Iterable<ValueInSingleWindow<T>>> dummy =
input
.getPipeline()
.apply(
Create.<Iterable<ValueInSingleWindow<T>>>of(
ImmutableList.of(ImmutableList.of()))
.withCoder(actual.getCoder()));
return PCollectionList.of(dummy)
.and(actual)
.apply(Flatten.pCollections())
.apply(
// Default end-of-window trigger disallowed for unbounded PCollections.
input.isBounded() == PCollection.IsBounded.UNBOUNDED
? Window.<Iterable<ValueInSingleWindow<T>>>configure()
.triggering(Never.ever())
.discardingFiredPanes()
: Window.<Iterable<ValueInSingleWindow<T>>>configure())
.apply(WithKeys.of(combinedKey))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(ParDo.of(new ConcatFn<>()));
}

// Remove the triggering on both
PTransform<
PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
Expand Down Expand Up @@ -1223,9 +1261,16 @@ public PCollection<Iterable<ValueInSingleWindow<T>>> expand(PCollection<T> input
}
}

/**
 * A {@link DoFn} that wraps each input element in an immutable single-element list and emits
 * that list as an {@link Iterable}.
 *
 * @param <T> the type of the elements being wrapped
 */
private static final class ToSingletonIterables<T> extends DoFn<T, Iterable<T>> {
  /** Emits exactly one output per input: a one-element list holding the current element. */
  @ProcessElement
  public void processElement(ProcessContext context) {
    final T element = context.element();
    final Iterable<T> singleton = ImmutableList.of(element);
    context.output(singleton);
  }
}

private static final class ConcatFn<T> extends DoFn<Iterable<Iterable<T>>, Iterable<T>> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
public void processElement(ProcessContext c) {
c.output(Iterables.concat(c.element()));
}
}
Expand Down