Skip to content

Commit

Permalink
Merge pull request apache#11560 from [BEAM-9886] Auto-inferring project for ReadFromBigQuery
Browse files Browse the repository at this point in the history

* Auto-inferring project for ReadFromBigQuery

* Fixup

* fixup

* fixup

* Formatter fixup

* Fixup

* fixformat

* Fix coder issue

* fixup

* Fix lint

* Fixup
  • Loading branch information
pabloem committed May 5, 2020
1 parent 0c6155e commit 34c58c4
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def run(argv=None):
with beam.Pipeline(argv=pipeline_args) as p:

# Read the table rows into a PCollection.
rows = p | 'read' >> beam.io.Read(beam.io.BigQuerySource(known_args.input))
rows = p | 'read' >> beam.io.ReadFromBigQuery(table=known_args.input)
counts = count_tornadoes(rows)

# Write the output using a "Write" transform that has side effects.
Expand Down
40 changes: 29 additions & 11 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,6 @@ def reader(self, test_bigquery_client=None):
FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')


def _to_bool(value):
return value == 'true'


def _to_decimal(value):
return decimal.Decimal(value)

Expand All @@ -547,7 +543,6 @@ def __init__(self, table_schema):
'INTEGER': int,
'INT64': int,
'FLOAT': float,
'BOOLEAN': _to_bool,
'NUMERIC': _to_decimal,
'BYTES': _to_bytes,
}
Expand Down Expand Up @@ -607,6 +602,7 @@ def __init__(
project=None,
query=None,
validate=False,
pipeline_options=None,
coder=None,
use_standard_sql=False,
flatten_results=True,
Expand Down Expand Up @@ -637,6 +633,15 @@ def __init__(
self.coder = coder or _JsonToDictCoder
self.kms_key = kms_key
self.split_result = None
self.options = pipeline_options

def display_data(self):
  """Returns display metadata for this source: table, query, project, dialect."""
  # Stringify the reference-like attributes; keep the SQL-dialect flag as a bool.
  stringified = {
      'table': self.table_reference,
      'query': self.query,
      'project': self.project,
  }
  display = {key: str(value) for key, value in stringified.items()}
  display['use_legacy_sql'] = self.use_legacy_sql
  return display

def estimate_size(self):
bq = bigquery_tools.BigQueryWrapper()
Expand All @@ -654,8 +659,9 @@ def estimate_size(self):
table_ref.projectId, table_ref.datasetId, table_ref.tableId)
return int(table.numBytes)
elif self.query is not None and self.query.is_accessible():
project = self._get_project()
job = bq._start_query_job(
self.project,
project,
self.query.get(),
self.use_legacy_sql,
self.flatten_results,
Expand All @@ -669,6 +675,16 @@ def estimate_size(self):
# no access to the query that we're running.
return None

def _get_project(self):
"""Returns the project that queries and exports will be billed to."""

project = self.options.view_as(GoogleCloudOptions).project
if isinstance(project, vp.ValueProvider):
project = project.get()
if not project:
project = self.project
return project

def split(self, desired_bundle_size, start_position=None, stop_position=None):
if self.split_result is None:
bq = bigquery_tools.BigQueryWrapper()
Expand All @@ -687,7 +703,7 @@ def split(self, desired_bundle_size, start_position=None, stop_position=None):
self.coder(schema)) for metadata in metadata_list
]
if self.query is not None:
bq.clean_up_temporary_dataset(self.project)
bq.clean_up_temporary_dataset(self._get_project())

for source in self.split_result:
yield SourceBundle(0, source, None, None)
Expand All @@ -709,21 +725,21 @@ def read(self, range_tracker):
@check_accessible(['query'])
def _setup_temporary_dataset(self, bq):
  """Creates a temporary dataset, in the query's location, for query results.

  Args:
    bq: a BigQueryWrapper used for the location lookup and the
      dataset-creation call.
  """
  # Bill both the location lookup and the dataset creation to the
  # auto-inferred project rather than only self.project.
  project = self._get_project()
  location = bq.get_query_location(
      project, self.query.get(), self.use_legacy_sql)
  bq.create_temporary_dataset(project, location)

@check_accessible(['query'])
def _execute_query(self, bq):
  """Runs the configured query and waits for it to finish.

  Args:
    bq: a BigQueryWrapper used to start and wait on the query job.

  Returns:
    A reference to the temporary table holding the query results.
  """
  # Bill the query job to the auto-inferred project (pipeline options first,
  # then this source's configured project).
  project = self._get_project()
  job = bq._start_query_job(
      project,
      self.query.get(),
      self.use_legacy_sql,
      self.flatten_results,
      job_id=uuid.uuid4().hex,
      kms_key=self.kms_key)
  bq.wait_for_bq_job(job.jobReference)
  return bq._get_temp_table(project)

def _export_files(self, bq):
"""Runs a BigQuery export job.
Expand All @@ -736,6 +752,7 @@ def _export_files(self, bq):
job_id,
self.table_reference,
bigquery_tools.FileFormat.JSON,
project=self._get_project(),
include_header=False)
bq.wait_for_bq_job(job_ref)
metadata_list = FileSystems.match([self.gcs_location])[0].metadata_list
Expand Down Expand Up @@ -1638,6 +1655,7 @@ def process(self, unused_element, signal):
_CustomBigQuerySource(
gcs_location=gcs_location,
validate=self.validate,
pipeline_options=pcoll.pipeline.options,
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(RemoveJsonFiles(gcs_location)))
7 changes: 4 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,17 +695,18 @@ def perform_extract_job(
job_id,
table_reference,
destination_format,
project=None,
include_header=True,
compression=ExportCompression.NONE):
"""Starts a job to export data from BigQuery.
Returns:
bigquery.JobReference with the information about the job that was started.
"""
job_reference = bigquery.JobReference(
jobId=job_id, projectId=table_reference.projectId)
job_project = project or table_reference.projectId
job_reference = bigquery.JobReference(jobId=job_id, projectId=job_project)
request = bigquery.BigqueryJobsInsertRequest(
projectId=table_reference.projectId,
projectId=job_project,
job=bigquery.Job(
configuration=bigquery.JobConfiguration(
extract=bigquery.JobConfigurationExtract(
Expand Down

0 comments on commit 34c58c4

Please sign in to comment.