Hi Julien,

Thanks for reaching out to the user community. I will look into it. Could you please share how you checked the CPU usage of each core?
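For reference, one possible way to measure per-core usage is to diff two snapshots of /proc/stat. The sketch below is Linux-only and uses only the standard library; the function names (parse_proc_stat, per_core_usage, sample) are hypothetical helpers written for illustration and are not part of Beam.

```python
# Sketch: per-core CPU utilisation from two /proc/stat snapshots (Linux only).
import time


def parse_proc_stat(text):
    """Return {core_name: (busy_jiffies, total_jiffies)} for each 'cpuN' line."""
    cores = {}
    for line in text.splitlines():
        fields = line.split()
        # Per-core lines look like 'cpu0 user nice system idle iowait ...'.
        # The aggregate 'cpu' line and non-CPU lines are skipped.
        if not fields or not fields[0].startswith('cpu') or fields[0] == 'cpu':
            continue
        # Keep the first 8 counters (user .. steal); guest time is already
        # included in user/nice, so later columns are ignored.
        values = ([int(v) for v in fields[1:]] + [0] * 8)[:8]
        total = sum(values)
        idle = values[3] + values[4]  # idle + iowait
        cores[fields[0]] = (total - idle, total)
    return cores


def per_core_usage(before, after):
    """Return {core_name: percent busy} between two parsed snapshots."""
    usage = {}
    for core, (busy1, total1) in after.items():
        if core not in before:
            continue
        busy0, total0 = before[core]
        delta_total = total1 - total0
        usage[core] = (
            100.0 * (busy1 - busy0) / delta_total if delta_total > 0 else 0.0
        )
    return usage


def sample(interval=1.0, path='/proc/stat'):
    """Read /proc/stat twice, `interval` seconds apart, and return usage."""
    with open(path) as f:
        first = parse_proc_stat(f.read())
    time.sleep(interval)
    with open(path) as f:
        second = parse_proc_stat(f.read())
    return per_core_usage(first, second)
```

Calling sample(1.0) in a loop while the pipeline runs and printing the result would show whether all four cores are busy or only one.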
Thanks,
Hannah

On Tue, Jan 28, 2020 at 9:48 PM Julien Lafaye <jlaf...@gmail.com> wrote:

> Hello,
>
> I have a set of tfrecord files, obtained by converting parquet files with
> Spark. Each file is roughly 1GB and I have 11 of those.
>
> I would expect simple statistics gathering (i.e. counting the number of items
> in all files) to scale linearly with respect to the number of cores on my
> system.
>
> I am able to reproduce the issue with the minimal snippet below
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.runners.portability import fn_api_runner
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.portability import python_urns
> import sys
>
> pipeline_options = PipelineOptions(['--direct_num_workers', '4'])
>
> file_pattern = 'part-r-00*'
> runner = fn_api_runner.FnApiRunner(
>     default_environment=beam_runner_api_pb2.Environment(
>         urn=python_urns.SUBPROCESS_SDK,
>         payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
>         % sys.executable.encode('ascii')))
>
> p = beam.Pipeline(runner=runner, options=pipeline_options)
>
> lines = (p | 'read' >> beam.io.tfrecordio.ReadFromTFRecord(file_pattern)
>          | beam.combiners.Count.Globally()
>          | beam.io.WriteToText('/tmp/output'))
>
> p.run()
>
> Only one combination of apache_beam revision / worker type seems to work
> (I refer to https://beam.apache.org/documentation/runners/direct/ for the
> worker types)
> * beam 2.16: neither multithread nor multiprocess achieves high CPU usage
>   on multiple cores
> * beam 2.17: able to achieve high CPU usage on all 4 cores
> * beam 2.18: not tested the multithreaded mode, but the multiprocess mode
>   fails when trying to serialize the Environment instance, most likely
>   because of a change from 2.17 to 2.18.
>
> I also briefly tried SparkRunner with version 2.16 but was not able to
> achieve any throughput.
>
> What is the recommended way to achieve what I am trying to do? How can I
> troubleshoot?
>

--
Please help me know how I am doing: go/hannahjiang-feedback