Skip to content

Commit

Permalink
Merge pull request apache#11259: Use attachValues in SQL
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Apr 5, 2020
1 parent 1e52e42 commit ed5a358
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,10 @@ public interface BeamSqlPipelineOptions extends PipelineOptions {
String getPlannerName();

void setPlannerName(String className);

@Description("Enables extra verification of row values for debugging.")
@Default.Boolean(false)
Boolean getVerifyRowValues();

void setVerifyRowValues(Boolean verifyRowValues);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
Expand Down Expand Up @@ -280,9 +281,13 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
ignoreValues = true;
}

boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
return windowedStream
.apply(combiner)
.apply("mergeRecord", ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues)))
.apply(
"mergeRecord",
ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues, verifyRowValues)))
.setRowSchema(outputSchema);
}

Expand Down Expand Up @@ -324,7 +329,10 @@ private void validateWindowIsSupported(PCollection<Row> upstream) {
}

static DoFn<Row, Row> mergeRecord(
Schema outputSchema, int windowStartFieldIndex, boolean ignoreValues) {
Schema outputSchema,
int windowStartFieldIndex,
boolean ignoreValues,
boolean verifyRowValues) {
return new DoFn<Row, Row>() {
@ProcessElement
public void processElement(
Expand All @@ -343,7 +351,11 @@ public void processElement(
fieldValues.add(windowStartFieldIndex, ((IntervalWindow) window).start());
}

o.output(Row.withSchema(outputSchema).addValues(fieldValues).build());
Row row =
verifyRowValues
? Row.withSchema(outputSchema).addValues(fieldValues).build()
: Row.withSchema(outputSchema).attachValues(fieldValues);
o.output(row);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import java.math.BigDecimal;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
Expand Down Expand Up @@ -79,11 +81,11 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BuiltInMethod;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ScriptEvaluator;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/** BeamRelNode to replace {@code Project} and {@code Filter} node. */
Expand Down Expand Up @@ -165,20 +167,30 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
new InputGetterImpl(input, upstream.getSchema()),
null);

// Expressions.call is equivalent to: output = Row.withSchema(outputSchema)
Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);
Method addValue = Types.lookupMethod(Row.Builder.class, "addValue", Object.class);
boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();

List<Expression> listValues = Lists.newArrayListWithCapacity(expressions.size());
for (int index = 0; index < expressions.size(); index++) {
Expression value = expressions.get(index);
FieldType toType = outputSchema.getField(index).getType();

// Expressions.call is equivalent to: .addValue(value)
output = Expressions.call(output, addValue, castOutput(value, toType));
listValues.add(castOutput(value, toType));
}
Method newArrayList = Types.lookupMethod(Arrays.class, "asList");
Expression valueList = Expressions.call(newArrayList, listValues);

// Expressions.call is equivalent to: output = Row.withSchema(outputSchema);
// the values are supplied below via addValues(values).build() or attachValues(values).
Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);

// Expressions.call is equivalent to: .build();
output = Expressions.call(output, "build");
if (verifyRowValues) {
Method attachValues = Types.lookupMethod(Row.Builder.class, "addValues", List.class);
output = Expressions.call(output, attachValues, valueList);
output = Expressions.call(output, "build");
} else {
Method attachValues = Types.lookupMethod(Row.Builder.class, "attachValues", List.class);
output = Expressions.call(output, attachValues, valueList);
}

builder.add(
// Expressions.ifThen is equivalent to:
Expand Down Expand Up @@ -263,31 +275,41 @@ public void processElement(ProcessContext c) {
.build();

private static Expression castOutput(Expression value, FieldType toType) {
Expression returnValue = value;
if (value.getType() == Object.class || !(value.getType() instanceof Class)) {
// fast copy path, just pass object through
return value;
returnValue = value;
} else if (CalciteUtils.isDateTimeType(toType)
&& !Types.isAssignableFrom(ReadableInstant.class, (Class) value.getType())) {
return castOutputTime(value, toType);

returnValue = castOutputTime(value, toType);
} else if (toType.getTypeName() == TypeName.DECIMAL
&& !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType())) {
return Expressions.new_(BigDecimal.class, value);
returnValue = Expressions.new_(BigDecimal.class, value);
} else if (toType.getTypeName() == TypeName.BYTES
&& Types.isAssignableFrom(ByteString.class, (Class) value.getType())) {

return Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
Expressions.call(value, "getBytes"));
returnValue =
Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
Expressions.call(value, "getBytes"));
} else if (((Class) value.getType()).isPrimitive()
|| Types.isAssignableFrom(Number.class, (Class) value.getType())) {
Type rawType = rawTypeMap.get(toType.getTypeName());
if (rawType != null) {
return Types.castIfNecessary(rawType, value);
returnValue = Types.castIfNecessary(rawType, value);
}
} else if (Types.isAssignableFrom(Iterable.class, value.getType())) {
// Passing an Iterable into the list-construction call (the method handle named newArrayList,
// which resolves to Arrays.asList) gets interpreted to mean copying each individual element.
// We want the entire Iterable to be treated as a single element, so we cast to Object.
returnValue = Expressions.convert_(value, Object.class);
}
return value;
returnValue =
Expressions.condition(
Expressions.equal(value, Expressions.constant(null)),
Expressions.constant(null),
returnValue);
return returnValue;
}

