Skip to content

Commit

Permalink
Merge pull request apache#10883: [BEAM-9331] Add better Row builders
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Mar 27, 2020
1 parent c835ab7 commit 267f76f
Show file tree
Hide file tree
Showing 14 changed files with 1,029 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ static Row decodeDelegate(Schema schema, Coder[] coders, InputStream inputStream
// all values. Since we assume that decode is always being called on a previously-encoded
// Row, the values should already be validated and of the correct type. So, we can save
// some processing by simply transferring ownership of the list to the Row.
return Row.withSchema(schema).attachValues(fieldValues).build();
return Row.withSchema(schema).attachValues(fieldValues);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -610,12 +610,12 @@ private Schema getFieldDescriptorSchema(FieldDescriptor fieldDescriptor, Schema
private static Schema getFieldSchema(FieldType type) {
if (TypeName.ROW.equals(type.getTypeName())) {
return type.getRowSchema();
} else if (type.getTypeName().isCollectionType()
&& TypeName.ROW.equals(type.getCollectionElementType().getTypeName())) {
return type.getCollectionElementType().getRowSchema();
} else if (TypeName.MAP.equals(type.getTypeName())
&& TypeName.ROW.equals(type.getMapValueType().getTypeName())) {
return type.getMapValueType().getRowSchema();
} else if (type.getTypeName().isCollectionType()) {
return getFieldSchema(type.getCollectionElementType());
} else if (TypeName.MAP.equals(type.getTypeName())) {
return getFieldSchema(type.getMapValueType());
} else if (TypeName.LOGICAL_TYPE.equals(type.getTypeName())) {
return getFieldSchema(type.getLogicalType().getBaseType());
} else {
throw new IllegalArgumentException(
"FieldType " + type + " must be either a row or a container containing rows");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public ToRowWithValueGetters(Schema schema) {

@Override
public Row apply(T input) {
return Row.withSchema(schema).withFieldValueGetters(getterFactory, input).build();
return Row.withSchema(schema).withFieldValueGetters(getterFactory, input);
}

private GetterBasedSchemaProvider getOuter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private static Row fillNewFields(Row row, AddFieldsInformation addFieldsInformat
}
}

return Row.withSchema(outputSchema).attachValues(newValues).build();
return Row.withSchema(outputSchema).attachValues(newValues);
}

private static Object fillNewFields(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ void outputUnexpandedRow(Schema outputSchema, OutputReceiver<Row> o) {
List<Object> fields = Lists.newArrayListWithCapacity(getIterables().size() + 1);
fields.add(getKey());
fields.addAll(getIterables());
o.output(Row.withSchema(outputSchema).attachValues(fields).build());
o.output(Row.withSchema(outputSchema).attachValues(fields));
}

static void verifyExpandedArgs(JoinInformation joinInformation, JoinArguments joinArgs) {
Expand Down Expand Up @@ -632,9 +632,7 @@ private void crossProductHelper(
if (atBottom) {
// Bottom of recursive call, so output the row we've accumulated.
Row row =
Row.withSchema(getOutputSchema())
.attachValues(Lists.newArrayList(accumulatedRows))
.build();
Row.withSchema(getOutputSchema()).attachValues(Lists.newArrayList(accumulatedRows));
o.output(row);
} else {
crossProduct(tagIndex + 1, accumulatedRows, iterables, o);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,7 @@ public PCollection<Row> expand(PCollection<InputT> input) {
public void process(@Element KV<Row, Iterable<Row>> e, OutputReceiver<Row> o) {
o.output(
Row.withSchema(outputSchema)
.attachValues(Lists.newArrayList(e.getKey(), e.getValue()))
.build());
.attachValues(Lists.newArrayList(e.getKey(), e.getValue())));
}
}))
.setRowSchema(outputSchema);
Expand Down Expand Up @@ -1140,8 +1139,7 @@ public void process(@Element KV<Row, Row> element, OutputReceiver<Row> o) {
o.output(
Row.withSchema(outputSchema)
.attachValues(
Lists.newArrayList(element.getKey(), element.getValue()))
.build());
Lists.newArrayList(element.getKey(), element.getValue())));
}
}))
.setRowSchema(outputSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public PCollection<Row> expand(PCollection<T> input) {
new DoFn<T, Row>() {
@ProcessElement
public void processElement(@Element Row row, OutputReceiver<Row> o) {
o.output(Row.withSchema(outputSchema).attachValues(row.getValues()).build());
o.output(Row.withSchema(outputSchema).attachValues(row.getValues()));
}
}))
.setRowSchema(outputSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,6 @@ public ByteCodeAppender appender(final Target implementationTarget) {
ElementMatchers.named("attachValues")
.and(ElementMatchers.takesArguments(Object[].class)))
.getOnly()),
MethodInvocation.invoke(
new ForLoadedType(Row.Builder.class)
.getDeclaredMethods()
.filter(ElementMatchers.named("build"))
.getOnly()),
MethodReturn.REFERENCE);
size = size.aggregate(attachToRow.apply(methodVisitor, implementationContext));
return new Size(size.getMaximalSize(), localVariables.getTotalNumVariables());
Expand Down
Loading

0 comments on commit 267f76f

Please sign in to comment.