Skip to content

Commit

Permalink
Merge pull request apache#10362 : [BEAM-8955] Attach values so we don…
Browse files Browse the repository at this point in the history
…'t iterate over everything to verify types
  • Loading branch information
reuvenlax committed Dec 12, 2019
2 parents 43cf74f + a715f15 commit b0c1d8c
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
2 changes: 0 additions & 2 deletions runners/spark/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ task validatesRunnerBatch(type: Test) {
// Portability
excludeCategories 'org.apache.beam.sdk.testing.UsesImpulse'
excludeCategories 'org.apache.beam.sdk.testing.UsesCrossLanguageTransforms'
// TODO(BEAM-8955): Figure out why AvroSchemaTest.testAvroPipelineGroupBy is broken for Spark
exclude '**/AvroSchemaTest.class'
}
jvmArgs '-Xmx3g'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,13 +534,14 @@ public Row next() {
public void process(@Element KV<Row, CoGbkResult> kv, OutputReceiver<Row> o) {
Row key = kv.getKey();
CoGbkResult result = kv.getValue();
List<Object> fields = Lists.newArrayListWithCapacity(sortedTags.size());
List<Object> fields = Lists.newArrayListWithCapacity(sortedTags.size() + 1);
fields.add(key);
for (int i = 0; i < sortedTags.size(); ++i) {
String tupleTag = tagToKeyedTag.get(i);
SerializableFunction<Object, Row> toRow = toRows.get(i);
fields.add(new Result(result.getAll(tupleTag), toRow));
}
Row row = Row.withSchema(outputSchema).addValue(key).addValues(fields).build();
Row row = Row.withSchema(outputSchema).attachValues(fields).build();
o.output(row);
}
}
Expand Down Expand Up @@ -681,7 +682,7 @@ private void crossProductHelper(
}

private Row buildOutputRow(List rows) {
return Row.withSchema(outputSchema).addValues(rows).build();
return Row.withSchema(outputSchema).attachValues(Lists.newArrayList(rows)).build();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/**
* A generic grouping transform for schema {@link PCollection}s.
Expand Down Expand Up @@ -707,8 +708,7 @@ public PCollection<Row> expand(PCollection<InputT> input) {
public void process(@Element KV<Row, Iterable<Row>> e, OutputReceiver<Row> o) {
o.output(
Row.withSchema(outputSchema)
.addValue(e.getKey())
.addIterable(e.getValue())
.attachValues(Lists.newArrayList(e.getKey(), e.getValue()))
.build());
}
}))
Expand Down Expand Up @@ -929,7 +929,8 @@ public PCollection<Row> expand(PCollection<InputT> input) {
public void process(@Element KV<Row, Row> element, OutputReceiver<Row> o) {
o.output(
Row.withSchema(outputSchema)
.addValues(element.getKey(), element.getValue())
.attachValues(
Lists.newArrayList(element.getKey(), element.getValue()))
.build());
}
}))
Expand Down

0 comments on commit b0c1d8c

Please sign in to comment.