Skip to content

Commit

Permalink
Merge pull request apache#11646 from CraigChambersG/flatten-test
Browse files Browse the repository at this point in the history
[BEAM-9941] Add a BeamJava test with Flatten with different input and output Coders
  • Loading branch information
lukecwik committed May 9, 2020
2 parents da4a1b2 + 5312f74 commit e763824
Showing 1 changed file with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CollectionCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
Expand All @@ -54,12 +56,14 @@
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.junit.Assert;
Expand Down Expand Up @@ -347,6 +351,33 @@ public void processElement(ProcessContext c) {
p.run();
}

/**
 * Runs a single-input {@code Flatten} whose input and output PCollections are assigned different
 * coders, then groups the flattened output by key and checks that every original line survives.
 *
 * <p>Kept as a regression guard for Dataflow; the {@code Flatten} must be followed by a {@code
 * GroupByKey} for the scenario to be exercised.
 */
@Test
@Category(ValidatesRunner.class)
public void testFlattenWithDifferentInputAndOutputCoders() {
  // Input side of the Flatten: every line keyed by "a", encoded with SerializableCoder.
  PCollection<KV<String, String>> keyedLines =
      p.apply(Create.of(LINES)).apply(WithKeys.of("a"));
  keyedLines.setCoder(SerializableCoder.of(new TypeDescriptor<KV<String, String>>() {}));

  // Output side of the Flatten: same element type, but encoded with a KvCoder of UTF-8 strings.
  PCollection<KV<String, String>> flattened =
      PCollectionList.of(keyedLines).apply(Flatten.pCollections());
  flattened.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

  // Hands each grouped Iterable back unchanged so FlatMapElements re-emits its members.
  SimpleFunction<Iterable<String>, Iterable<String>> passThrough =
      new SimpleFunction<Iterable<String>, Iterable<String>>() {
        @Override
        public Iterable<String> apply(Iterable<String> values) {
          return values;
        }
      };

  // The GroupByKey directly after the Flatten is the part that matters for the regression.
  PCollection<Iterable<String>> groupedValues =
      flattened.apply(GroupByKey.create()).apply(Values.create());
  PCollection<String> ungrouped = groupedValues.apply(FlatMapElements.via(passThrough));

  PAssert.that(ungrouped).containsInAnyOrder(LINES);
  p.run();
}

/////////////////////////////////////////////////////////////////////////////

@Test
Expand Down

0 comments on commit e763824

Please sign in to comment.