-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use attachValues in SQL #11259
Use attachValues in SQL #11259
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
import java.util.stream.Collectors; | ||
import javax.annotation.Nullable; | ||
import org.apache.beam.sdk.annotations.Internal; | ||
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; | ||
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel; | ||
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils; | ||
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect; | ||
|
@@ -53,6 +54,7 @@ | |
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos; | ||
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; | ||
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; | ||
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; | ||
|
||
/** | ||
* BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL} | ||
|
@@ -101,13 +103,16 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) { | |
.collect(Collectors.toList()); | ||
final RexNode condition = getProgram().getCondition(); | ||
|
||
boolean verifyRowValues = | ||
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues(); | ||
Schema outputSchema = CalciteUtils.toSchema(getRowType()); | ||
CalcFn calcFn = | ||
new CalcFn( | ||
projects, | ||
condition == null ? null : unparseRexNode(condition), | ||
upstream.getSchema(), | ||
outputSchema); | ||
outputSchema, | ||
verifyRowValues); | ||
|
||
// validate prepared expressions | ||
calcFn.setup(); | ||
|
@@ -129,19 +134,22 @@ private static class CalcFn extends DoFn<Row, Row> { | |
@Nullable private final String condition; | ||
private final Schema inputSchema; | ||
private final Schema outputSchema; | ||
private final boolean verifyRowValues; | ||
private transient List<PreparedExpression> projectExps; | ||
@Nullable private transient PreparedExpression conditionExp; | ||
|
||
CalcFn( | ||
List<String> projects, | ||
@Nullable String condition, | ||
Schema inputSchema, | ||
Schema outputSchema) { | ||
Schema outputSchema, | ||
boolean verifyRowValues) { | ||
Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount()); | ||
this.projects = ImmutableList.copyOf(projects); | ||
this.condition = condition; | ||
this.inputSchema = inputSchema; | ||
this.outputSchema = outputSchema; | ||
this.verifyRowValues = verifyRowValues; | ||
} | ||
|
||
@Setup | ||
|
@@ -185,14 +193,19 @@ public void processElement(ProcessContext c) { | |
return; | ||
} | ||
|
||
Row.Builder output = Row.withSchema(outputSchema); | ||
List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount()); | ||
for (int i = 0; i < outputSchema.getFieldCount(); i++) { | ||
// TODO[BEAM-8630]: performance optimization by bundling the gRPC calls | ||
Value v = projectExps.get(i).execute(columns, params); | ||
output.addValue( | ||
ZetaSqlUtils.zetaSqlValueToJavaObject(v, outputSchema.getField(i).getType())); | ||
values.add( | ||
ZetaSqlUtils.zetaSqlValueToJavaObject( | ||
v, outputSchema.getField(i).getType(), verifyRowValues)); | ||
} | ||
c.output(output.build()); | ||
Row outputRow = | ||
verifyRowValues | ||
? Row.withSchema(outputSchema).addValues(values).build() | ||
: Row.withSchema(outputSchema).attachValues(values); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. and here There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto |
||
c.output(outputRow); | ||
} | ||
|
||
@Teardown | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -188,7 +188,8 @@ public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) { | |
return Value.createStructValue(createZetaSqlStructTypeFromBeamSchema(schema), values); | ||
} | ||
|
||
public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType) { | ||
public static Object zetaSqlValueToJavaObject( | ||
Value value, FieldType fieldType, boolean verifyValues) { | ||
if (value.isNull()) { | ||
return null; | ||
} | ||
|
@@ -213,9 +214,10 @@ public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType) | |
case BYTES: | ||
return value.getBytesValue().toByteArray(); | ||
case ARRAY: | ||
return zetaSqlArrayValueToJavaList(value, fieldType.getCollectionElementType()); | ||
return zetaSqlArrayValueToJavaList( | ||
value, fieldType.getCollectionElementType(), verifyValues); | ||
case ROW: | ||
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema()); | ||
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues); | ||
default: | ||
throw new UnsupportedOperationException( | ||
"Unsupported Beam fieldType: " + fieldType.getTypeName()); | ||
|
@@ -227,18 +229,25 @@ private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue) | |
return Instant.ofEpochMilli(millis); | ||
} | ||
|
||
private static List<Object> zetaSqlArrayValueToJavaList(Value arrayValue, FieldType elementType) { | ||
private static List<Object> zetaSqlArrayValueToJavaList( | ||
Value arrayValue, FieldType elementType, boolean verifyValues) { | ||
return arrayValue.getElementList().stream() | ||
.map(e -> zetaSqlValueToJavaObject(e, elementType)) | ||
.map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues)) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private static Row zetaSqlStructValueToBeamRow(Value structValue, Schema schema) { | ||
private static Row zetaSqlStructValueToBeamRow( | ||
Value structValue, Schema schema, boolean verifyValues) { | ||
List<Object> objects = new ArrayList<>(schema.getFieldCount()); | ||
List<Value> values = structValue.getFieldList(); | ||
for (int i = 0; i < values.size(); i++) { | ||
objects.add(zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType())); | ||
objects.add( | ||
zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues)); | ||
} | ||
return Row.withSchema(schema).addValues(objects).build(); | ||
Row row = | ||
verifyValues | ||
? Row.withSchema(schema).addValues(objects).build() | ||
: Row.withSchema(schema).attachValues(objects); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. and here There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ditto |
||
return row; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is missing a `.build()` call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pr/10883 changed attachValues() to return a Row, so build() is not needed.