Skip to content

Commit

Permalink
Merge pull request apache#7776: Make sure that BigQuery temp tables m…
Browse files Browse the repository at this point in the history
…atch the final output table.
  • Loading branch information
reuvenlax committed May 30, 2019
1 parent 3455c9f commit de19c75
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,17 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));

// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
// with one that makes this happen.
@SuppressWarnings("unchecked")
DynamicDestinations<?, DestinationT> destinations = dynamicDestinations;
if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED)
|| createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
destinations =
DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices);
}

// If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
// DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
// the import needs to be split into multiple partitions, and those partitions will be
Expand All @@ -570,7 +581,7 @@ private PCollection<KV<TableDestination, String>> writeTempTables(
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
dynamicDestinations,
destinations,
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -78,6 +79,7 @@ interface SideInputAccessor {
}

@Nullable private transient SideInputAccessor sideInputAccessor;
@Nullable private transient PipelineOptions options;

static class SideInputAccessorViaProcessContext implements SideInputAccessor {
private DoFn<?, ?>.ProcessContext processContext;
Expand All @@ -92,6 +94,12 @@ public <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> view) {
}
}

/** Get the current PipelineOptions if set. */
@Nullable
PipelineOptions getPipelineOptions() {
return options;
}

/**
* Specifies that this object needs access to one or more side inputs. This side inputs must be
* globally windowed, as they will be accessed from the global window.
Expand All @@ -114,6 +122,7 @@ protected final <SideInputT> SideInputT sideInput(PCollectionView<SideInputT> vi

void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
this.sideInputAccessor = new SideInputAccessorViaProcessContext(context);
this.options = context.getPipelineOptions();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -31,13 +37,20 @@
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Contains some useful helper instances of {@link DynamicDestinations}. */
class DynamicDestinationsHelpers {
private static final Logger LOG = LoggerFactory.getLogger(DynamicDestinationsHelpers.class);

/** Always returns a constant table destination. */
static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> {
private final ValueProvider<String> tableSpec;
Expand Down Expand Up @@ -285,4 +298,81 @@ public String toString() {
.toString();
}
}

static <T, DestinationT> DynamicDestinations<T, DestinationT> matchTableDynamicDestinations(
DynamicDestinations<T, DestinationT> inner, BigQueryServices bqServices) {
return new MatchTableDynamicDestinations<>(inner, bqServices);
}

static class MatchTableDynamicDestinations<T, DestinationT>
extends DelegatingDynamicDestinations<T, DestinationT> {
private final BigQueryServices bqServices;

private MatchTableDynamicDestinations(
DynamicDestinations<T, DestinationT> inner, BigQueryServices bqServices) {
super(inner);
this.bqServices = bqServices;
}

private Table getBigQueryTable(TableReference tableReference) {
BackOff backoff =
BackOffAdapter.toGcpBackOff(
FluentBackoff.DEFAULT
.withMaxRetries(3)
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxBackoff(Duration.standardSeconds(2))
.backoff());
try {
do {
try {
BigQueryOptions bqOptions = getPipelineOptions().as(BigQueryOptions.class);
return bqServices.getDatasetService(bqOptions).getTable(tableReference);
} catch (InterruptedException | IOException e) {
LOG.info("Failed to get BigQuery table " + tableReference);
}
} while (nextBackOff(Sleeper.DEFAULT, backoff));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
}

/** Identical to {@link BackOffUtils#next} but without checked IOException. */
private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
throws InterruptedException {
try {
return BackOffUtils.next(sleeper, backoff);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Returns a {@link TableDestination} object for the destination. May not return null. */
@Override
public TableDestination getTable(DestinationT destination) {
TableDestination wrappedDestination = super.getTable(destination);
Table existingTable = getBigQueryTable(wrappedDestination.getTableReference());

if (existingTable == null) {
return wrappedDestination;
} else {
return new TableDestination(
wrappedDestination.getTableSpec(),
existingTable.getDescription(),
existingTable.getTimePartitioning());
}
}

/** Returns the table schema for the destination. May not return null. */
@Override
public TableSchema getSchema(DestinationT destination) {
TableDestination wrappedDestination = super.getTable(destination);
Table existingTable = getBigQueryTable(wrappedDestination.getTableReference());
if (existingTable == null) {
return super.getSchema(destination);
} else {
return existingTable.getSchema();
}
}
}
}

0 comments on commit de19c75

Please sign in to comment.