From 2ddb9c024193aeeb3d94326fb2df30459c787a06 Mon Sep 17 00:00:00 2001
From: Pablo Estrada
Date: Wed, 13 May 2020 14:16:44 -0700
Subject: [PATCH] [BEAM-9967] Adding support for BQ labels on Query/Export jobs. (Roll forward)

---
 sdks/python/apache_beam/io/gcp/bigquery.py    | 22 +++++++---
 .../io/gcp/bigquery_read_it_test.py           |  5 ++-
 .../apache_beam/io/gcp/bigquery_tools.py      | 42 +++++++++++++++----
 3 files changed, 54 insertions(+), 15 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index d56fe12cce0e8..130b473300f3c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,7 +606,8 @@ def __init__(
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
-      kms_key=None):
+      kms_key=None,
+      bigquery_job_labels=None):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'
@@ -634,6 +635,7 @@ def __init__(
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
+    self.bigquery_job_labels = bigquery_job_labels or {}

   def display_data(self):
     return {
@@ -641,6 +643,7 @@ def display_data(self):
       'query': str(self.query),
       'project': str(self.project),
       'use_legacy_sql': self.use_legacy_sql,
+      'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
     }

   def estimate_size(self):
@@ -667,7 +670,8 @@ def estimate_size(self):
           self.flatten_results,
           job_id=uuid.uuid4().hex,
           dry_run=True,
-          kms_key=self.kms_key)
+          kms_key=self.kms_key,
+          job_labels=self.bigquery_job_labels)
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -736,7 +740,8 @@ def _execute_query(self, bq):
         self.use_legacy_sql,
         self.flatten_results,
         job_id=uuid.uuid4().hex,
-        kms_key=self.kms_key)
+        kms_key=self.kms_key,
+        job_labels=self.bigquery_job_labels)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -753,8 +758,9 @@ def _export_files(self, bq):
         self.table_reference,
         bigquery_tools.FileFormat.JSON,
         project=self._get_project(),
-        include_header=False)
-    bq.wait_for_bq_job(job_ref, max_retries=0)
+        include_header=False,
+        job_labels=self.bigquery_job_labels)
+    bq.wait_for_bq_job(job_ref)

     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
     if isinstance(self.table_reference, vp.ValueProvider):
@@ -1589,7 +1595,11 @@ class ReadFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string or
       a :class:`~apache_beam.options.value_provider.ValueProvider`. If
       :data:`None`, then the temp_location parameter is used.
+    bigquery_job_labels (dict): A dictionary with string labels to be passed
+      to BigQuery export and query jobs created by this transform. See:
See: + https://cloud.google.com/bigquery/docs/reference/rest/v2/\ + Job#JobConfiguration + """ def __init__(self, gcs_location=None, validate=False, *args, **kwargs): if gcs_location: if not isinstance(gcs_location, (str, unicode, ValueProvider)): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index 98d12413871d7..50ebcdcd8fe87 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -274,7 +274,10 @@ def test_iobase_source(self): with beam.Pipeline(argv=self.args) as p: result = ( p | 'read' >> beam.io.ReadFromBigQuery( - query=self.query, use_standard_sql=True, project=self.project)) + query=self.query, + use_standard_sql=True, + project=self.project, + bigquery_job_labels={'launcher': 'apache_beam_tests'})) assert_that(result, equal_to(self.get_expected_data())) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 2b6c959fac4c6..b036dc0568f45 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -218,6 +218,20 @@ def parse_table_reference(table, dataset=None, project=None): # BigQueryWrapper. +def _build_job_labels(input_labels): + """Builds job label protobuf structure.""" + input_labels = input_labels or {} + result = bigquery.JobConfiguration.LabelsValue() + + for k, v in input_labels.items(): + result.additionalProperties.append( + bigquery.JobConfiguration.LabelsValue.AdditionalProperty( + key=k, + value=v, + )) + return result + + class BigQueryWrapper(object): """BigQuery client wrapper with utilities for querying. @@ -320,7 +334,8 @@ def _insert_copy_job( from_table_reference, to_table_reference, create_disposition=None, - write_disposition=None): + write_disposition=None, + job_labels=None): reference = bigquery.JobReference() reference.jobId = job_id reference.projectId = project_id @@ -333,7 +348,9 @@ def _insert_copy_job( sourceTable=from_table_reference, createDisposition=create_disposition, writeDisposition=write_disposition, - )), + ), + labels=_build_job_labels(job_labels), + ), jobReference=reference, )) @@ -355,7 +372,8 @@ def _insert_load_job( write_disposition=None, create_disposition=None, additional_load_parameters=None, - source_format=None): + source_format=None, + job_labels=None): additional_load_parameters = additional_load_parameters or {} job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema reference = bigquery.JobReference(jobId=job_id, projectId=project_id) @@ -372,7 +390,9 @@ def _insert_load_job( sourceFormat=source_format, useAvroLogicalTypes=True, autodetect=schema == 'SCHEMA_AUTODETECT', - **additional_load_parameters)), + **additional_load_parameters), + labels=_build_job_labels(job_labels), + ), jobReference=reference, )) response = self.client.jobs.Insert(request) @@ -389,7 +409,8 @@ def _start_query_job( flatten_results, job_id, dry_run=False, - kms_key=None): + kms_key=None, + job_labels=None): reference = bigquery.JobReference(jobId=job_id, projectId=project_id) request = bigquery.BigqueryJobsInsertRequest( projectId=project_id, @@ -404,7 +425,9 @@ def _start_query_job( if not dry_run else None, flattenResults=flatten_results, destinationEncryptionConfiguration=bigquery. 
-                    EncryptionConfiguration(kmsKeyName=kms_key))),
+                    EncryptionConfiguration(kmsKeyName=kms_key)),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=reference))

     response = self.client.jobs.Insert(request)
@@ -696,7 +719,8 @@ def perform_extract_job(
       destination_format,
       project=None,
       include_header=True,
-      compression=ExportCompression.NONE):
+      compression=ExportCompression.NONE,
+      job_labels=None):
     """Starts a job to export data from BigQuery.

     Returns:
@@ -714,7 +738,9 @@ def perform_extract_job(
                     printHeader=include_header,
                     destinationFormat=destination_format,
                     compression=compression,
-                )),
+                ),
+                labels=_build_job_labels(job_labels),
+            ),
             jobReference=job_reference,
         ))
     response = self.client.jobs.Insert(request)
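
Usage sketch (not part of the patch): with this change applied, a pipeline can
attach labels to the BigQuery export and query jobs issued by a read, as the
integration test above does. The sketch below is a minimal, hypothetical
example; the project ID, GCS scratch path, query, and label keys/values are
all placeholders, and labels must follow BigQuery's label syntax rules
(lowercase letters, digits, hyphens, underscores).

import apache_beam as beam

with beam.Pipeline() as p:
  rows = (
      p
      | 'Read' >> beam.io.ReadFromBigQuery(
          query='SELECT name FROM `my-project.my_dataset.my_table`',
          use_standard_sql=True,
          project='my-project',  # placeholder project that runs the BQ jobs
          gcs_location='gs://my-bucket/bq_export_tmp',  # placeholder path
          # Propagated to the export/query jobs this transform creates, via
          # _build_job_labels into JobConfiguration.labels (per the diff).
          bigquery_job_labels={'team': 'data_eng', 'launcher': 'my_pipeline'}))
  rows | 'Print' >> beam.Map(print)

As the diff shows, the same job_labels plumbing is also accepted by
BigQueryWrapper's copy and load job helpers, so write paths can be wired up
the same way in a follow-up.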