diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java index 6f3bc0a94b9cd..faaf11ac4f652 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java @@ -552,6 +552,17 @@ private PCollection> writeTempTables( ShardedKeyCoder.of(NullableCoder.of(destinationCoder)), ListCoder.of(StringUtf8Coder.of())); + // If the final destination table exists already (and we're appending to it), then the temp + // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object + // with one that makes this happen. + @SuppressWarnings("unchecked") + DynamicDestinations destinations = dynamicDestinations; + if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED) + || createDisposition.equals(CreateDisposition.CREATE_NEVER)) { + destinations = + DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices); + } + // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then // the import needs to be split into multiple partitions, and those partitions will be @@ -570,7 +581,7 @@ private PCollection> writeTempTables( WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED, sideInputs, - dynamicDestinations, + destinations, loadJobProjectId, maxRetryJobs, ignoreUnknownValues, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java index ea3d4352b4567..560d51becb1c0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TypeDescriptor; @@ -78,6 +79,7 @@ interface SideInputAccessor { } @Nullable private transient SideInputAccessor sideInputAccessor; + @Nullable private transient PipelineOptions options; static class SideInputAccessorViaProcessContext implements SideInputAccessor { private DoFn.ProcessContext processContext; @@ -92,6 +94,12 @@ public SideInputT sideInput(PCollectionView view) { } } + /** Get the current PipelineOptions if set. */ + @Nullable + PipelineOptions getPipelineOptions() { + return options; + } + /** * Specifies that this object needs access to one or more side inputs. This side inputs must be * globally windowed, as they will be accessed from the global window. @@ -114,6 +122,7 @@ protected final SideInputT sideInput(PCollectionView vi void setSideInputAccessorFromProcessContext(DoFn.ProcessContext context) { this.sideInputAccessor = new SideInputAccessorViaProcessContext(context); + this.options = context.getPipelineOptions(); } /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index d006220e40bbe..4895004e71bff 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -19,7 +19,13 @@ import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableSchema; +import java.io.IOException; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -31,13 +37,20 @@ import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.BackOffAdapter; +import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Contains some useful helper instances of {@link DynamicDestinations}. */ class DynamicDestinationsHelpers { + private static final Logger LOG = LoggerFactory.getLogger(DynamicDestinationsHelpers.class); + /** Always returns a constant table destination. */ static class ConstantTableDestinations extends DynamicDestinations { private final ValueProvider tableSpec; @@ -285,4 +298,81 @@ public String toString() { .toString(); } } + + static DynamicDestinations matchTableDynamicDestinations( + DynamicDestinations inner, BigQueryServices bqServices) { + return new MatchTableDynamicDestinations<>(inner, bqServices); + } + + static class MatchTableDynamicDestinations + extends DelegatingDynamicDestinations { + private final BigQueryServices bqServices; + + private MatchTableDynamicDestinations( + DynamicDestinations inner, BigQueryServices bqServices) { + super(inner); + this.bqServices = bqServices; + } + + private Table getBigQueryTable(TableReference tableReference) { + BackOff backoff = + BackOffAdapter.toGcpBackOff( + FluentBackoff.DEFAULT + .withMaxRetries(3) + .withInitialBackoff(Duration.standardSeconds(1)) + .withMaxBackoff(Duration.standardSeconds(2)) + .backoff()); + try { + do { + try { + BigQueryOptions bqOptions = getPipelineOptions().as(BigQueryOptions.class); + return bqServices.getDatasetService(bqOptions).getTable(tableReference); + } catch (InterruptedException | IOException e) { + LOG.info("Failed to get BigQuery table " + tableReference); + } + } while (nextBackOff(Sleeper.DEFAULT, backoff)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; + } + + /** Identical to {@link BackOffUtils#next} but without checked IOException. */ + private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) + throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** Returns a {@link TableDestination} object for the destination. May not return null. */ + @Override + public TableDestination getTable(DestinationT destination) { + TableDestination wrappedDestination = super.getTable(destination); + Table existingTable = getBigQueryTable(wrappedDestination.getTableReference()); + + if (existingTable == null) { + return wrappedDestination; + } else { + return new TableDestination( + wrappedDestination.getTableSpec(), + existingTable.getDescription(), + existingTable.getTimePartitioning()); + } + } + + /** Returns the table schema for the destination. May not return null. */ + @Override + public TableSchema getSchema(DestinationT destination) { + TableDestination wrappedDestination = super.getTable(destination); + Table existingTable = getBigQueryTable(wrappedDestination.getTableReference()); + if (existingTable == null) { + return super.getSchema(destination); + } else { + return existingTable.getSchema(); + } + } + } }