From 90bf97252500317fca287182dcb8a8e96f3d5878 Mon Sep 17 00:00:00 2001 From: reuvenlax Date: Sun, 31 Mar 2019 16:24:33 -0700 Subject: [PATCH] Merge pull request #7840: [BEAM-6602] BigQueryIO.write natively understands Beam schemas --- ...ltCoderCloudObjectTranslatorRegistrar.java | 2 - .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 113 +++++++++--- .../sdk/io/gcp/bigquery/BigQueryUtils.java | 138 +++++++++------ .../io/gcp/bigquery/DynamicDestinations.java | 6 +- .../bigquery/DynamicDestinationsHelpers.java | 30 +++- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 162 ++++++++++++++---- .../io/gcp/bigquery/BigQueryUtilsTest.java | 20 +-- 7 files changed, 340 insertions(+), 131 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java index d43106bc2d610..25d6df9be610d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DefaultCoderCloudObjectTranslatorRegistrar.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.coders.TextualIntegerCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder; import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2; import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; @@ -98,7 +97,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar KeyPrefixCoder.class, RandomAccessDataCoder.class, StringUtf8Coder.class, - TableDestinationCoder.class, TableDestinationCoderV2.class, TableRowJsonCoder.class, TextualIntegerCoder.class, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 98b114f7ef42f..85cdba478d136 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -201,7 +201,16 @@ * BigQueryIO.Write#withFormatFunction(SerializableFunction)}. * *
{@code
- * class Quote { Instant timestamp; String exchange; String symbol; double price; }
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
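+ *
+ * // Note: without a Beam schema on the PCollection, BigQueryIO cannot infer
+ * // the table schema or the row conversion, so an explicit format function
+ * // (withFormatFunction) is needed to turn each Quote into a TableRow.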
  *
 * PCollection<Quote> quotes = ...
  *
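(The write call itself falls outside this hunk. For reference, here is a minimal sketch of
the write this example builds toward, assuming the destination table already exists —
otherwise a schema must also be supplied — and with an illustrative field-to-column mapping:)

    quotes.apply(BigQueryIO.<Quote>write()
        .to("my-project:my_dataset.my_table")
        .withFormatFunction(quote -> new TableRow()
            .set("timestamp", quote.timestamp.toString())
            .set("exchange", quote.exchange)
            .set("symbol", quote.symbol)
            .set("price", quote.price)));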
@@ -223,6 +232,34 @@
  * written to must already exist. Unbounded PCollections can only be written using {@link
  * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
  *
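In other words, an unbounded write pairs an existing destination with an appending (or
empty-table) disposition. A hedged sketch, reusing the Quote type from above and a
trimmed-down, illustrative format function:

    quotes.apply(BigQueryIO.<Quote>write()
        .to("my-project:my_dataset.my_table")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withFormatFunction(quote -> new TableRow().set("symbol", quote.symbol)));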
+ *
+ * <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on
+ * the input PCollection. Beam can also automatically format the input into a TableRow in this case,
+ * if no format function is provided. In the above example, the quotes PCollection has a schema that
+ * Beam infers from the Quote POJO. So the write could be done more simply as follows:
+ *
+ * <pre>

{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   {@literal @}SchemaCreate
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
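+ *
+ * // @DefaultSchema(JavaFieldSchema.class) tells Beam to infer a schema from
+ * // the class's fields, and @SchemaCreate marks the constructor to use when
+ * // building Quote instances from rows.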
+ *
+ * PCollection<Quote> quotes = ...
+ *
+ * quotes.apply(BigQueryIO
+ *     .write()
+ *     .to("my-project:my_dataset.my_table")
+ *     .useBeamSchema()
+ *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ * }</pre>
+ *
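Under the hood, useBeamSchema() infers the table schema with BigQueryUtils.toTableSchema
(added below) and, if no format function is set, converts elements through the input's
to-Row function. For the Quote class above, the inferred schema would look roughly like
this sketch (type names follow the patch's Beam-to-StandardSQL mapping; modes are omitted
and ImmutableList is Guava's):

    TableSchema inferred =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                    new TableFieldSchema().setName("exchange").setType("STRING"),
                    new TableFieldSchema().setName("symbol").setType("STRING"),
                    new TableFieldSchema().setName("price").setType("FLOAT64")));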

 * <h3>Loading historical data into time-partitioned BigQuery tables</h3>
 *
 * <p>
To load historical data into a time-partitioned BigQuery table, specify {@link @@ -1331,6 +1368,7 @@ public static Write write() { .setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION) .setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION) .setOptimizeWrites(false) + .setUseBeamSchema(false) .build(); } @@ -1452,6 +1490,8 @@ public enum Method { abstract Boolean getOptimizeWrites(); + abstract Boolean getUseBeamSchema(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1511,6 +1551,8 @@ abstract Builder setTableFunction( abstract Builder setOptimizeWrites(Boolean optimizeWrites); + abstract Builder setUseBeamSchema(Boolean useBeamSchema); + abstract Write build(); } @@ -1618,7 +1660,6 @@ public Write to(DynamicDestinations dynamicDestinations) { /** Formats the user's type into a {@link TableRow} to be written to BigQuery. */ public Write withFormatFunction(SerializableFunction formatFunction) { - checkArgument(formatFunction != null, "formatFunction can not be null"); return toBuilder().setFormatFunction(formatFunction).build(); } @@ -1826,10 +1867,20 @@ public Write withKmsKey(String kmsKey) { * BigQuery. Not enabled by default in order to maintain backwards compatibility. */ @Experimental - public Write withOptimizedWrites() { + public Write optimizedWrites() { return toBuilder().setOptimizeWrites(true).build(); } + /** + * If true, then the BigQuery schema will be inferred from the input schema. If no + * formatFunction is set, then BigQueryIO will automatically turn the input records into + * TableRows that match the schema. + */ + @Experimental + public Write useBeamSchema() { + return toBuilder().setUseBeamSchema(true).build(); + } + @VisibleForTesting /** This method is for test usage only */ public Write withTestServices(BigQueryServices testServices) { @@ -1910,19 +1961,6 @@ public WriteResult expand(PCollection input) { || getDynamicDestinations() != null, "must set the table reference of a BigQueryIO.Write transform"); - checkArgument( - getFormatFunction() != null, - "A function must be provided to convert type into a TableRow. " - + "use BigQueryIO.Write.withFormatFunction to provide a formatting function."); - - // Require a schema if creating one or more tables. - checkArgument( - getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED - || getJsonSchema() != null - || getDynamicDestinations() != null - || getSchemaFromView() != null, - "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided."); - List allToArgs = Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations()); checkArgument( @@ -1995,8 +2033,9 @@ public WriteResult expand(PCollection input) { // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning. 
       if (getJsonTimePartitioning() != null) {
         dynamicDestinations =
-            new ConstantTimePartitioningDestinations(
-                dynamicDestinations, getJsonTimePartitioning());
+            new ConstantTimePartitioningDestinations<>(
+                (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+                getJsonTimePartitioning());
       }
     }
     return expandTyped(input, dynamicDestinations);
@@ -2004,6 +2043,38 @@
   private <DestinationT> WriteResult expandTyped(
       PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+    boolean optimizeWrites = getOptimizeWrites();
+    SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+    if (getUseBeamSchema()) {
+      checkArgument(
+          input.hasSchema(), "useBeamSchema() is set, but the input PCollection has no schema.");
+      optimizeWrites = true;
+      if (formatFunction == null) {
+        // If no format function is set, then we will automatically convert the input type to a
+        // TableRow.
+        formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
+      }
+      // Infer the TableSchema from the input Beam schema.
+      TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
+      dynamicDestinations =
+          new ConstantSchemaDestinations<>(
+              dynamicDestinations,
+              StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+    } else {
+      // Require a schema if creating one or more tables.
+      checkArgument(
+          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
+              || getJsonSchema() != null
+              || getDynamicDestinations() != null
+              || getSchemaFromView() != null,
+          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+    }
+
+    checkArgument(
+        formatFunction != null,
+        "A function must be provided to convert type into a TableRow. "
+            + "Use BigQueryIO.Write.withFormatFunction to provide a formatting function. "
+            + "A format function is not required if Beam schemas are used.");
+
     Coder<DestinationT> destinationCoder = null;
     try {
       destinationCoder =
@@ -2014,7 +2085,7 @@
     }
     Method method = resolveMethod(input);
-    if (getOptimizeWrites()) {
+    if (optimizeWrites) {
       PCollection<KV<DestinationT, T>> rowsWithDestination =
           input
               .apply(
@@ -2026,12 +2097,12 @@
           input.getCoder(),
           destinationCoder,
           dynamicDestinations,
-          getFormatFunction(),
+          formatFunction,
           method);
     } else {
       PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
           input
-              .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
+              .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction))
               .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
       return continueExpandTyped(
           rowsWithDestination,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index ceab03a523127..6d1d15a78834a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -26,41 +26,30 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import
org.apache.beam.sdk.schemas.Schema.FieldType; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.transforms.SerializableFunctions; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v20_0.com.google.common.io.BaseEncoding; import org.joda.time.DateTime; import org.joda.time.Instant; import org.joda.time.chrono.ISOChronology; import org.joda.time.format.DateTimeFormatter; import org.joda.time.format.DateTimeFormatterBuilder; -/** - * Utility methods for BigQuery related operations. - * - *

<p>Example: Writing to BigQuery - * - * <pre>

{@code
- * PCollection<Row> rows = ...;
- *
- * rows.apply(BigQueryIO.write()
- *       .withSchema(BigQueryUtils.toTableSchema(rows))
- *       .withFormatFunction(BigQueryUtils.toTableRow())
- *       .to("my-project:my_dataset.my_table"));
- * }
- */ +/** Utility methods for BigQuery related operations. */ public class BigQueryUtils { private static final Map BEAM_TO_BIGQUERY_TYPE_MAPPING = ImmutableMap.builder() @@ -76,6 +65,7 @@ public class BigQueryUtils { .put(TypeName.ROW, StandardSQLTypeName.STRUCT) .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP) .put(TypeName.STRING, StandardSQLTypeName.STRING) + .put(TypeName.BYTES, StandardSQLTypeName.BYTES) .build(); private static final Map> JSON_VALUE_PARSERS = @@ -137,12 +127,18 @@ private static List toTableFieldSchema(Schema schema) { } if (TypeName.ARRAY == type.getTypeName()) { type = type.getCollectionElementType(); + if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) { + throw new IllegalArgumentException("Array of collection is not supported in BigQuery."); + } field.setMode(Mode.REPEATED.toString()); } if (TypeName.ROW == type.getTypeName()) { Schema subType = type.getRowSchema(); field.setFields(toTableFieldSchema(subType)); } + if (TypeName.MAP == type.getTypeName()) { + throw new IllegalArgumentException("Maps are not supported in BigQuery."); + } field.setType(toStandardSQLTypeName(type).toString()); fields.add(field); @@ -155,17 +151,18 @@ public static TableSchema toTableSchema(Schema schema) { return new TableSchema().setFields(toTableFieldSchema(schema)); } - /** Convert a Beam {@link PCollection} to a BigQuery {@link TableSchema}. */ - public static TableSchema toTableSchema(PCollection rows) { - RowCoder coder = (RowCoder) rows.getCoder(); - return toTableSchema(coder.getSchema()); - } - - private static final SerializableFunction TO_TABLE_ROW = new ToTableRow(); + private static final SerializableFunction ROW_TO_TABLE_ROW = + new ToTableRow(SerializableFunctions.identity()); /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. */ public static SerializableFunction toTableRow() { - return TO_TABLE_ROW; + return ROW_TO_TABLE_ROW; + } + + /** Convert a Beam schema type to a BigQuery {@link TableRow}. */ + public static SerializableFunction toTableRow( + SerializableFunction toRow) { + return new ToTableRow<>(toRow); } /** Convert {@link SchemaAndRecord} to a Beam {@link Row}. */ @@ -174,10 +171,16 @@ public static SerializableFunction toBeamRow(Schema schema } /** Convert a Beam {@link Row} to a BigQuery {@link TableRow}. 
*/ - private static class ToTableRow implements SerializableFunction { + private static class ToTableRow implements SerializableFunction { + private final SerializableFunction toRow; + + ToTableRow(SerializableFunction toRow) { + this.toRow = toRow; + } + @Override - public TableRow apply(Row input) { - return toTableRow(input); + public TableRow apply(T input) { + return toTableRow(toRow.apply(input)); } } @@ -214,40 +217,61 @@ public static TableRow toTableRow(Row row) { TableRow output = new TableRow(); for (int i = 0; i < row.getFieldCount(); i++) { Object value = row.getValue(i); - Field schemaField = row.getSchema().getField(i); - TypeName type = schemaField.getType().getTypeName(); - - switch (type) { - case ARRAY: - type = schemaField.getType().getCollectionElementType().getTypeName(); - if (TypeName.ROW == type) { - List rows = (List) value; - List tableRows = new ArrayList<>(rows.size()); - for (int j = 0; j < rows.size(); j++) { - tableRows.add(toTableRow(rows.get(j))); - } - value = tableRows; - } - break; - case ROW: - value = toTableRow((Row) value); - break; - case DATETIME: - DateTimeFormatter patternFormat = - new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") - .toFormatter(); - value = value == null ? null : ((Instant) value).toDateTime().toString(patternFormat); - break; - default: - value = row.getValue(i); - break; + output = output.set(schemaField.getName(), fromBeamField(schemaField.getType(), value)); + } + return output; + } + + private static Object fromBeamField(FieldType fieldType, Object fieldValue) { + if (fieldValue == null) { + if (!fieldType.getNullable()) { + throw new IllegalArgumentException("Field is not nullable."); } + return null; + } - output = output.set(schemaField.getName(), value); + switch (fieldType.getTypeName()) { + case ARRAY: + FieldType elementType = fieldType.getCollectionElementType(); + List items = (List) fieldValue; + List convertedItems = Lists.newArrayListWithCapacity(items.size()); + for (Object item : items) { + convertedItems.add(fromBeamField(elementType, item)); + } + return convertedItems; + + case ROW: + return toTableRow((Row) fieldValue); + + case DATETIME: + DateTimeFormatter patternFormat = + new DateTimeFormatterBuilder() + .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ") + .toFormatter(); + return ((Instant) fieldValue).toDateTime().toString(patternFormat); + + case INT16: + case INT32: + case INT64: + case FLOAT: + case DOUBLE: + case STRING: + case BOOLEAN: + return fieldValue.toString(); + + case DECIMAL: + return fieldValue.toString(); + + case BYTES: + ByteBuffer byteBuffer = (ByteBuffer) fieldValue; + byte[] bytes = new byte[byteBuffer.limit()]; + byteBuffer.get(bytes); + return BaseEncoding.base64().encode(bytes); + + default: + return fieldValue; } - return output; } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index c1567ccb0de2d..ea3d4352b4567 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -112,11 +112,7 @@ protected final SideInputT sideInput(PCollectionView vi return sideInputAccessor.sideInput(view); } - final void setSideInputAccessor(SideInputAccessor sideInputAccessor) { - this.sideInputAccessor 
= sideInputAccessor; - } - - final void setSideInputAccessorFromProcessContext(DoFn.ProcessContext context) { + void setSideInputAccessorFromProcessContext(DoFn.ProcessContext context) { this.sideInputAccessor = new SideInputAccessorViaProcessContext(context); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index 99c3f6f64fffc..d006220e40bbe 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -23,10 +23,13 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -75,7 +78,7 @@ public TableDestination getTable(TableDestination destination) { @Override public Coder getDestinationCoder() { - return TableDestinationCoder.of(); + return TableDestinationCoderV2.of(); } } @@ -148,6 +151,23 @@ public Coder getDestinationCoder() { return inner.getDestinationCoder(); } + @Override + Coder getDestinationCoderWithDefault(CoderRegistry registry) + throws CannotProvideCoderException { + return inner.getDestinationCoderWithDefault(registry); + } + + @Override + public List> getSideInputs() { + return inner.getSideInputs(); + } + + @Override + void setSideInputAccessorFromProcessContext(DoFn.ProcessContext context) { + super.setSideInputAccessorFromProcessContext(context); + inner.setSideInputAccessorFromProcessContext(context); + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("inner", inner).toString(); @@ -155,19 +175,19 @@ public String toString() { } /** Returns the same schema for every table. 
*/ - static class ConstantSchemaDestinations - extends DelegatingDynamicDestinations { + static class ConstantSchemaDestinations + extends DelegatingDynamicDestinations { @Nullable private final ValueProvider jsonSchema; ConstantSchemaDestinations( - DynamicDestinations inner, ValueProvider jsonSchema) { + DynamicDestinations inner, ValueProvider jsonSchema) { super(inner); checkArgument(jsonSchema != null, "jsonSchema can not be null"); this.jsonSchema = jsonSchema; } @Override - public TableSchema getSchema(TableDestination destination) { + public TableSchema getSchema(DestinationT destination) { String jsonSchema = this.jsonSchema.get(); checkArgument(jsonSchema != null, "jsonSchema can not be null"); return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index df18467feb413..a2ea13b03156e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -61,8 +61,14 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.schemas.JavaFieldSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaCreate; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -84,6 +90,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; @@ -166,7 +173,7 @@ public void tearDown() throws IOException { } // Create an intermediate type to ensure that coder inference up the inheritance tree is tested. 
- abstract static class StringIntegerDestinations extends DynamicDestinations {} + abstract static class StringLongDestinations extends DynamicDestinations {} @Test public void testWriteEmptyPCollection() throws Exception { @@ -193,15 +200,28 @@ public void testWriteEmptyPCollection() throws Exception { @Test public void testWriteDynamicDestinationsBatch() throws Exception { - writeDynamicDestinations(false); + writeDynamicDestinations(false, false); + } + + @Test + public void testWriteDynamicDestinationsBatchWithSchemas() throws Exception { + writeDynamicDestinations(false, true); } @Test public void testWriteDynamicDestinationsStreaming() throws Exception { - writeDynamicDestinations(true); + writeDynamicDestinations(true, false); } - public void writeDynamicDestinations(boolean streaming) throws Exception { + @Test + public void testWriteDynamicDestinationsStreamingWithSchemas() throws Exception { + writeDynamicDestinations(true, true); + } + + public void writeDynamicDestinations(boolean streaming, boolean schemas) throws Exception { + final Schema schema = + Schema.builder().addField("name", FieldType.STRING).addField("id", FieldType.INT32).build(); + final Pattern userPattern = Pattern.compile("([a-z]+)([0-9]+)"); final PCollectionView> sideInput1 = @@ -231,43 +251,45 @@ public void writeDynamicDestinations(boolean streaming) throws Exception { users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED); } + if (schemas) { + users = + users.setSchema( + schema, + user -> { + Matcher matcher = userPattern.matcher(user); + checkState(matcher.matches()); + return Row.withSchema(schema) + .addValue(matcher.group(1)) + .addValue(Integer.valueOf(matcher.group(2))) + .build(); + }, + r -> r.getString(0) + r.getInt32(1)); + } + // Use a partition decorator to verify that partition decorators are supported. final String partitionDecorator = "20171127"; - users.apply( - "WriteBigQuery", + BigQueryIO.Write write = BigQueryIO.write() .withTestServices(fakeBqServices) .withMaxFilesPerBundle(5) .withMaxFileSize(10) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withFormatFunction( - user -> { - Matcher matcher = userPattern.matcher(user); - if (matcher.matches()) { - return new TableRow() - .set("name", matcher.group(1)) - .set("id", Integer.valueOf(matcher.group(2))); - } - throw new RuntimeException("Unmatching element " + user); - }) .to( - new StringIntegerDestinations() { + new StringLongDestinations() { @Override - public Integer getDestination(ValueInSingleWindow element) { + public Long getDestination(ValueInSingleWindow element) { assertThat( element.getWindow(), Matchers.instanceOf(PartitionedGlobalWindow.class)); Matcher matcher = userPattern.matcher(element.getValue()); - if (matcher.matches()) { - // Since we name tables by userid, we can simply store an Integer to represent - // a table. - return Integer.valueOf(matcher.group(2)); - } - throw new RuntimeException("Unmatching destination " + element.getValue()); + checkState(matcher.matches()); + // Since we name tables by userid, we can simply store a Long to represent + // a table. + return Long.valueOf(matcher.group(2)); } @Override - public TableDestination getTable(Integer userId) { + public TableDestination getTable(Long userId) { verifySideInputs(); // Each user in it's own table. 
return new TableDestination( @@ -276,7 +298,7 @@ public TableDestination getTable(Integer userId) { } @Override - public TableSchema getSchema(Integer userId) { + public TableSchema getSchema(Long userId) { verifySideInputs(); return new TableSchema() .setFields( @@ -299,21 +321,33 @@ private void verifySideInputs() { allOf(hasEntry("a", "a"), hasEntry("b", "b"), hasEntry("c", "c"))); } }) - .withoutValidation()); + .withoutValidation(); + if (schemas) { + write = write.useBeamSchema(); + } else { + write = + write.withFormatFunction( + user -> { + Matcher matcher = userPattern.matcher(user); + checkState(matcher.matches()); + return new TableRow().set("name", matcher.group(1)).set("id", matcher.group(2)); + }); + } + users.apply("WriteBigQuery", write); p.run(); - Map> expectedTableRows = Maps.newHashMap(); + Map> expectedTableRows = Maps.newHashMap(); for (String anUserList : userList) { Matcher matcher = userPattern.matcher(anUserList); checkState(matcher.matches()); String nickname = matcher.group(1); - int userid = Integer.valueOf(matcher.group(2)); + Long userid = Long.valueOf(matcher.group(2)); List expected = expectedTableRows.computeIfAbsent(userid, k -> Lists.newArrayList()); - expected.add(new TableRow().set("name", nickname).set("id", userid)); + expected.add(new TableRow().set("name", nickname).set("id", userid.toString())); } - for (Map.Entry> entry : expectedTableRows.entrySet()) { + for (Map.Entry> entry : expectedTableRows.entrySet()) { assertThat( fakeDatasetService.getAllRows("project-id", "dataset-id", "userid-" + entry.getKey()), containsInAnyOrder(Iterables.toArray(entry.getValue(), TableRow.class))); @@ -577,6 +611,72 @@ public void testStreamingWrite() throws Exception { new TableRow().set("name", "d").set("number", 4))); } + @DefaultSchema(JavaFieldSchema.class) + static class SchemaPojo { + final String name; + final int number; + + @SchemaCreate + SchemaPojo(String name, int number) { + this.name = name; + this.number = number; + } + } + + @Test + public void testSchemaWriteLoads() throws Exception { + p.apply( + Create.of( + new SchemaPojo("a", 1), + new SchemaPojo("b", 2), + new SchemaPojo("c", 3), + new SchemaPojo("d", 4))) + .apply( + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withMethod(Method.FILE_LOADS) + .useBeamSchema() + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3"), + new TableRow().set("name", "d").set("number", "4"))); + } + + @Test + public void testSchemaWriteStreams() throws Exception { + p.apply( + Create.of( + new SchemaPojo("a", 1), + new SchemaPojo("b", 2), + new SchemaPojo("c", 3), + new SchemaPojo("d", 4))) + .apply( + BigQueryIO.write() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withMethod(Method.STREAMING_INSERTS) + .useBeamSchema() + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + assertThat( + fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"), + containsInAnyOrder( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3"), + new 
TableRow().set("name", "d").set("number", "4"))); + } + /** * A generic window function that allows partitioning data into windows by a string value. * diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java index 2f599d793788e..3e8a8803e3b8e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java @@ -136,17 +136,17 @@ public void testToTableRow_flat() { TableRow row = toTableRow().apply(FLAT_ROW); assertThat(row.size(), equalTo(5)); - assertThat(row, hasEntry("id", 123L)); - assertThat(row, hasEntry("value", 123.456)); + assertThat(row, hasEntry("id", "123")); + assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("name", "test")); - assertThat(row, hasEntry("valid", false)); + assertThat(row, hasEntry("valid", "false")); } @Test public void testToTableRow_array() { TableRow row = toTableRow().apply(ARRAY_ROW); - assertThat(row, hasEntry("ids", Arrays.asList(123L, 124L))); + assertThat(row, hasEntry("ids", Arrays.asList("123", "124"))); assertThat(row.size(), equalTo(1)); } @@ -157,10 +157,10 @@ public void testToTableRow_row() { assertThat(row.size(), equalTo(1)); row = (TableRow) row.get("row"); assertThat(row.size(), equalTo(5)); - assertThat(row, hasEntry("id", 123L)); - assertThat(row, hasEntry("value", 123.456)); + assertThat(row, hasEntry("id", "123")); + assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("name", "test")); - assertThat(row, hasEntry("valid", false)); + assertThat(row, hasEntry("valid", "false")); } @Test @@ -170,10 +170,10 @@ public void testToTableRow_array_row() { assertThat(row.size(), equalTo(1)); row = ((List) row.get("rows")).get(0); assertThat(row.size(), equalTo(5)); - assertThat(row, hasEntry("id", 123L)); - assertThat(row, hasEntry("value", 123.456)); + assertThat(row, hasEntry("id", "123")); + assertThat(row, hasEntry("value", "123.456")); assertThat(row, hasEntry("name", "test")); - assertThat(row, hasEntry("valid", false)); + assertThat(row, hasEntry("valid", "false")); } @Test
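The revised expectations above follow from the new fromBeamField conversion: scalar and
decimal values are rendered as strings, and bytes are base64-encoded, in the emitted
TableRow. A small sketch of that behavior, using only helpers visible in this patch:

    Schema schema = Schema.builder().addInt64Field("id").addBooleanField("valid").build();
    Row row = Row.withSchema(schema).addValues(123L, false).build();
    TableRow tableRow = BigQueryUtils.toTableRow().apply(row);
    // Scalars now arrive as strings rather than typed values:
    assertThat(tableRow, hasEntry("id", "123"));
    assertThat(tableRow, hasEntry("valid", "false"));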