[BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)
pabloem committed May 13, 2020
1 parent 485bd08 commit 2ddb9c0
Showing 3 changed files with 54 additions and 15 deletions.
22 changes: 16 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,7 +606,8 @@ def __init__(
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
-      kms_key=None):
+      kms_key=None,
+      bigquery_job_labels=None):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'

@@ -634,13 +635,15 @@ def __init__(
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
+    self.bigquery_job_labels = bigquery_job_labels or {}

   def display_data(self):
     return {
         'table': str(self.table_reference),
         'query': str(self.query),
         'project': str(self.project),
         'use_legacy_sql': self.use_legacy_sql,
+        'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
     }

   def estimate_size(self):

@@ -667,7 +670,8 @@ def estimate_size(self):
           self.flatten_results,
           job_id=uuid.uuid4().hex,
           dry_run=True,
-          kms_key=self.kms_key)
+          kms_key=self.kms_key,
+          job_labels=self.bigquery_job_labels)
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:

@@ -736,7 +740,8 @@ def _execute_query(self, bq):
         self.use_legacy_sql,
         self.flatten_results,
         job_id=uuid.uuid4().hex,
-        kms_key=self.kms_key)
+        kms_key=self.kms_key,
+        job_labels=self.bigquery_job_labels)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())

@@ -753,8 +758,9 @@ def _export_files(self, bq):
         self.table_reference,
         bigquery_tools.FileFormat.JSON,
         project=self._get_project(),
-        include_header=False)
-    bq.wait_for_bq_job(job_ref, max_retries=0)
+        include_header=False,
+        job_labels=self.bigquery_job_labels)
+    bq.wait_for_bq_job(job_ref)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list

     if isinstance(self.table_reference, vp.ValueProvider):

@@ -1589,7 +1595,11 @@ class ReadFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string or
       a :class:`~apache_beam.options.value_provider.ValueProvider`. If
       :data:`None`, then the temp_location parameter is used.
-  """
+    bigquery_job_labels (dict): A dictionary with string labels to be passed
+      to BigQuery export and query jobs created by this transform. See:
+      https://cloud.google.com/bigquery/docs/reference/rest/v2/\
+      Job#JobConfiguration
+  """
   def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
     if gcs_location:
       if not isinstance(gcs_location, (str, unicode, ValueProvider)):
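
For a sense of how the new parameter is meant to be used, here is a minimal pipeline sketch based on the docstring and integration test in this commit; the query, project id, and label values are hypothetical:

    # A minimal sketch, not the committed code: exercises the new
    # bigquery_job_labels argument on ReadFromBigQuery.
    import apache_beam as beam

    with beam.Pipeline() as p:
      _ = (
          p
          | 'Read' >> beam.io.ReadFromBigQuery(
              query='SELECT 1 AS x',  # hypothetical query
              use_standard_sql=True,
              project='my-project',  # hypothetical project id
              # Attached to the export and query jobs this source launches.
              bigquery_job_labels={'team': 'data-eng', 'env': 'test'})
          | 'Print' >> beam.Map(print))
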
5 changes: 4 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,7 +274,10 @@ def test_iobase_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
           p | 'read' >> beam.io.ReadFromBigQuery(
-              query=self.query, use_standard_sql=True, project=self.project))
+              query=self.query,
+              use_standard_sql=True,
+              project=self.project,
+              bigquery_job_labels={'launcher': 'apache_beam_tests'}))
       assert_that(result, equal_to(self.get_expected_data()))

42 changes: 34 additions & 8 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -218,6 +218,20 @@ def parse_table_reference(table, dataset=None, project=None):
 # BigQueryWrapper.


+def _build_job_labels(input_labels):
+  """Builds job label protobuf structure."""
+  input_labels = input_labels or {}
+  result = bigquery.JobConfiguration.LabelsValue()
+
+  for k, v in input_labels.items():
+    result.additionalProperties.append(
+        bigquery.JobConfiguration.LabelsValue.AdditionalProperty(
+            key=k,
+            value=v,
+        ))
+  return result
+
+
 class BigQueryWrapper(object):
   """BigQuery client wrapper with utilities for querying.
@@ -320,7 +334,8 @@ def _insert_copy_job(
       from_table_reference,
       to_table_reference,
       create_disposition=None,
-      write_disposition=None):
+      write_disposition=None,
+      job_labels=None):
     reference = bigquery.JobReference()
     reference.jobId = job_id
     reference.projectId = project_id

@@ -333,7 +348,9 @@
                     sourceTable=from_table_reference,
                     createDisposition=create_disposition,
                     writeDisposition=write_disposition,
-                )),
+                ),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference,
         ))

@@ -355,7 +372,8 @@ def _insert_load_job(
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None):
+      source_format=None,
+      job_labels=None):
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)

@@ -372,7 +390,9 @@
                     sourceFormat=source_format,
                     useAvroLogicalTypes=True,
                     autodetect=schema == 'SCHEMA_AUTODETECT',
-                    **additional_load_parameters)),
+                    **additional_load_parameters),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference,
         ))
     response = self.client.jobs.Insert(request)

@@ -389,7 +409,8 @@ def _start_query_job(
       flatten_results,
       job_id,
       dry_run=False,
-      kms_key=None):
+      kms_key=None,
+      job_labels=None):
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,

@@ -404,7 +425,9 @@
                     if not dry_run else None,
                     flattenResults=flatten_results,
                     destinationEncryptionConfiguration=bigquery.
-                    EncryptionConfiguration(kmsKeyName=kms_key))),
+                    EncryptionConfiguration(kmsKeyName=kms_key)),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference))

     response = self.client.jobs.Insert(request)

@@ -696,7 +719,8 @@ def perform_extract_job(
       destination_format,
       project=None,
       include_header=True,
-      compression=ExportCompression.NONE):
+      compression=ExportCompression.NONE,
+      job_labels=None):
     """Starts a job to export data from BigQuery.

     Returns:

@@ -714,7 +738,9 @@
                     printHeader=include_header,
                     destinationFormat=destination_format,
                     compression=compression,
-                )),
+                ),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=job_reference,
         ))
     response = self.client.jobs.Insert(request)
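
With these changes, the copy, load, query, and extract jobs all carry the same labels map on their JobConfiguration. As a rough sketch of the resulting jobs.insert payload shape, per the JobConfiguration reference linked in the new docstring (project, job id, table, and label values here are hypothetical):

    # Hedged sketch of the REST body the wrapper ends up sending:
    # `labels` is a top-level string-to-string map on the job's
    # configuration, next to the type-specific sub-configuration.
    request_body = {
        'jobReference': {
            'projectId': 'my-project',      # hypothetical
            'jobId': 'beam_bq_job_abc123',  # hypothetical
        },
        'configuration': {
            'extract': {
                'sourceTable': {
                    'projectId': 'my-project',
                    'datasetId': 'tmp_dataset',
                    'tableId': 'tmp_table',
                },
                'destinationUris': ['gs://my-bucket/export-*'],
            },
            'labels': {'launcher': 'apache_beam_tests'},
        },
    }
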
