diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
index 58d02a10d0902..578031cc843ba 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
@@ -29,4 +29,10 @@ public interface BeamSqlPipelineOptions extends PipelineOptions {
   String getPlannerName();
 
   void setPlannerName(String className);
+
+  @Description("Enables extra verification of row values for debugging.")
+  @Default.Boolean(false)
+  Boolean getVerifyRowValues();
+
+  void setVerifyRowValues(Boolean verifyRowValues);
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 1be91e9db3347..e39db027919f2 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -24,6 +24,7 @@
 import java.io.Serializable;
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter;
@@ -280,9 +281,13 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
         ignoreValues = true;
       }
 
+      boolean verifyRowValues =
+          pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
       return windowedStream
           .apply(combiner)
-          .apply("mergeRecord", ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues)))
+          .apply(
+              "mergeRecord",
+              ParDo.of(mergeRecord(outputSchema, windowFieldIndex, ignoreValues, verifyRowValues)))
           .setRowSchema(outputSchema);
     }
 
@@ -324,7 +329,10 @@ private void validateWindowIsSupported(PCollection<Row> upstream) {
   }
 
   static DoFn<Row, Row> mergeRecord(
-      Schema outputSchema, int windowStartFieldIndex, boolean ignoreValues) {
+      Schema outputSchema,
+      int windowStartFieldIndex,
+      boolean ignoreValues,
+      boolean verifyRowValues) {
     return new DoFn<Row, Row>() {
       @ProcessElement
       public void processElement(
@@ -343,7 +351,11 @@ public void processElement(
           fieldValues.add(windowStartFieldIndex, ((IntervalWindow) window).start());
         }
 
-        o.output(Row.withSchema(outputSchema).addValues(fieldValues).build());
+        Row row =
+            verifyRowValues
+                ? Row.withSchema(outputSchema).addValues(fieldValues).build()
+                : Row.withSchema(outputSchema).attachValues(fieldValues);
+        o.output(row);
       }
     };
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 77cd3887053d2..b9821aa1f237c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -29,11 +29,13 @@
 import java.math.BigDecimal;
 import java.util.AbstractList;
 import java.util.AbstractMap;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamJavaTypeFactory;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.CharType;
@@ -79,11 +81,11 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.util.BuiltInMethod;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.janino.ScriptEvaluator;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
+import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 
 /** BeamRelNode to replace {@code Project} and {@code Filter} node. */
@@ -165,20 +167,30 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
               new InputGetterImpl(input, upstream.getSchema()),
               null);
 
-      // Expressions.call is equivalent to: output = Row.withSchema(outputSchema)
-      Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);
-      Method addValue = Types.lookupMethod(Row.Builder.class, "addValue", Object.class);
+      boolean verifyRowValues =
+          pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+      List<Expression> listValues = Lists.newArrayListWithCapacity(expressions.size());
 
       for (int index = 0; index < expressions.size(); index++) {
         Expression value = expressions.get(index);
         FieldType toType = outputSchema.getField(index).getType();
-
-        // Expressions.call is equivalent to: .addValue(value)
-        output = Expressions.call(output, addValue, castOutput(value, toType));
+        listValues.add(castOutput(value, toType));
       }
+      Method newArrayList = Types.lookupMethod(Arrays.class, "asList");
+      Expression valueList = Expressions.call(newArrayList, listValues);
+
+      // Expressions.call is equivalent to: output =
+      //     Row.withSchema(outputSchema).attachValues(values);
+      Expression output = Expressions.call(Row.class, "withSchema", outputSchemaParam);
 
-      // Expressions.call is equivalent to: .build();
-      output = Expressions.call(output, "build");
+      if (verifyRowValues) {
+        Method addValues = Types.lookupMethod(Row.Builder.class, "addValues", List.class);
+        output = Expressions.call(output, addValues, valueList);
+        output = Expressions.call(output, "build");
+      } else {
+        Method attachValues = Types.lookupMethod(Row.Builder.class, "attachValues", List.class);
+        output = Expressions.call(output, attachValues, valueList);
+      }
 
       builder.add(
           // Expressions.ifThen is equivalent to:
@@ -263,31 +275,41 @@ public void processElement(ProcessContext c) {
           .build();
 
   private static Expression castOutput(Expression value, FieldType toType) {
+    Expression returnValue = value;
     if (value.getType() == Object.class || !(value.getType() instanceof Class)) {
       // fast copy path, just pass object through
-      return value;
+      returnValue = value;
     } else if (CalciteUtils.isDateTimeType(toType)
         && !Types.isAssignableFrom(ReadableInstant.class, (Class) value.getType())) {
-      return castOutputTime(value, toType);
-
+      returnValue = castOutputTime(value, toType);
     } else if (toType.getTypeName() == TypeName.DECIMAL
        && !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType())) {
-      return Expressions.new_(BigDecimal.class, value);
+      returnValue = Expressions.new_(BigDecimal.class, value);
     } else if (toType.getTypeName() == TypeName.BYTES
        && Types.isAssignableFrom(ByteString.class, (Class) value.getType())) {
-
-      return Expressions.condition(
-          Expressions.equal(value, Expressions.constant(null)),
-          Expressions.constant(null),
-          Expressions.call(value, "getBytes"));
+      returnValue =
+          Expressions.condition(
+              Expressions.equal(value, Expressions.constant(null)),
+              Expressions.constant(null),
+              Expressions.call(value, "getBytes"));
     } else if (((Class) value.getType()).isPrimitive()
        || Types.isAssignableFrom(Number.class, (Class) value.getType())) {
       Type rawType = rawTypeMap.get(toType.getTypeName());
       if (rawType != null) {
-        return Types.castIfNecessary(rawType, value);
+        returnValue = Types.castIfNecessary(rawType, value);
       }
+    } else if (Types.isAssignableFrom(Iterable.class, value.getType())) {
+      // Passing an Iterable into newArrayList gets interpreted to mean copying each individual
+      // element. We want the entire Iterable to be treated as a single element, so we cast it
+      // to Object.
+      returnValue = Expressions.convert_(value, Object.class);
     }
-    return value;
+    returnValue =
+        Expressions.condition(
+            Expressions.equal(value, Expressions.constant(null)),
+            Expressions.constant(null),
+            returnValue);
+    return returnValue;
   }
 
   private static Expression castOutputTime(Expression value, FieldType toType) {
@@ -314,12 +336,8 @@ private static Expression castOutputTime(Expression value, FieldType toType) {
       throw new UnsupportedOperationException("Unknown DateTime type " + toType);
     }
 
-    // Second, convert to joda DateTime
-    valueDateTime =
-        Expressions.new_(
-            DateTime.class,
-            valueDateTime,
-            Expressions.parameter(DateTimeZone.class, "org.joda.time.DateTimeZone.UTC"));
+    // Second, convert to joda Instant
+    valueDateTime = Expressions.new_(Instant.class, valueDateTime);
 
     // Third, make conversion conditional on non-null input.
     if (!((Class) value.getType()).isPrimitive()) {
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index 0d6d25baa9bca..4861dbd7d3393 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -29,6 +29,7 @@
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.AbstractBeamCalcRel;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BeamBigQuerySqlDialect;
@@ -53,6 +54,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 
 /**
  * BeamRelNode to replace {@code Project} and {@code Filter} node based on the {@code ZetaSQL}
@@ -101,13 +103,16 @@ public PCollection<Row> expand(PCollectionList<Row> pinput) {
             .collect(Collectors.toList());
     final RexNode condition = getProgram().getCondition();
 
+    boolean verifyRowValues =
+        pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
     Schema outputSchema = CalciteUtils.toSchema(getRowType());
     CalcFn calcFn =
         new CalcFn(
             projects,
             condition == null ? null : unparseRexNode(condition),
             upstream.getSchema(),
-            outputSchema);
+            outputSchema,
+            verifyRowValues);
 
     // validate prepared expressions
     calcFn.setup();
@@ -129,6 +134,7 @@ private static class CalcFn extends DoFn<Row, Row> {
     @Nullable private final String condition;
     private final Schema inputSchema;
     private final Schema outputSchema;
+    private final boolean verifyRowValues;
 
     private transient List<PreparedExpression> projectExps;
     @Nullable private transient PreparedExpression conditionExp;
@@ -136,12 +142,14 @@ private static class CalcFn extends DoFn<Row, Row> {
         List<String> projects,
         @Nullable String condition,
         Schema inputSchema,
-        Schema outputSchema) {
+        Schema outputSchema,
+        boolean verifyRowValues) {
       Preconditions.checkArgument(projects.size() == outputSchema.getFieldCount());
       this.projects = ImmutableList.copyOf(projects);
       this.condition = condition;
       this.inputSchema = inputSchema;
       this.outputSchema = outputSchema;
+      this.verifyRowValues = verifyRowValues;
     }
 
     @Setup
@@ -185,14 +193,19 @@ public void processElement(ProcessContext c) {
         return;
       }
 
-      Row.Builder output = Row.withSchema(outputSchema);
+      List<Object> values = Lists.newArrayListWithExpectedSize(outputSchema.getFieldCount());
       for (int i = 0; i < outputSchema.getFieldCount(); i++) {
         // TODO[BEAM-8630]: performance optimization by bundling the gRPC calls
         Value v = projectExps.get(i).execute(columns, params);
-        output.addValue(
-            ZetaSqlUtils.zetaSqlValueToJavaObject(v, outputSchema.getField(i).getType()));
+        values.add(
+            ZetaSqlUtils.zetaSqlValueToJavaObject(
+                v, outputSchema.getField(i).getType(), verifyRowValues));
       }
-      c.output(output.build());
+      Row outputRow =
+          verifyRowValues
+              ? Row.withSchema(outputSchema).addValues(values).build()
+              : Row.withSchema(outputSchema).attachValues(values);
+      c.output(outputRow);
     }
 
     @Teardown
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
index 2ee1691312aee..f74e35ff245c0 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtils.java
@@ -188,7 +188,8 @@ public static Value beamRowToZetaSqlStructValue(Row row, Schema schema) {
     return Value.createStructValue(createZetaSqlStructTypeFromBeamSchema(schema), values);
   }
 
-  public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType) {
+  public static Object zetaSqlValueToJavaObject(
+      Value value, FieldType fieldType, boolean verifyValues) {
     if (value.isNull()) {
       return null;
     }
@@ -213,9 +214,10 @@ public static Object zetaSqlValueToJavaObject(Value value, FieldType fieldType)
       case BYTES:
         return value.getBytesValue().toByteArray();
       case ARRAY:
-        return zetaSqlArrayValueToJavaList(value, fieldType.getCollectionElementType());
+        return zetaSqlArrayValueToJavaList(
+            value, fieldType.getCollectionElementType(), verifyValues);
       case ROW:
-        return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema());
+        return zetaSqlStructValueToBeamRow(value, fieldType.getRowSchema(), verifyValues);
       default:
         throw new UnsupportedOperationException(
             "Unsupported Beam fieldType: " + fieldType.getTypeName());
@@ -227,18 +229,25 @@ private static Instant zetaSqlTimestampValueToJodaInstant(Value timestampValue)
     return Instant.ofEpochMilli(millis);
   }
 
-  private static List<Object> zetaSqlArrayValueToJavaList(Value arrayValue, FieldType elementType) {
+  private static List<Object> zetaSqlArrayValueToJavaList(
+      Value arrayValue, FieldType elementType, boolean verifyValues) {
     return arrayValue.getElementList().stream()
-        .map(e -> zetaSqlValueToJavaObject(e, elementType))
+        .map(e -> zetaSqlValueToJavaObject(e, elementType, verifyValues))
        .collect(Collectors.toList());
   }
 
-  private static Row zetaSqlStructValueToBeamRow(Value structValue, Schema schema) {
+  private static Row zetaSqlStructValueToBeamRow(
+      Value structValue, Schema schema, boolean verifyValues) {
     List<Object> objects = new ArrayList<>(schema.getFieldCount());
     List<Value> values = structValue.getFieldList();
     for (int i = 0; i < values.size(); i++) {
-      objects.add(zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType()));
+      objects.add(
+          zetaSqlValueToJavaObject(values.get(i), schema.getField(i).getType(), verifyValues));
     }
-    return Row.withSchema(schema).addValues(objects).build();
+    Row row =
+        verifyValues
+            ? Row.withSchema(schema).addValues(objects).build()
+            : Row.withSchema(schema).attachValues(objects);
+    return row;
   }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtilsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtilsTest.java
index a2da5c1bd6b6d..dbe7c18afd2eb 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtilsTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlUtilsTest.java
@@ -127,6 +127,7 @@ public void testJavaObjectToZetaSqlValue() {
 
   @Test
   public void testZetaSqlValueToJavaObject() {
-    assertEquals(ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE), TEST_ROW);
+    assertEquals(
+        ZetaSqlUtils.zetaSqlValueToJavaObject(TEST_VALUE, TEST_FIELD_TYPE, true), TEST_ROW);
   }
 }
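
For reference, a minimal sketch of how the new flag is consumed from user code. It relies only on APIs that appear in this patch (BeamSqlPipelineOptions#getVerifyRowValues, Row.Builder#addValues/#attachValues) plus the standard PipelineOptionsFactory entry point; the class name, schema, and sample values below are illustrative and not part of the change.

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

public class VerifyRowValuesSketch {
  public static void main(String[] args) {
    // --verifyRowValues defaults to false, so the attachValues(...) fast path is taken unless the
    // flag is set explicitly; the rels above read the same option via
    // pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues().
    BeamSqlPipelineOptions options =
        PipelineOptionsFactory.fromArgs("--verifyRowValues=true").as(BeamSqlPipelineOptions.class);
    boolean verifyRowValues = options.getVerifyRowValues();

    Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build();
    List<Object> fieldValues = Arrays.asList(42L, "example");

    // Same conditional as mergeRecord and CalcFn.processElement: addValues(...).build() verifies
    // every value against the schema, while attachValues(...) skips that per-field verification.
    Row row =
        verifyRowValues
            ? Row.withSchema(schema).addValues(fieldValues).build()
            : Row.withSchema(schema).attachValues(fieldValues);
    System.out.println(row);
  }
}

The trade-off behind the option, as the @Description suggests, is that per-field verification in addValues is useful while debugging but adds overhead on the hot path, which attachValues avoids.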