Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use attachValues in SQL #11259

Merged
merged 7 commits into from
Apr 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ public interface BeamSqlPipelineOptions extends PipelineOptions {
String getPlannerName();

void setPlannerName(String className);

@Description("Enables extra verification of row values for debugging.")
@Default.Boolean(false)
Boolean getVerifyRowValues();

void setVerifyRowValues(Boolean verifyRowValues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
Expand Down Expand Up @@ -280,9 +281,13 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
ignoreValues = true;
}

boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
return windowedStream
.apply(combiner)
.apply("mergeRecord", ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues)))
.apply(
"mergeRecord",
ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues, verifyRowValues)))
.setRowSchema(outputSchema);
}

Expand Down Expand Up @@ -324,7 +329,10 @@ private void validateWindowIsSupported(PCollection<Row> upstream) {
}

static DoFn<Row, Row> mergeRecord(
Schema outputSchema, int windowStartFieldIndex, boolean ignoreValues) {
Schema outputSchema,
int windowStartFieldIndex,
boolean ignoreValues,
boolean verifyRowValues) {
return new DoFn<Row, Row>() {
@ProcessElement
public void processElement(
Expand All @@ -343,7 +351,11 @@ public void processElement(
fieldValues.add(windowStartFieldIndex, ((IntervalWindow) window).start());
}

o.output(Row.withSchema(outputSchema).addValues(fieldValues).build());
Row row =
verifyRowValues
? Row.withSchema(outputSchema).addValues(fieldValues).build()
: Row.withSchema(outputSchema).attachValues(fieldValues);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is missing a .build()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pr/10883 changed attachValues() to return a Row, so build() not needed.

o.output(row);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import java.math.BigDecimal;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
Expand Down Expand Up @@ -79,11 +81,11 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BuiltInMethod;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ScriptEvaluator;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/** BeamRelNode to replace {@code Project} and {@code Filter} node. */
Expand Down Expand Up @@ -165,20 +167,30 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
new InputGetterImpl(input, upstream.getSchema()),
null);

// Expressions.call is equivalent to: output = Row.withSchema(outputSchema)
Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);
Method addValue = Types.lookupMethod(Row.Builder.class, "addValue", Object.class);
boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();

List<Expression> listValues = Lists.newArrayListWithCapacity(expressions.size());
for (int index = 0; index < expressions.size(); index++) {
Expression value = expressions.get(index);
FieldType toType = outputSchema.getField(index).getType();

// Expressions.call is equivalent to: .addValue(value)
output = Expressions.call(output, addValue, castOutput(value, toType));
listValues.add(castOutput(value, toType));
}
Method newArrayList = Types.lookupMethod(Arrays.class, "asList");
Expression valueList = Expressions.call(newArrayList, listValues);

// Expressions.call is equivalent to: output =
// Row.withSchema(outputSchema).attachValue(values);
Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);

// Expressions.call is equivalent to: .build();
output = Expressions.call(output, "build");
if (verifyRowValues) {
Method attachValues = Types.lookupMethod(Row.Builder.class, "addValues", List.class);
output = Expressions.call(output, attachValues, valueList);
output = Expressions.call(output, "build");
} else {
Method attachValues = Types.lookupMethod(Row.Builder.class, "attachValues", List.class);
output = Expressions.call(output, attachValues, valueList);
}

builder.add(
// Expressions.ifThen is equivalent to:
Expand Down Expand Up @@ -263,31 +275,41 @@ public void processElement(ProcessContext c) {
.build();

private static Expression castOutput(Expression value, FieldType toType) {
Expression returnValue = value;
if (value.getType() == Object.class || !(value.getType() instanceof Class)) {
// fast copy path, just pass object through
return value;
returnValue = value;
} else if (CalciteUtils.isDateTimeType(toType)
&& !Types.isAssignableFrom(ReadableInstant.class, (Class) value.getType())) {
return castOutputTime(value, toType);

returnValue = castOutputTime(value, toType);
} else if (toType.getTypeName() == TypeName.DECIMAL
&& !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType())) {
return Expressions.new_(BigDecimal.class, value);
returnValue = Expressions.new_(BigDecimal.class, value);
} else if (toType.getTypeName() == TypeName.BYTES
&& Types.isAssignableFrom(ByteString.class, (Class) value.getType())) {

return Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
Expressions.call(value, "getBytes"));
returnValue =
Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
Expressions.call(value, "getBytes"));
} else if (((Class) value.getType()).isPrimitive()
|| Types.isAssignableFrom(Number.class, (Class) value.getType())) {
Type rawType = rawTypeMap.get(toType.getTypeName());
if (rawType != null) {
return Types.castIfNecessary(rawType, value);
returnValue = Types.castIfNecessary(rawType, value);
}
} else if (Types.isAssignableFrom(Iterable.class, value.getType())) {
// Passing an Iterable into newArrayList gets interpreted to mean copying each individual
// element. We want the
// entire Iterable to be treated as a single element, so we cast to Object.
returnValue = Expressions.convert_(value, Object.class);
}
return value;
returnValue =
Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
returnValue);
return returnValue;
}

