Add Storage API support for reading the result table of the query function in BigQuery
krvikash committed Jul 4, 2024
1 parent d0cf5ec commit 1cd3b48
Showing 7 changed files with 118 additions and 11 deletions.
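For context, the feature touched here is the bigquery.system.query table function, which the tests in this commit invoke from Trino SQL along these lines (the inner query text below is only illustrative):

SELECT * FROM TABLE(bigquery.system.query(query => 'SELECT 1 AS x'))

With this change, the connector dry-runs the passed query to obtain a destination table, probes whether the result can be materialized there, and if so reads it through the BigQuery Storage Read API; otherwise it keeps the previous REST API path.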
@@ -20,6 +20,7 @@
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobException;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
@@ -399,6 +400,36 @@ public Schema getSchema(String sql)
return requireNonNull(queryStatistics.getSchema(), "Cannot determine schema for query");
}

public boolean useStorageApi(String sql, TableId destinationTable)
{
JobInfo jobInfo = JobInfo.of(QueryJobConfiguration.newBuilder(sql).setDryRun(true).setDestinationTable(destinationTable).build());
try {
bigQuery.create(jobInfo);
}
catch (BigQueryException e) {
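// BigQuery rejects a dry run with a destination table set when the query result contains duplicate
// column names; the Storage API read path needs such a destination table, so fall back to the REST API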
if (e.getMessage().startsWith("Duplicate column names in the result are not supported when a destination table is present.")) {
return false;
}
}
return true;
}

public TableId getDestinationTable(String sql)
{
log.debug("Get destination table from query: %s", sql);
JobInfo jobInfo = JobInfo.of(QueryJobConfiguration.newBuilder(sql).setDryRun(true).build());

JobConfiguration jobConfiguration;
try {
jobConfiguration = bigQuery.create(jobInfo).getConfiguration();
}
catch (BigQueryException e) {
throw new TrinoException(BIGQUERY_INVALID_STATEMENT, "Failed to get destination table for query: " + sql, e);
}

return requireNonNull(((QueryJobConfiguration) jobConfiguration).getDestinationTable(), "Cannot determine destination table for query");
}

public static String selectSql(TableId table, List<String> requiredColumns, Optional<String> filter)
{
String columns = requiredColumns.stream().map(column -> format("`%s`", column)).collect(joining(","));
@@ -19,16 +19,21 @@
import java.util.Objects;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class BigQueryQueryRelationHandle
extends BigQueryRelationHandle
{
private final String query;
private final RemoteTableName destinationTableName;
private final boolean useStorageApi;

@JsonCreator
public BigQueryQueryRelationHandle(String query)
public BigQueryQueryRelationHandle(String query, RemoteTableName destinationTableName, boolean useStorageApi)
{
this.query = query;
this.destinationTableName = requireNonNull(destinationTableName, "destinationTableName is null");
this.useStorageApi = useStorageApi;
}

@JsonProperty
@@ -37,10 +42,22 @@ public String getQuery()
return query;
}

@JsonProperty
public RemoteTableName getDestinationTableName()
{
return destinationTableName;
}

@JsonProperty
public boolean isUseStorageApi()
{
return useStorageApi;
}

@Override
public String toString()
{
return format("Query[%s]", query);
return format("Query[%s], Destination table[%s], Api[%s]", query, destinationTableName, useStorageApi ? "Storage" : "Rest");
}

@Override
@@ -53,12 +70,14 @@ public boolean equals(Object o)
return false;
}
BigQueryQueryRelationHandle that = (BigQueryQueryRelationHandle) o;
return query.equals(that.query);
return query.equals(that.query)
&& destinationTableName.equals(that.destinationTableName)
&& useStorageApi == that.useStorageApi;
}

@Override
public int hashCode()
{
return Objects.hash(query);
return Objects.hash(query, destinationTableName, useStorageApi);
}
}
@@ -16,6 +16,7 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.common.annotations.VisibleForTesting;
@@ -104,9 +105,30 @@ public ConnectorSplitSource getSplits(
TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.constraint();
Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);

if (!bigQueryTableHandle.isNamedRelation()) {
if (bigQueryTableHandle.isQueryRelation()) {
BigQueryQueryRelationHandle bigQueryQueryRelationHandle = bigQueryTableHandle.getRequiredQueryRelation();
List<BigQueryColumnHandle> columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of());
return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
boolean useStorageApi = bigQueryQueryRelationHandle.isUseStorageApi();
List<String> projectedColumnsNames = getProjectedColumnNames(columns);

String query = filter
.map(whereClause -> "SELECT " + String.join(",", projectedColumnsNames) + " FROM (" + bigQueryQueryRelationHandle.getQuery() + ") WHERE " + whereClause)
.orElseGet(bigQueryQueryRelationHandle::getQuery);

if (!useStorageApi) {
log.debug("Using Rest API for running query: %s", query);
return new FixedSplitSource(BigQuerySplit.forViewStream(columns, filter));
}

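// Materialize the query result into the temporary destination table so it can be read with the Storage Read API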
TableId destinationTable = bigQueryQueryRelationHandle.getDestinationTableName().toTableId();
TableInfo tableInfo = new ViewMaterializationCache.DestinationTableBuilder(bigQueryClientFactory.create(session), viewExpiration, query, destinationTable).get();

log.debug("Using Storage API for running query: %s", query);
// filter is already used while constructing the select query
ReadSession readSession = createReadSession(session, tableInfo.getTableId(), ImmutableList.copyOf(projectedColumnsNames), Optional.empty());
return new FixedSplitSource(readSession.getStreamsList().stream()
.map(stream -> BigQuerySplit.forStream(stream.getName(), getSchemaAsString(readSession), columns, OptionalInt.of(stream.getSerializedSize())))
.collect(toImmutableList()));
}

TableId remoteTableId = bigQueryTableHandle.asPlainTable().getRemoteTableName().toTableId();
@@ -134,7 +156,7 @@ private List<BigQuerySplit> readFromBigQuery(

log.debug("readFromBigQuery(tableId=%s, projectedColumns=%s, filter=[%s])", remoteTableId, projectedColumns, filter);
List<BigQueryColumnHandle> columns = projectedColumns.get();
List<String> projectedColumnsNames = new ArrayList<>(columns.stream().map(BigQueryColumnHandle::name).toList());
List<String> projectedColumnsNames = new ArrayList<>(getProjectedColumnNames(columns));

if (isWildcardTable(type, remoteTableId.getTable())) {
// Storage API doesn't support reading wildcard tables
@@ -168,6 +190,11 @@ ReadSession createReadSession(ConnectorSession session, TableId remoteTableId, L
return readSessionCreator.create(session, remoteTableId, projectedColumnsNames, filter, nodeManager.getRequiredWorkerNodes().size());
}

private static List<String> getProjectedColumnNames(List<BigQueryColumnHandle> columns)
{
return columns.stream().map(BigQueryColumnHandle::name).collect(toImmutableList());
}

private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type tableType, TableId remoteTableId, Optional<String> filter)
{
if (!TABLE_TYPES.contains(tableType)) {
@@ -48,6 +48,13 @@ public BigQueryNamedRelationHandle getRequiredNamedRelation()
return (BigQueryNamedRelationHandle) relationHandle;
}

@JsonIgnore
public BigQueryQueryRelationHandle getRequiredQueryRelation()
{
checkState(isQueryRelation(), "The table handle does not represent a query relation: %s", this);
return (BigQueryQueryRelationHandle) relationHandle;
}

@JsonIgnore
public boolean isSynthetic()
{
@@ -60,6 +67,12 @@ public boolean isNamedRelation()
return relationHandle instanceof BigQueryNamedRelationHandle;
}

@JsonIgnore
public boolean isQueryRelation()
{
return relationHandle instanceof BigQueryQueryRelationHandle;
}

public BigQueryNamedRelationHandle asPlainTable()
{
checkState(!isSynthetic(), "The table handle does not represent a plain table: %s", this);
@@ -76,7 +76,7 @@ private TableId createDestinationTable(TableId remoteTableId)
return TableId.of(project, dataset, name);
}

private static class DestinationTableBuilder
public static class DestinationTableBuilder
implements Supplier<TableInfo>
{
private final BigQueryClient bigQueryClient;
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.google.inject.Provider;
@@ -27,6 +28,7 @@
import io.trino.plugin.bigquery.BigQueryQueryRelationHandle;
import io.trino.plugin.bigquery.BigQueryTableHandle;
import io.trino.plugin.bigquery.BigQueryTypeManager;
import io.trino.plugin.bigquery.RemoteTableName;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorAccessControl;
import io.trino.spi.connector.ConnectorSession;
@@ -48,10 +50,14 @@
import java.util.Optional;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.bigquery.ViewMaterializationCache.TEMP_TABLE_PREFIX;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.function.table.ReturnTypeSpecification.GenericTable.GENERIC_TABLE;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.toList;

public class Query
@@ -107,9 +113,11 @@ public TableFunctionAnalysis analyze(
String query = ((Slice) argument.getValue()).toStringUtf8();

BigQueryClient client = clientFactory.create(session);
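// Dry-run the query to learn the project and dataset of its destination table, derive a temporary
// table name there, and probe whether the Storage API can be used to read the materialized result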
TableId destinationTable = buildDestinationTable(client.getDestinationTable(query));
boolean useStorageApi = client.useStorageApi(query, destinationTable);
Schema schema = client.getSchema(query);

BigQueryQueryRelationHandle queryRelationHandle = new BigQueryQueryRelationHandle(query);
BigQueryQueryRelationHandle queryRelationHandle = new BigQueryQueryRelationHandle(query, new RemoteTableName(destinationTable), useStorageApi);
BigQueryTableHandle tableHandle = new BigQueryTableHandle(queryRelationHandle, TupleDomain.all(), Optional.empty());

ImmutableList.Builder<BigQueryColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(schema.getFields().size());
@@ -134,6 +142,15 @@ public TableFunctionAnalysis analyze(
}
}

private static TableId buildDestinationTable(TableId remoteTableId)
{
String project = remoteTableId.getProject();
String dataset = remoteTableId.getDataset();

String name = format("%s%s", TEMP_TABLE_PREFIX, randomUUID().toString().toLowerCase(ENGLISH).replace("-", ""));
return TableId.of(project, dataset, name);
}

public static class QueryHandle
implements ConnectorTableFunctionHandle
{
@@ -1042,7 +1042,7 @@ public void testNativeQueryInsertStatementTableDoesNotExist()
assertThat(getQueryRunner().tableExists(getSession(), tableName)).isFalse();
assertThat(query("SELECT * FROM TABLE(bigquery.system.query(query => 'INSERT INTO test." + tableName + " VALUES (1)'))"))
.failure()
.hasMessageContaining("Failed to get schema for query")
.hasMessageContaining("Failed to get destination table for query")
.hasStackTraceContaining("%s was not found", tableName);
}

@@ -1064,7 +1064,7 @@
public void testNativeQueryIncorrectSyntax()
{
assertThat(query("SELECT * FROM TABLE(system.query(query => 'some wrong syntax'))"))
.failure().hasMessageContaining("Failed to get schema for query");
.failure().hasMessageContaining("Failed to get destination table for query");
}

@Test