Details
- Type: Bug
- Status: Triage Needed
- Priority: P2
- Resolution: Fixed
- Versions: 2.16.0, 2.18.0, 2.19.0
- None
Description
A user reported the following issue:
-------------------------------------------------
I have a set of TFRecord files, obtained by converting Parquet files with Spark. Each file is roughly 1 GB, and I have 11 of them.
I would expect simple statistics gathering (i.e., counting the number of items across all files) to scale linearly with the number of cores on my system.
I am able to reproduce the issue with the minimal snippet below:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns
import sys

# Ask the direct (FnApi) runner for 4 parallel workers.
pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
file_pattern = 'part-r-00*'

# Run each SDK worker in its own subprocess (multiprocess mode).
runner = fn_api_runner.FnApiRunner(
    default_environment=beam_runner_api_pb2.Environment(
        urn=python_urns.SUBPROCESS_SDK,
        payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
        % sys.executable.encode('ascii')))

p = beam.Pipeline(runner=runner, options=pipeline_options)
lines = (p
         | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
         | beam.combiners.Count.Globally()  # count records across all files
         | beam.io.WriteToText('/tmp/output'))
p.run()
```
Only one combination of apache_beam version and worker type seems to work (see https://beam.apache.org/documentation/runners/direct/ for the worker types):
- beam 2.16: neither the multithreaded nor the multiprocess mode achieves high CPU usage on multiple cores.
- beam 2.17: able to achieve high CPU usage on all 4 cores.
- beam 2.18: I have not tested the multithreaded mode, but the multiprocess mode fails when trying to serialize the Environment instance, most likely because of a change between 2.17 and 2.18.
I also briefly tried the SparkRunner with version 2.16 but was not able to achieve any throughput.
What is the recommended way to achieve what I am trying to do? How can I troubleshoot this?
----------------------------------------------------------------------------------------------------------------------------------------------
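One way to troubleshoot, as the reporter asks, is to time the same workload without Beam and compare. The following is a minimal sketch with hypothetical helper names (`count_tfrecords`, `count_all`); it counts records in each file concurrently, one task per file, by reading only the standard TFRecord framing (8-byte little-endian length, 4-byte length CRC, payload, 4-byte payload CRC) and seeking past each payload. It assumes uncompressed files and does not verify CRCs.

```python
import io
import struct
from concurrent.futures import ThreadPoolExecutor

# Assumed TFRecord framing per record:
#   uint64 length (little-endian) | uint32 masked CRC of length |
#   `length` bytes of payload     | uint32 masked CRC of payload
_LENGTH_CRC_SIZE = 4
_PAYLOAD_CRC_SIZE = 4


def count_tfrecords(path):
    """Count records in one uncompressed TFRecord file.

    CRCs are not checked, and a truncated final payload is not detected.
    """
    count = 0
    with open(path, 'rb') as f:
        while True:
            length_bytes = f.read(8)
            if not length_bytes:
                return count  # clean end of file
            if len(length_bytes) < 8:
                raise ValueError(f'truncated record header in {path}')
            (length,) = struct.unpack('<Q', length_bytes)
            # Skip the length CRC, the payload and the payload CRC.
            f.seek(_LENGTH_CRC_SIZE + length + _PAYLOAD_CRC_SIZE, io.SEEK_CUR)
            count += 1


def count_all(paths, max_workers=4):
    """Sum record counts over all files, one task per file."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(count_tfrecords, paths))
```

For example, `count_all(sorted(glob.glob('part-r-00*')), max_workers=4)` would total the reporter's files. Because this reader only parses headers and is mostly I/O-bound, threads are enough here; for CPU-heavy per-record work, a `ProcessPoolExecutor` would be the natural substitute.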
This is caused by this PR.
A workaround was tried: rolling back iobase.py so that it no longer uses _SDFBoundedSourceWrapper. This confirmed that data is then distributed to multiple workers; however, it introduces some regressions in the SDF wrapper tests.
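Why this matters for parallelism can be illustrated with a toy model, assuming the slowdown comes from the read reaching the workers as a single unsplit unit of work. This is not Beam's implementation or API: `split_into_bundles` and `assign_round_robin` are hypothetical helpers, the byte-range splitting only loosely mirrors the idea behind `BoundedSource.split`, and the 64 MiB bundle size is an arbitrary assumption (the 11 × 1 GiB inputs come from the report).

```python
from itertools import cycle


def split_into_bundles(file_sizes, desired_bundle_size):
    """Toy initial splitting: cut each file into byte ranges of at most
    desired_bundle_size bytes. Returns (name, start, end) tuples."""
    bundles = []
    for name, size in file_sizes.items():
        for start in range(0, size, desired_bundle_size):
            bundles.append((name, start, min(start + desired_bundle_size, size)))
    return bundles


def assign_round_robin(bundles, num_workers):
    """Hand bundles to workers in turn; return the bytes assigned per worker."""
    load = [0] * num_workers
    for worker, (_, start, end) in zip(cycle(range(num_workers)), bundles):
        load[worker] += end - start
    return load
```

With 11 files of 1 GiB and 64 MiB bundles, `split_into_bundles` yields 176 bundles, and `assign_round_robin(..., 4)` gives each of the 4 workers 2.75 GiB. If the whole read is instead a single bundle, such as `[('all', 0, 11 * 2**30)]`, one worker receives all 11 GiB and the other three stay idle, which matches the single-core behavior described in the report.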