private static Expression castOutputTime(Expression value, FieldType toType) {
Expand All @@ -314,12 +336,8 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
throw new UnsupportedOperationException("Unknown DateTime type " + toType);
}

// Second, convert to joda DateTime
valueDateTime =
Expressions.new_(
DateTime.class,
valueDateTime,
Expressions.parameter(DateTimeZone.class, "org.joda.time.DateTimeZone.UTC"));
// Second, convert to joda Instant
valueDateTime = Expressions.new_(Instant.class, valueDateTime);

// Third, make conversion conditional on non-null input.
if (!((Class) value.getType()).isPrimitive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
Expand All @@ -53,6 +54,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;

/**
* BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL}
Expand Down Expand Up @@ -101,13 +103,16 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
.collect(Collectors.toList());
final RexNode condition = getProgram().getCondition();

boolean verifyRowValues =
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
Schema outputSchema = CalciteUtils.toSchema(getRowType());
CalcFn calcFn =
new CalcFn(
projects,
condition == null ? null : unparseRexNode(condition),
upstream.getSchema(),
outputSchema);
outputSchema,
verifyRowValues);

// validate prepared expressions
calcFn.setup();
Expand All @@ -129,19 +134,22 @@ private static class CalcFn extends DoFn<Row, Row> {
@Nullable private final String condition;
private final Schema inputSchema;
private final Schema outputSchema;
private final boolean verifyRowValues;
private transient List<PreparedExpression> projectExps;
@Nullable private transient PreparedExpression conditionExp;

CalcFn(
List<String> projects,
@Nullable String condition,
Schema inputSchema,
Schema outputSchema) {
Schema outputSchema,
boolean verifyRowValues) {
Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount());
this.projects = ImmutableList.copyOf(projects);
this.condition = condition;
this.inputSchema = inputSchema;
this.outputSchema = outputSchema;
this.verifyRowValues = verifyRowValues;
}

@Setup
Expand Down Expand Up @@ -185,14 +193,19 @@ public void processElement(ProcessContext c) {
return;
}

Row.Builder output = Row.withSchema(outputSchema);
List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount());
for (int i = 0; i < outputSchema.getFieldCount(); i++) {
// TODO[BEAM-8630]: performance optimization by bundling the gRPC calls
Value v = projectExps.get(i).execute(columns, params);
output.addValue(
ZetaSqlUtils.zetaSqlValueToJavaObject(v, outputSchema.getField(i).getType()));
values.add(
ZetaSqlUtils.zetaSqlValueToJavaObject(
v, outputSchema.getField(i).getType(), verifyRowValues));
}
c.output(output.build());
Row outputRow =
verifyRowValues
? Row.withSchema(outputSchema).addValues(values).build()
: Row.withSchema(outputSchema).attachValues(values);
c.output(outputRow);
}

@Teardown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) {
return Value.createStructValue(createZetaSqlStructTypeFromBeamSchema(schema), values);
}

public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType) {
public static Object zetaSqlValueToJavaObject(
Value value, FieldType fieldType, boolean verifyValues) {
if (value.isNull()) {
return null;
}
Expand All @@ -213,9 +214,10 @@ public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType)
case BYTES:
return value.getBytesValue().toByteArray();
case ARRAY:
return zetaSqlArrayValueToJavaList(value, fieldType.getCollectionElementType());
return zetaSqlArrayValueToJavaList(
value, fieldType.getCollectionElementType(), verifyValues);
case ROW:
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema());
return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues);
default:
throw new UnsupportedOperationException(
"Unsupported Beam fieldType: " + fieldType.getTypeName());
Expand All @@ -227,18 +229,25 @@ private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue)
return Instant.ofEpochMilli(millis);
}

private static List<Object> zetaSqlArrayValueToJavaList(Value arrayValue, FieldType elementType) {
private static List<Object> zetaSqlArrayValueToJavaList(
Value arrayValue, FieldType elementType, boolean verifyValues) {
return arrayValue.getElementList().stream()
.map(e -> zetaSqlValueToJavaObject(e, elementType))
.map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues))
.collect(Collectors.toList());
}

private static Row zetaSqlStructValueToBeamRow(Value structValue, Schema schema) {
private static Row zetaSqlStructValueToBeamRow(
Value structValue, Schema schema, boolean verifyValues) {
List<Object> objects = new ArrayList<>(schema.getFieldCount());
List<Value> values = structValue.getFieldList();
for (int i = 0; i < values.size(); i++) {
objects.add(zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType()));
objects.add(
zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues));
}
return Row.withSchema(schema).addValues(objects).build();
Row row =
verifyValues
? Row.withSchema(schema).addValues(objects).build()
: Row.withSchema(schema).attachValues(objects);
return row;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public void testJavaObjectToZetaSqlValue() {

@Test
public void testZetaSqlValueToJavaObject() {
assertEquals(ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE), TEST_ROW);
assertEquals(
ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE, true), TEST_ROW);
}
}

0 comments on commit ed5a358

Please sign in to comment.