[BEAM-8661] Moving io module to have per-module logger
lukecwik committed Nov 15, 2019
2 parents 7588387 + c9e2699 commit 8bf9468
Showing 32 changed files with 211 additions and 127 deletions.
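
The pattern applied across all of these files is the standard Python per-module logger: each module creates a logger named after itself with `logging.getLogger(__name__)` and routes its messages through that logger instead of the root-level `logging.*` functions, so every record carries the emitting module's dotted name. A minimal sketch of the pattern (the function and messages below are illustrative, not taken from the diff):

```python
import logging

# Named after the defining module, e.g. "apache_beam.io.filebasedsink"
# when defined in sdks/python/apache_beam/io/filebasedsink.py.
_LOGGER = logging.getLogger(__name__)


def finalize(num_shards):
    # Goes through the module logger, not the root logger, so the record's
    # "name" attribute identifies which module emitted it.
    _LOGGER.info('Finalizing %d shards.', num_shards)
```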
19 changes: 11 additions & 8 deletions sdks/python/apache_beam/io/filebasedsink.py
@@ -45,6 +45,9 @@
__all__ = ['FileBasedSink']


_LOGGER = logging.getLogger(__name__)


class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
@@ -210,7 +213,7 @@ def pre_finalize(self, init_result, writer_results):
for file_metadata in mr.metadata_list]

if dst_glob_files:
logging.warning('Deleting %d existing files in target path matching: %s',
_LOGGER.warning('Deleting %d existing files in target path matching: %s',
len(dst_glob_files), self.shard_name_glob_format)
FileSystems.delete(dst_glob_files)

@@ -250,12 +253,12 @@ def _check_state_for_finalize_write(self, writer_results, num_shards):
raise BeamIOError('src and dst files do not exist. src: %s, dst: %s' % (
src, dst))
if not src_exists and dst_exists:
logging.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
_LOGGER.debug('src: %s -> dst: %s already renamed, skipping', src, dst)
num_skipped += 1
continue
if (src_exists and dst_exists and
FileSystems.checksum(src) == FileSystems.checksum(dst)):
logging.debug('src: %s == dst: %s, deleting src', src, dst)
_LOGGER.debug('src: %s == dst: %s, deleting src', src, dst)
delete_files.append(src)
continue

@@ -284,7 +287,7 @@ def finalize_write(self, init_result, writer_results,
for i in range(0, len(dst_files), chunk_size)]

if num_shards_to_finalize:
logging.info(
_LOGGER.info(
'Starting finalize_write threads with num_shards: %d (skipped: %d), '
'batches: %d, num_threads: %d',
num_shards_to_finalize, num_skipped, len(source_file_batch),
@@ -304,11 +307,11 @@ def _rename_batch(batch):
raise
for (src, dst), exception in iteritems(exp.exception_details):
if exception:
logging.error(('Exception in _rename_batch. src: %s, '
_LOGGER.error(('Exception in _rename_batch. src: %s, '
'dst: %s, err: %s'), src, dst, exception)
exceptions.append(exception)
else:
logging.debug('Rename successful: %s -> %s', src, dst)
_LOGGER.debug('Rename successful: %s -> %s', src, dst)
return exceptions

exception_batches = util.run_using_threadpool(
@@ -324,10 +327,10 @@ def _rename_batch(batch):
for final_name in dst_files:
yield final_name

logging.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
_LOGGER.info('Renamed %d shards in %.2f seconds.', num_shards_to_finalize,
time.time() - start_time)
else:
logging.warning(
_LOGGER.warning(
'No shards found to finalize. num_shards: %d, skipped: %d',
num_shards, num_skipped)

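A practical consequence of the change above is that log verbosity can be tuned per module. The configuration below is a hypothetical sketch (it is not part of this commit); the logger name comes from the module path shown in the diff:

```python
import logging

# Keep the pipeline's default verbosity at INFO...
logging.basicConfig(level=logging.INFO)

# ...but enable DEBUG for the file-based sink alone, e.g. to trace the
# shard-rename decisions logged in finalize_write, without flooding the
# output with DEBUG records from every other apache_beam module.
logging.getLogger('apache_beam.io.filebasedsink').setLevel(logging.DEBUG)
```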
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/filebasedsink_test.py
@@ -43,6 +43,8 @@
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher

_LOGGER = logging.getLogger(__name__)


# TODO: Refactor code so all io tests are using same library
# TestCaseWithTempDirCleanup class.
@@ -247,7 +249,7 @@ def test_temp_dir_gcs(self):
'gs://aaa/bbb', 'gs://aaa/bbb/', 'gs://aaa', 'gs://aaa/', 'gs://',
'/')
except ValueError:
logging.debug('Ignoring test since GCP module is not installed')
_LOGGER.debug('Ignoring test since GCP module is not installed')

@mock.patch('apache_beam.io.localfilesystem.os')
def test_temp_dir_local(self, filesystem_os_mock):
17 changes: 10 additions & 7 deletions sdks/python/apache_beam/io/fileio.py
@@ -114,6 +114,9 @@
'ReadMatches']


_LOGGER = logging.getLogger(__name__)


class EmptyMatchTreatment(object):
"""How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.
@@ -479,7 +482,7 @@ def expand(self, pcoll):
str,
filesystems.FileSystems.join(temp_location,
'.temp%s' % dir_uid))
logging.info('Added temporary directory %s', self._temp_directory.get())
_LOGGER.info('Added temporary directory %s', self._temp_directory.get())

output = (pcoll
| beam.ParDo(_WriteUnshardedRecordsFn(
@@ -557,7 +560,7 @@ def process(self,
'',
destination)

logging.info('Moving temporary file %s to dir: %s as %s. Res: %s',
_LOGGER.info('Moving temporary file %s to dir: %s as %s. Res: %s',
r.file_name, self.path.get(), final_file_name, r)

final_full_path = filesystems.FileSystems.join(self.path.get(),
@@ -570,7 +573,7 @@ def process(self,
except BeamIOError:
# This error is not serious, because it may happen on a retry of the
# bundle. We simply log it.
logging.debug('File %s failed to be copied. This may be due to a bundle'
_LOGGER.debug('File %s failed to be copied. This may be due to a bundle'
' being retried.', r.file_name)

yield FileResult(final_file_name,
@@ -580,7 +583,7 @@ def process(self,
r.pane,
destination)

logging.info('Cautiously removing temporary files for'
_LOGGER.info('Cautiously removing temporary files for'
' destination %s and window %s', destination, w)
writer_key = (destination, w)
self._remove_temporary_files(writer_key)
@@ -592,10 +595,10 @@ def _remove_temporary_files(self, writer_key):
match_result = filesystems.FileSystems.match(['%s*' % prefix])
orphaned_files = [m.path for m in match_result[0].metadata_list]

logging.debug('Deleting orphaned files: %s', orphaned_files)
_LOGGER.debug('Deleting orphaned files: %s', orphaned_files)
filesystems.FileSystems.delete(orphaned_files)
except BeamIOError as e:
logging.debug('Exceptions when deleting files: %s', e)
_LOGGER.debug('Exceptions when deleting files: %s', e)


class _WriteShardedRecordsFn(beam.DoFn):
@@ -625,7 +628,7 @@ def process(self,
sink.flush()
writer.close()

logging.info('Writing file %s for destination %s and shard %s',
_LOGGER.info('Writing file %s for destination %s and shard %s',
full_file_name, destination, repr(shard))

yield FileResult(full_file_name,
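Because the new loggers are named after their modules, they all live under the `apache_beam.io` prefix, so a handler attached to that parent logger receives records from every module in this diff through normal propagation. A hedged sketch (the file name and format string are assumptions, not Beam APIs):

```python
import logging

# Parent of apache_beam.io.fileio, apache_beam.io.filebasedsink,
# apache_beam.io.gcp.bigquery, and the other modules touched here.
io_logger = logging.getLogger('apache_beam.io')

handler = logging.FileHandler('beam_io.log')  # hypothetical destination
handler.setFormatter(
    logging.Formatter('%(asctime)s %(name)s %(levelname)s %(message)s'))
io_logger.addHandler(handler)

# Records emitted via the per-module loggers now also land in beam_io.log,
# each tagged with the dotted name of the module that produced it.
```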
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/io/filesystemio_test.py
@@ -28,6 +28,8 @@

from apache_beam.io import filesystemio

_LOGGER = logging.getLogger(__name__)


class FakeDownloader(filesystemio.Downloader):

@@ -206,7 +208,7 @@ def test_pipe_stream(self):

for buffer_size in buffer_sizes:
for target in [self._read_and_verify, self._read_and_seek]:
logging.info('buffer_size=%s, target=%s' % (buffer_size, target))
_LOGGER.info('buffer_size=%s, target=%s' % (buffer_size, target))
parent_conn, child_conn = multiprocessing.Pipe()
stream = filesystemio.PipeStream(child_conn)
success = [False]
@@ -45,6 +45,9 @@
except ImportError:
pass


_LOGGER = logging.getLogger(__name__)

WAIT_UNTIL_FINISH_DURATION_MS = 15 * 60 * 1000

BIG_QUERY_DATASET_ID = 'python_query_to_table_'
@@ -90,7 +93,7 @@ def tearDown(self):
try:
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
logging.debug('Failed to clean up dataset %s' % self.dataset_id)
_LOGGER.debug('Failed to clean up dataset %s' % self.dataset_id)

def _setup_new_types_env(self):
table_schema = bigquery.TableSchema()
13 changes: 8 additions & 5 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -273,6 +273,9 @@ def compute_table_name(row):
]


_LOGGER = logging.getLogger(__name__)


@deprecated(since='2.11.0', current="bigquery_tools.parse_table_reference")
def _parse_table_reference(table, dataset=None, project=None):
return bigquery_tools.parse_table_reference(table, dataset, project)
@@ -787,7 +790,7 @@ def _create_table_if_needed(self, table_reference, schema=None):
# and avoid the get-or-create step.
return

logging.debug('Creating or getting table %s with schema %s.',
_LOGGER.debug('Creating or getting table %s with schema %s.',
table_reference, schema)

table_schema = self.get_table_schema(schema)
@@ -833,7 +836,7 @@ def finish_bundle(self):
return self._flush_all_batches()

def _flush_all_batches(self):
logging.debug('Attempting to flush to all destinations. Total buffered: %s',
_LOGGER.debug('Attempting to flush to all destinations. Total buffered: %s',
self._total_buffered_rows)

return itertools.chain(*[self._flush_batch(destination)
@@ -850,7 +853,7 @@ def _flush_batch(self, destination):
table_reference.projectId = vp.RuntimeValueProvider.get_value(
'project', str, '')

logging.debug('Flushing data to %s. Total %s rows.',
_LOGGER.debug('Flushing data to %s. Total %s rows.',
destination, len(rows_and_insert_ids))

rows = [r[0] for r in rows_and_insert_ids]
@@ -865,7 +868,7 @@ def _flush_batch(self, destination):
insert_ids=insert_ids,
skip_invalid_rows=True)

logging.debug("Passed: %s. Errors are %s", passed, errors)
_LOGGER.debug("Passed: %s. Errors are %s", passed, errors)
failed_rows = [rows[entry.index] for entry in errors]
should_retry = any(
bigquery_tools.RetryStrategy.should_retry(
@@ -877,7 +880,7 @@ def _flush_batch(self, destination):
break
else:
retry_backoff = next(self._backoff_calculator)
logging.info('Sleeping %s seconds before retrying insertion.',
_LOGGER.info('Sleeping %s seconds before retrying insertion.',
retry_backoff)
time.sleep(retry_backoff)

14 changes: 8 additions & 6 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -46,6 +46,8 @@
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindows

_LOGGER = logging.getLogger(__name__)

ONE_TERABYTE = (1 << 40)

# The maximum file size for imports is 5TB. We keep our files under that.
@@ -320,7 +322,7 @@ def process(self, element, job_name_prefix=None):
copy_to_reference.datasetId,
copy_to_reference.tableId)))

logging.info("Triggering copy job from %s to %s",
_LOGGER.info("Triggering copy job from %s to %s",
copy_from_reference, copy_to_reference)
job_reference = self.bq_wrapper._insert_copy_job(
copy_to_reference.projectId,
@@ -407,15 +409,15 @@ def process(self, element, load_job_name_prefix, *schema_side_inputs):
uid = _bq_uuid()
job_name = '%s_%s_%s' % (
load_job_name_prefix, destination_hash, uid)
logging.debug('Load job has %s files. Job name is %s.',
_LOGGER.debug('Load job has %s files. Job name is %s.',
len(files), job_name)

if self.temporary_tables:
# For temporary tables, we create a new table with the name with JobId.
table_reference.tableId = job_name
yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)

logging.info('Triggering job %s to load data to BigQuery table %s.'
_LOGGER.info('Triggering job %s to load data to BigQuery table %s.'
'Schema: %s. Additional parameters: %s',
job_name, table_reference,
schema, additional_parameters)
@@ -519,9 +521,9 @@ def _check_job_states(self, job_references):
ref.jobId,
ref.location)

logging.info("Job status: %s", job.status)
_LOGGER.info("Job status: %s", job.status)
if job.status.state == 'DONE' and job.status.errorResult:
logging.warning("Job %s seems to have failed. Error Result: %s",
_LOGGER.warning("Job %s seems to have failed. Error Result: %s",
ref.jobId, job.status.errorResult)
self._latest_error = job.status
return WaitForBQJobs.FAILED
@@ -541,7 +543,7 @@ def start_bundle(self):
self.bq_wrapper = bigquery_tools.BigQueryWrapper(client=self.test_client)

def process(self, table_reference):
logging.info("Deleting table %s", table_reference)
_LOGGER.info("Deleting table %s", table_reference)
table_reference = bigquery_tools.parse_table_reference(table_reference)
self.bq_wrapper._delete_table(
table_reference.projectId,
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -58,6 +58,8 @@
HttpError = None


_LOGGER = logging.getLogger(__name__)

_DESTINATION_ELEMENT_PAIRS = [
# DESTINATION 1
('project1:dataset1.table1', '{"name":"beam", "language":"py"}'),
@@ -609,7 +611,7 @@ def setUp(self):
self.bigquery_client = bigquery_tools.BigQueryWrapper()
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
self.output_table = "%s.output_table" % (self.dataset_id)
logging.info("Created dataset %s in project %s",
_LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)

@attr('IT')
@@ -794,11 +796,11 @@ def tearDown(self):
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
logging.info("Deleting dataset %s in project %s",
_LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
logging.debug('Failed to clean up dataset %s in project %s',
_LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)


Expand Down
9 changes: 6 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -46,6 +46,9 @@
# pylint: enable=wrong-import-order, wrong-import-position


_LOGGER = logging.getLogger(__name__)


class BigQueryReadIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_read_table_'

@@ -59,19 +62,19 @@ def setUp(self):
str(int(time.time())),
random.randint(0, 10000))
self.bigquery_client.get_or_create_dataset(self.project, self.dataset_id)
logging.info("Created dataset %s in project %s",
_LOGGER.info("Created dataset %s in project %s",
self.dataset_id, self.project)

def tearDown(self):
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=self.project, datasetId=self.dataset_id,
deleteContents=True)
try:
logging.info("Deleting dataset %s in project %s",
_LOGGER.info("Deleting dataset %s in project %s",
self.dataset_id, self.project)
self.bigquery_client.client.datasets.Delete(request)
except HttpError:
logging.debug('Failed to clean up dataset %s in project %s',
_LOGGER.debug('Failed to clean up dataset %s in project %s',
self.dataset_id, self.project)

def create_table(self, tablename):
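The test modules gain the same property: because records are now emitted under a specific module name, a test can capture output from just that module with `unittest.TestCase.assertLogs` instead of intercepting the root logger. A hypothetical example (not part of this commit):

```python
import logging
import unittest

_LOGGER = logging.getLogger('apache_beam.io.gcp.bigquery_read_it_test')


class PerModuleLoggerTest(unittest.TestCase):

    def test_capture_module_logs(self):
        # assertLogs only collects records from the named logger and its
        # children, so output from other modules does not leak in.
        with self.assertLogs('apache_beam.io.gcp.bigquery_read_it_test',
                             level='DEBUG') as captured:
            _LOGGER.debug('Failed to clean up dataset %s', 'example_dataset')
        self.assertIn('Failed to clean up dataset', captured.output[0])


if __name__ == '__main__':
    unittest.main()
```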
(The remaining changed files in this commit are not shown here.)