private static Expression castOutputTime(Expression value, FieldType toType) {
Expand All @@ -314,12 +336,8 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}

// Second, convert to joda DateTime
valueDateTime =
Expressions.new_(
DateTime.class,
valueDateTime,
Expressions.parameter(DateTimeZone.class, "org.joda.time.DateTimeZone.UTC"));
// Second, convert to joda Instant
valueDateTime = Expressions.new_(Instant.class, valueDateTime);

// Third, make conversion conditional on non-null input.
if (!((Class) value.getType()).isPrimitive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
Expand All @@ -53,6 +54,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/**
* BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL}
Expand Down Expand Up @@ -101,13 +103,16 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
.collect(Collectors.toList());
final RexNode condition = getProgram().getCondition();

boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
Schema outputSchema = CalciteUtils.toSchema(getRowType());
CalcFn calcFn =
new CalcFn(
projects,
condition == null ? null : unparseRexNode(condition),
upstream.getSchema(),
outputSchema);
outputSchema,
verifyRowValues);

// validate prepared expressions
calcFn.setup();
Expand All @@ -129,19 +134,22 @@ private static class CalcFn extends DoFn<Row, Row> {
@Nullable private final String condition;
private final Schema inputSchema;
private final Schema outputSchema;
private final boolean verifyRowValues;
private transient List<PreparedExpression> projectExps;
@Nullable private transient PreparedExpression conditionExp;

CalcFn(
List<String> projects,
@Nullable String condition,
Schema inputSchema,
Schema outputSchema) {
Schema outputSchema,
boolean verifyRowValues) {
Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount());
this.projects = ImmutableList.copyOf(projects);
this.condition = condition;
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
this.verifyRowValues = verifyRowValues;
}

@Setup
Expand Down Expand Up @@ -185,14 +193,19 @@ public void processElement(ProcessContext c) {
return;
}

Row.Builder output = Row.withSchema(outputSchema);
List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount());
for (int i = 0; i < outputSchema.getFieldCount(); i++) {
// TODO[BEAM-8630]: performance optimization by bundling the gRPC calls
Value v = projectExps.get(i).execute(columns, params);
output.addValue(
ZetaSqlUtils.zetaSqlValueToJavaObject(v, outputSchema.getField(i).getType()));
values.add(
ZetaSqlUtils.zetaSqlValueToJavaObject(
v, outputSchema.getField(i).getType(), verifyRowValues));
}
c.output(output.build());
Row outputRow =
verifyRowValues
? Row.withSchema(outputSchema).addValues(values).build()
: Row.withSchema(outputSchema).attachValues(values);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

c.output(outputRow);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) {
return Value.createStructValue(createZetaSqlStructTypeFromBeamSchema(schema), values);
}

public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType) {
public static Object zetaSqlValueToJavaObject(
Value value, FieldType fieldType, boolean verifyValues) {
if (value.isNull()) {
return null;
}
Expand All @@ -213,9 +214,10 @@ public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType)
case BYTES:
return value.getBytesValue().toByteArray();
case ARRAY:
return zetaSqlArrayValueToJavaList(value, fieldType.getCollectionElementType());
return zetaSqlArrayValueToJavaList(
value, fieldType.getCollectionElementType(), verifyValues);
case ROW:
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema());
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues);
default:
throw new UnsupportedOperationException(
"Unsupported Beam fieldType: " + fieldType.getTypeName());
Expand All @@ -227,18 +229,25 @@ private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue)
return Instant.ofEpochMilli(millis);
}

private static List<Object> zetaSqlArrayValueToJavaList(Value arrayValue, FieldType elementType) {
private static List<Object> zetaSqlArrayValueToJavaList(
Value arrayValue, FieldType elementType, boolean verifyValues) {
return arrayValue.getElementList().stream()
.map(e -> zetaSqlValueToJavaObject(e, elementType))
.map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues))
.collect(Collectors.toList());
}

private static Row zetaSqlStructValueToBeamRow(Value structValue, Schema schema) {
private static Row zetaSqlStructValueToBeamRow(
Value structValue, Schema schema, boolean verifyValues) {
List<Object> objects = new ArrayList<>(schema.getFieldCount());
List<Value> values = structValue.getFieldList();
for (int i = 0; i < values.size(); i++) {
objects.add(zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType()));
objects.add(
zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues));
}
return Row.withSchema(schema).addValues(objects).build();
Row row =
verifyValues
? Row.withSchema(schema).addValues(objects).build()
: Row.withSchema(schema).attachValues(objects);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void testJavaObjectToZetaSqlValue() {

@Test
public void testZetaSqlValueToJavaObject() {
assertEquals(ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE), TEST_ROW);
assertEquals(
ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE, true), TEST_ROW);
}
}