Merge pull request apache#7840: [BEAM-6602] BigQueryIO.write natively understands Beam schemas
reuvenlax committed Mar 31, 2019
1 parent e5601fc commit 90bf972
Showing 7 changed files with 340 additions and 131 deletions.
DefaultCoderCloudObjectTranslatorRegistrar.java

@@ -42,7 +42,6 @@
 import org.apache.beam.sdk.coders.TextualIntegerCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestinationCoderV2;
 import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
@@ -98,7 +97,6 @@ public class DefaultCoderCloudObjectTranslatorRegistrar
           KeyPrefixCoder.class,
           RandomAccessDataCoder.class,
           StringUtf8Coder.class,
-          TableDestinationCoder.class,
           TableDestinationCoderV2.class,
           TableRowJsonCoder.class,
           TextualIntegerCoder.class,
BigQueryIO.java

@@ -201,7 +201,16 @@
  * BigQueryIO.Write#withFormatFunction(SerializableFunction)}.
  *
  * <pre>{@code
- * class Quote { Instant timestamp; String exchange; String symbol; double price; }
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
 *
 * PCollection<Quote> quotes = ...
 *
@@ -223,6 +232,34 @@
 * written to must already exist. Unbounded PCollections can only be written using {@link
 * Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
 *
+ * <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on
+ * the input PCollection. In this case Beam can also automatically format the input into a
+ * TableRow, if no format function is provided. In the above example, the quotes PCollection has a
+ * schema that Beam infers from the Quote POJO, so the write can be done more simply as follows:
+ *
+ * <pre>{@code
+ * {@literal @}DefaultSchema(JavaFieldSchema.class)
+ * class Quote {
+ *   final Instant timestamp;
+ *   final String exchange;
+ *   final String symbol;
+ *   final double price;
+ *
+ *   {@literal @}SchemaCreate
+ *   Quote(Instant timestamp, String exchange, String symbol, double price) {
+ *     // initialize all member variables.
+ *   }
+ * }
+ *
+ * PCollection<Quote> quotes = ...
+ *
+ * quotes.apply(BigQueryIO
+ *     .<Quote>write()
+ *     .to("my-project:my_dataset.my_table")
+ *     .useBeamSchema()
+ *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ * }</pre>
+ *
 * <h3>Loading historical data into time-partitioned BigQuery tables</h3>
 *
 * <p>To load historical data into a time-partitioned BigQuery table, specify {@link
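[Editor's note: the paragraph above is cut off by the diff context. As a hedged sketch of what that section describes, combined with the useBeamSchema() path this commit adds, a time-partitioned write looks roughly like the following. The table name, partition field, and Quote type are illustrative assumptions; this snippet is not part of the commit.]

    // Editor's sketch, not part of this diff: a time-partitioned write that
    // reuses the schema-aware path added here. Names are assumptions.
    PCollection<Quote> quotes = ...;

    quotes.apply(BigQueryIO
        .<Quote>write()
        .to("my-project:my_dataset.quotes_by_day")
        .useBeamSchema()
        .withTimePartitioning(new TimePartitioning().setType("DAY").setField("timestamp"))
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));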
@@ -1331,6 +1368,7 @@ public static <T> Write<T> write() {
         .setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION)
         .setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)
         .setOptimizeWrites(false)
+        .setUseBeamSchema(false)
         .build();
   }

@@ -1452,6 +1490,8 @@ public enum Method {
 
     abstract Boolean getOptimizeWrites();
 
+    abstract Boolean getUseBeamSchema();
+
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -1511,6 +1551,8 @@ abstract Builder<T> setTableFunction(
 
       abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
 
+      abstract Builder<T> setUseBeamSchema(Boolean useBeamSchema);
+
       abstract Write<T> build();
     }

@@ -1618,7 +1660,6 @@ public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
 
     /** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
     public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
-      checkArgument(formatFunction != null, "formatFunction can not be null");
       return toBuilder().setFormatFunction(formatFunction).build();
     }

@@ -1826,10 +1867,20 @@ public Write<T> withKmsKey(String kmsKey) {
      * BigQuery. Not enabled by default in order to maintain backwards compatibility.
      */
     @Experimental
-    public Write<T> withOptimizedWrites() {
+    public Write<T> optimizedWrites() {
       return toBuilder().setOptimizeWrites(true).build();
     }
 
+    /**
+     * If true, then the BigQuery schema will be inferred from the input schema. If no
+     * formatFunction is set, then BigQueryIO will automatically turn the input records into
+     * TableRows that match the schema.
+     */
+    @Experimental
+    public Write<T> useBeamSchema() {
+      return toBuilder().setUseBeamSchema(true).build();
+    }
+
     @VisibleForTesting
     /** This method is for test usage only */
     public Write<T> withTestServices(BigQueryServices testServices) {
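[Editor's note: one combination the new Javadoc leaves implicit. As the expandTyped() change further down shows, a TableRow converter is only synthesized when no format function is set, so useBeamSchema() can be paired with a custom formatter to infer the table schema while still controlling row layout. A hedged sketch; the field and table names are illustrative assumptions, not from this commit.]

    // Editor's sketch: schema inference combined with an explicit format function.
    quotes.apply(BigQueryIO
        .<Quote>write()
        .to("my-project:my_dataset.my_table")
        .useBeamSchema() // infers the BigQuery TableSchema from the Beam schema
        .withFormatFunction(q -> new TableRow()
            .set("timestamp", q.timestamp.toString())
            .set("exchange", q.exchange)
            .set("symbol", q.symbol)
            .set("price", q.price))
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));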
@@ -1910,19 +1961,6 @@ public WriteResult expand(PCollection<T> input) {
               || getDynamicDestinations() != null,
           "must set the table reference of a BigQueryIO.Write transform");
 
-      checkArgument(
-          getFormatFunction() != null,
-          "A function must be provided to convert type into a TableRow. "
-              + "use BigQueryIO.Write.withFormatFunction to provide a formatting function.");
-
-      // Require a schema if creating one or more tables.
-      checkArgument(
-          getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
-              || getJsonSchema() != null
-              || getDynamicDestinations() != null
-              || getSchemaFromView() != null,
-          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
       List<?> allToArgs =
           Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
       checkArgument(
@@ -1995,15 +2033,48 @@ public WriteResult expand(PCollection<T> input) {
         // Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
         if (getJsonTimePartitioning() != null) {
           dynamicDestinations =
-              new ConstantTimePartitioningDestinations(
-                  dynamicDestinations, getJsonTimePartitioning());
+              new ConstantTimePartitioningDestinations<>(
+                  (DynamicDestinations<T, TableDestination>) dynamicDestinations,
+                  getJsonTimePartitioning());
         }
       }
       return expandTyped(input, dynamicDestinations);
     }
 
     private <DestinationT> WriteResult expandTyped(
         PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
+      boolean optimizeWrites = getOptimizeWrites();
+      SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
+      if (getUseBeamSchema()) {
+        checkArgument(input.hasSchema());
+        optimizeWrites = true;
+        if (formatFunction == null) {
+          // If no format function is set, we will automatically convert the input type to a
+          // TableRow.
+          formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
+        }
+        // Infer the TableSchema from the input Beam schema.
+        TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
+        dynamicDestinations =
+            new ConstantSchemaDestinations<>(
+                dynamicDestinations,
+                StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
+      } else {
+        // Require a schema if creating one or more tables.
+        checkArgument(
+            getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED
+                || getJsonSchema() != null
+                || getDynamicDestinations() != null
+                || getSchemaFromView() != null,
+            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+      }
+
+      checkArgument(
+          formatFunction != null,
+          "A function must be provided to convert type into a TableRow. "
+              + "Use BigQueryIO.Write.withFormatFunction to provide a formatting function. "
+              + "A format function is not required if Beam schemas are used.");
+
       Coder<DestinationT> destinationCoder = null;
       try {
         destinationCoder =
@@ -2014,7 +2085,7 @@
       }
 
       Method method = resolveMethod(input);
-      if (getOptimizeWrites()) {
+      if (optimizeWrites) {
         PCollection<KV<DestinationT, T>> rowsWithDestination =
             input
                 .apply(
@@ -2026,12 +2097,12 @@
                 input.getCoder(),
                 destinationCoder,
                 dynamicDestinations,
-                getFormatFunction(),
+                formatFunction,
                 method);
       } else {
         PCollection<KV<DestinationT, TableRow>> rowsWithDestination =
             input
-                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, getFormatFunction()))
+                .apply("PrepareWrite", new PrepareWrite<>(dynamicDestinations, formatFunction))
                 .setCoder(KvCoder.of(destinationCoder, TableRowJsonCoder.of()));
         return continueExpandTyped(
             rowsWithDestination,
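[Editor's note: the new expandTyped() path leans on two public BigQueryUtils helpers, toTableSchema and toTableRow, which can be exercised on their own. A minimal standalone sketch follows, with a hand-built schema standing in for the one Beam would infer from Quote; the class name, field names, and values are assumptions for illustration.]

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.Row;

    public class SchemaConversionSketch {
      public static void main(String[] args) {
        // A hand-built Beam schema standing in for the one Beam would infer
        // from the Quote POJO (illustrative assumption).
        Schema quoteSchema =
            Schema.builder()
                .addStringField("exchange")
                .addStringField("symbol")
                .addDoubleField("price")
                .build();

        // The inference step from the diff: Beam schema -> BigQuery TableSchema.
        TableSchema tableSchema = BigQueryUtils.toTableSchema(quoteSchema);
        System.out.println(tableSchema);

        // The formatting step used when no format function is set: Row -> TableRow.
        Row row = Row.withSchema(quoteSchema).addValues("NYSE", "IBM", 125.0).build();
        TableRow tableRow = BigQueryUtils.toTableRow(row);
        System.out.println(tableRow);
      }
    }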
[Diff for the remaining changed files not shown.]
