Revert "Merge pull request apache#11673 from [BEAM-9967] Adding suppo…
Browse files Browse the repository at this point in the history
…rt for BQ labels on Query/Export jobs"

This reverts commit 820f0f5.
pabloem committed May 13, 2020
1 parent a543a2e commit f399e02
Showing 3 changed files with 15 additions and 40 deletions.
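
The message above is the stock text git writes for a revert. Since 820f0f5 is a merge commit, the revert was most likely produced along these lines (an assumption — the exact invocation is not recorded in the commit):

    git revert -m 1 820f0f5   # -m 1 reverts relative to the first (mainline) parent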
22 changes: 6 additions & 16 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -606,8 +606,7 @@ def __init__(
       coder=None,
       use_standard_sql=False,
       flatten_results=True,
-      kms_key=None,
-      bigquery_job_labels=None):
+      kms_key=None):
     if table is not None and query is not None:
       raise ValueError(
           'Both a BigQuery table and a query were specified.'
@@ -635,15 +634,13 @@ def __init__(
     self.kms_key = kms_key
     self.split_result = None
     self.options = pipeline_options
-    self.bigquery_job_labels = bigquery_job_labels or {}
 
   def display_data(self):
     return {
         'table': str(self.table_reference),
         'query': str(self.query),
         'project': str(self.project),
         'use_legacy_sql': self.use_legacy_sql,
-        'bigquery_job_labels': json.dumps(self.bigquery_job_labels),
     }
 
   def estimate_size(self):
@@ -670,8 +667,7 @@ def estimate_size(self):
           self.flatten_results,
           job_id=uuid.uuid4().hex,
           dry_run=True,
-          kms_key=self.kms_key,
-          job_labels=self.bigquery_job_labels)
+          kms_key=self.kms_key)
       size = int(job.statistics.totalBytesProcessed)
       return size
     else:
@@ -740,8 +736,7 @@ def _execute_query(self, bq):
         self.use_legacy_sql,
         self.flatten_results,
         job_id=uuid.uuid4().hex,
-        kms_key=self.kms_key,
-        job_labels=self.bigquery_job_labels)
+        kms_key=self.kms_key)
     job_ref = job.jobReference
     bq.wait_for_bq_job(job_ref, max_retries=0)
     return bq._get_temp_table(self._get_project())
@@ -758,9 +753,8 @@ def _export_files(self, bq):
         self.table_reference,
         bigquery_tools.FileFormat.JSON,
         project=self._get_project(),
-        include_header=False,
-        job_labels=self.bigquery_job_labels)
-    bq.wait_for_bq_job(job_ref)
+        include_header=False)
+    bq.wait_for_bq_job(job_ref, max_retries=0)
     metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
 
     if isinstance(self.table_reference, vp.ValueProvider):
@@ -1595,11 +1589,7 @@ class ReadFromBigQuery(PTransform):
       bucket where the extracted table should be written as a string or
       a :class:`~apache_beam.options.value_provider.ValueProvider`. If
       :data:`None`, then the temp_location parameter is used.
-    bigquery_job_labels (dict): A dictionary with string labels to be passed
-      to BigQuery export and query jobs created by this transform. See:
-      https://cloud.google.com/bigquery/docs/reference/rest/v2/\
-      Job#JobConfiguration
-  """
+  """
   def __init__(self, gcs_location=None, validate=False, *args, **kwargs):
     if gcs_location:
       if not isinstance(gcs_location, (str, unicode, ValueProvider)):
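
For context, the docstring removed above described bigquery_job_labels as a dict of string labels passed to the BigQuery export and query jobs the transform creates. Before this revert, usage looked roughly like the following sketch (mirroring the integration test below; the project and query are placeholders, and after this commit the keyword no longer exists):

    import apache_beam as beam

    with beam.Pipeline() as p:
      rows = (
          p
          | 'Read' >> beam.io.ReadFromBigQuery(
              query='SELECT name FROM `my-project.my_dataset.my_table`',  # hypothetical
              use_standard_sql=True,
              project='my-project',  # hypothetical
              # Reverted keyword: labels attached to the BigQuery jobs this
              # transform creates on the user's behalf.
              bigquery_job_labels={'owner': 'apache_beam_tests'}))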
5 changes: 1 addition & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -274,10 +274,7 @@ def test_iobase_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
           p | 'read' >> beam.io.ReadFromBigQuery(
-              query=self.query,
-              use_standard_sql=True,
-              project=self.project,
-              bigquery_job_labels={'owner': 'apache_beam_tests'}))
+              query=self.query, use_standard_sql=True, project=self.project))
       assert_that(result, equal_to(self.get_expected_data()))


28 changes: 8 additions & 20 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -320,8 +320,7 @@ def _insert_copy_job(
       from_table_reference,
       to_table_reference,
       create_disposition=None,
-      write_disposition=None,
-      job_labels=None):
+      write_disposition=None):
     reference = bigquery.JobReference()
     reference.jobId = job_id
     reference.projectId = project_id
@@ -334,9 +333,7 @@ def _insert_copy_job(
                 sourceTable=from_table_reference,
                 createDisposition=create_disposition,
                 writeDisposition=write_disposition,
-            ),
-            labels=job_labels or {},
-        ),
+            )),
         jobReference=reference,
     ))

@@ -358,8 +355,7 @@ def _insert_load_job(
       write_disposition=None,
       create_disposition=None,
       additional_load_parameters=None,
-      source_format=None,
-      job_labels=None):
+      source_format=None):
     additional_load_parameters = additional_load_parameters or {}
     job_schema = None if schema == 'SCHEMA_AUTODETECT' else schema
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
@@ -376,9 +372,7 @@ def _insert_load_job(
                 sourceFormat=source_format,
                 useAvroLogicalTypes=True,
                 autodetect=schema == 'SCHEMA_AUTODETECT',
-                **additional_load_parameters),
-            labels=job_labels or {},
-        ),
+                **additional_load_parameters)),
         jobReference=reference,
     ))
     response = self.client.jobs.Insert(request)
@@ -395,8 +389,7 @@ def _start_query_job(
       flatten_results,
       job_id,
       dry_run=False,
-      kms_key=None,
-      job_labels=None):
+      kms_key=None):
     reference = bigquery.JobReference(jobId=job_id, projectId=project_id)
     request = bigquery.BigqueryJobsInsertRequest(
         projectId=project_id,
@@ -411,9 +404,7 @@ def _start_query_job(
                 if not dry_run else None,
                 flattenResults=flatten_results,
                 destinationEncryptionConfiguration=bigquery.
-                EncryptionConfiguration(kmsKeyName=kms_key)),
-            labels=job_labels or {},
-        ),
+                EncryptionConfiguration(kmsKeyName=kms_key))),
         jobReference=reference))
 
     response = self.client.jobs.Insert(request)
@@ -705,8 +696,7 @@ def perform_extract_job(
       destination_format,
       project=None,
       include_header=True,
-      compression=ExportCompression.NONE,
-      job_labels=None):
+      compression=ExportCompression.NONE):
     """Starts a job to export data from BigQuery.
 
     Returns:
@@ -724,9 +714,7 @@ def perform_extract_job(
                 printHeader=include_header,
                 destinationFormat=destination_format,
                 compression=compression,
-            ),
-            labels=job_labels or {},
-        ),
+            )),
         jobReference=job_reference,
     ))
     response = self.client.jobs.Insert(request)
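All four helpers in this file (_insert_copy_job, _insert_load_job, _start_query_job, perform_extract_job) lose the same two things: a job_labels parameter and a labels=job_labels or {} entry on the request's JobConfiguration. A minimal sketch of the reverted pattern follows, using the extract job as the example. Note one assumption: with apitools-generated message classes, JobConfiguration.labels is a LabelsValue message rather than a plain dict, so the dict-to-message conversion below is illustrative and not code from this commit.

    from apache_beam.io.gcp.internal.clients import bigquery


    def _to_labels_value(job_labels):
      # Hypothetical helper (not in this commit): apitools map fields are
      # messages rather than plain dicts, so {'key': 'value'} must be
      # converted before assignment to JobConfiguration.labels.
      labels = job_labels or {}
      return bigquery.JobConfiguration.LabelsValue(additionalProperties=[
          bigquery.JobConfiguration.LabelsValue.AdditionalProperty(
              key=key, value=value) for key, value in labels.items()
      ])


    def build_extract_request(project_id, job_reference, source_table,
                              destination_uri, job_labels=None):
      # Sketch of the reverted pattern: job_labels rides along on the job's
      # JobConfiguration next to the extract (or query/load/copy) settings.
      return bigquery.BigqueryJobsInsertRequest(
          projectId=project_id,
          job=bigquery.Job(
              configuration=bigquery.JobConfiguration(
                  extract=bigquery.JobConfigurationExtract(
                      sourceTable=source_table,
                      destinationUris=[destination_uri]),
                  labels=_to_labels_value(job_labels)),
              jobReference=job_reference))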
