From a715f15d7575165dbd239ed9c9f692fc29c47cc1 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 11 Dec 2019 17:58:03 -0800 Subject: [PATCH] attach values so we don't iterate over everything to verify types --- runners/spark/build.gradle | 2 -- .../org/apache/beam/sdk/schemas/transforms/CoGroup.java | 7 ++++--- .../java/org/apache/beam/sdk/schemas/transforms/Group.java | 7 ++++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/runners/spark/build.gradle b/runners/spark/build.gradle index e4367d3d522ba..80130bc72fead 100644 --- a/runners/spark/build.gradle +++ b/runners/spark/build.gradle @@ -142,8 +142,6 @@ task validatesRunnerBatch(type: Test) { // Portability excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse' excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms' - // TODO(BEAM-8955): Figure out why AvroSchemaTest.testAvroPipelineGroupBy is broken for Spark - exclude '**/AvroSchemaTest.class' } jvmArgs '-Xmx3g' } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java index 9400f9856f2fb..163140827e2eb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/CoGroup.java @@ -534,13 +534,14 @@ public Row next() { public void process(@Element KV kv, OutputReceiver o) { Row key = kv.getKey(); CoGbkResult result = kv.getValue(); - List fields = Lists.newArrayListWithCapacity(sortedTags.size()); + List fields = Lists.newArrayListWithCapacity(sortedTags.size() + 1); + fields.add(key); for (int i = 0; i < sortedTags.size(); ++i) { String tupleTag = tagToKeyedTag.get(i); SerializableFunction toRow = toRows.get(i); fields.add(new Result(result.getAll(tupleTag), toRow)); } - Row row = Row.withSchema(outputSchema).addValue(key).addValues(fields).build(); + Row row = Row.withSchema(outputSchema).attachValues(fields).build(); o.output(row); } } @@ -681,7 +682,7 @@ private void crossProductHelper( } private Row buildOutputRow(List rows) { - return Row.withSchema(outputSchema).addValues(rows).build(); + return Row.withSchema(outputSchema).attachValues(Lists.newArrayList(rows)).build(); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java index 27cd654f56268..f0e891c72bcb6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; /** * A generic grouping transform for schema {@link PCollection}s. @@ -707,8 +708,7 @@ public PCollection expand(PCollection input) { public void process(@Element KV> e, OutputReceiver o) { o.output( Row.withSchema(outputSchema) - .addValue(e.getKey()) - .addIterable(e.getValue()) + .attachValues(Lists.newArrayList(e.getKey(), e.getValue())) .build()); } })) @@ -929,7 +929,8 @@ public PCollection expand(PCollection input) { public void process(@Element KV element, OutputReceiver o) { o.output( Row.withSchema(outputSchema) - .addValues(element.getKey(), element.getValue()) + .attachValues( + Lists.newArrayList(element.getKey(), element.getValue())) .build()); } }))