The PR is merged.

Do folks think this warrants being cherry-picked into v2.24? My hunch is
yes, because one of the runners (the local portable Python runner) is
effectively broken for any production workload: it works only if your
pipeline has no dependencies.

On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ekirpic...@gmail.com>
wrote:

> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
>
> However, I'm not up to date on the portable test infrastructure and would
> appreciate guidance on what tests I can add for this.
>
> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> (FYI Sam +sbrot...@gmail.com)
>>
>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com>
>> wrote:
>>
>>> OK, I found the bug, and now I don't understand how it could have
>>> possibly ever worked. And if this was never tested, then I don't
>>> understand why it works after fixing this one bug :)
>>>
>>> Basically, the Python ArtifactStaging/RetrievalService uses
>>> FileSystems.open() to read the artifacts to be staged, and
>>> FileSystems.open() by default decompresses compressed files based on
>>> their extension.
>>> I found two such services - one in Python and one in Java. Is the
>>> Python one used with an embedded job endpoint and the Java one
>>> otherwise? I haven't inspected the Java one, but fixing the Python one
>>> does the trick.
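>>>
>>> (To illustrate - a minimal sketch of that default behavior; the path is
>>> hypothetical, but FileSystems.open() and CompressionTypes are the same
>>> names the patch below touches:)
>>>
>>> from apache_beam.io import filesystems
>>> from apache_beam.io.filesystems import CompressionTypes
>>>
>>> # Hypothetical staged artifact path.
>>> path = '/tmp/staged/alembic-1.4.2.tar.gz'
>>>
>>> # Default (CompressionTypes.AUTO): the .gz extension triggers gunzip,
>>> # so reads return the *decompressed* tar bytes.
>>> with filesystems.FileSystems.open(path) as f:
>>>     print(f.read(2))  # tar content, not the gzip magic
>>>
>>> # Passing UNCOMPRESSED returns the file verbatim.
>>> with filesystems.FileSystems.open(
>>>         path, compression_type=CompressionTypes.UNCOMPRESSED) as f:
>>>     print(f.read(2))  # b'\x1f\x8b', the gzip magic number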
>>>
>>> The fix is this patch:
>>>
>>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> index f2bbf534c3..1f3ec1c0b0 100644
>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>> @@ -41,6 +41,7 @@ import grpc
>>>  from future.moves.urllib.request import urlopen
>>>
>>>  from apache_beam.io import filesystems
>>> +from apache_beam.io.filesystems import CompressionTypes
>>>  from apache_beam.portability import common_urns
>>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>>      self._root = root
>>>
>>>    def file_reader(self, path):
>>> -    return filesystems.FileSystems.open(path)
>>> +    return filesystems.FileSystems.open(
>>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>>>
>>>    def file_writer(self, name=None):
>>>      full_path = filesystems.FileSystems.join(self._root, name)
>>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> index 5bf3282250..2684235be0 100644
>>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>> @@ -45,6 +45,7 @@ from typing import overload
>>>  import grpc
>>>
>>>  from apache_beam.io import filesystems
>>> +from apache_beam.io.filesystems import CompressionTypes
>>>  from apache_beam.portability import common_urns
>>>  from apache_beam.portability import python_urns
>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>> @@ -464,9 +465,13 @@ class GrpcServer(object):
>>>                  self.provision_info.provision_info, worker_manager),
>>>              self.control_server)
>>>
>>> +      def open_uncompressed(f):
>>> +        return filesystems.FileSystems.open(
>>> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
>>> +
>>>        beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>>>            artifact_service.ArtifactRetrievalService(
>>> -              file_reader=filesystems.FileSystems.open),
>>> +              file_reader=open_uncompressed),
>>>            self.control_server)
>>>
>>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>>>
>>>
>>>
>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>> wrote:
>>>
>>>> Hi Maximilian,
>>>>
>>>> Thank you - it works fine with the embedded Flink runner (per below,
>>>> it seems it's not using Docker to run the Python code? What is it
>>>> using then?).
>>>>
>>>> However, the original bug appears to be wider than I thought - it is
>>>> also present if I run --runner=FlinkRunner --environment_type=DOCKER.
>>>> It seems something is very broken in local Docker execution in
>>>> general - I haven't yet verified whether the same error happens when
>>>> running on a remote Flink cluster.
>>>>
>>>> Trying to build my own SDK containers with some more debugging so I can
>>>> figure out what's going on...
>>>>
>>>>
>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> Looks like you ran into a bug.
>>>>>
>>>>> You could just run your program without specifying any arguments,
>>>>> since running with Python's FnApiRunner should be enough.
>>>>>
>>>>> Alternatively, how about trying to run the same pipeline with the
>>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>>>>> It will run the Python code embedded (loopback environment) without
>>>>> additional containers.
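>>>>>
>>>>> A sketch of both invocations (assuming the main.py and
>>>>> requirements.txt from earlier in this thread):
>>>>>
>>>>>   # Default FnApiRunner - no runner arguments needed:
>>>>>   $ python3 main.py --requirements_file=requirements.txt
>>>>>
>>>>>   # Embedded Flink, loopback environment - no endpoint, no Docker:
>>>>>   $ python3 main.py --runner=FlinkRunner \
>>>>>       --requirements_file=requirements.txt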
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>>>> > Thanks Valentyn!
>>>>> >
>>>>> > Good to know that this is a bug (I'll file a bug), and that
>>>>> > Dataflow has an experimental way to use custom containers. I'll
>>>>> > try that.
>>>>> >
>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>>>> > <valen...@google.com> wrote:
>>>>> >
>>>>> >     Hi Eugene,
>>>>> >
>>>>> >     Good to hear from you. The experience you are describing on the
>>>>> >     Portable Runner + Docker container in local execution mode is
>>>>> >     most certainly a bug; if you have not opened an issue on it,
>>>>> >     please do so and feel free to cc me.
>>>>> >
>>>>> >     I can also reproduce the bug and likewise didn't see anything
>>>>> >     obvious immediately; this needs some debugging.
>>>>> >
>>>>> >     cc: +Ankur Goenka <goe...@google.com> +Kyle Weaver
>>>>> >     <kcwea...@google.com>, who recently worked on the Portable
>>>>> >     Runner and may be interested.
>>>>> >
>>>>> >     By the way, you should be able to use custom containers with
>>>>> >     Dataflow if you set --experiments=use_runner_v2.
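>>>>> >
>>>>> >     (A sketch of that invocation - the experiment flag is as stated
>>>>> >     above; the image flag name is my assumption of the then-current
>>>>> >     spelling, and the image path is a placeholder:)
>>>>> >
>>>>> >     $ python3 main.py --runner=DataflowRunner \
>>>>> >         --experiments=use_runner_v2 \
>>>>> >         --worker_harness_container_image=gcr.io/<project>/<image>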
>>>>> >
>>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>>>> >     <ekirpic...@gmail.com> wrote:
>>>>> >
>>>>> >         (cc'ing Sam, with whom I'm working on this atm)
>>>>> >
>>>>> >         FWIW I'm still stumped. I've looked through the Python, Go,
>>>>> >         and Java code in the Beam repo having anything to do with
>>>>> >         gzipping/unzipping, and none of it appears to be used in the
>>>>> >         artifact staging/retrieval codepaths. I also can't find any
>>>>> >         mention of compression/decompression in the container boot
>>>>> >         code. My next step will be to add a bunch of debugging,
>>>>> >         rebuild the containers, and see what the artifact services
>>>>> >         think they're serving.
>>>>> >
>>>>> >
>>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>>>> >         <ekirpic...@gmail.com> wrote:
>>>>> >
>>>>> >             Thanks Austin! Good stuff - though note that I am
>>>>> >             /not/ using custom containers; I'm just trying to get
>>>>> >             the basic stuff to work: a Python pipeline with a simple
>>>>> >             requirements.txt file. Feels like this should work
>>>>> >             out of the box, so I must be doing something wrong.
>>>>> >
>>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>>>> >             <whatwouldausti...@gmail.com> wrote:
>>>>> >
>>>>> >                 I believe only @OrielResearch Eila Arich-Landkof
>>>>> >                 <e...@orielresearch.org> is potentially doing
>>>>> >                 applied work with custom containers (there must be
>>>>> >                 others)!
>>>>> >
>>>>> >                 As a plug for her and @BeamSummit: I think enough
>>>>> >                 related material (with Conda specifics) will be
>>>>> >                 covered in
>>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>>>> >
>>>>> >                 I'm sure others will have more things to say that
>>>>> >                 are actually helpful, on-list, before that occurs
>>>>> >                 (~3 weeks).
>>>>> >
>>>>> >
>>>>> >
>>>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>>>> >                 <ekirpic...@gmail.com> wrote:
>>>>> >
>>>>> >                     Hi old Beam friends,
>>>>> >
>>>>> >                     I left Google to work on climate change
>>>>> >                     <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>>>> >                     and am now doing a short engagement with Pachama
>>>>> >                     <https://pachama.com/>. Right now I'm trying to
>>>>> >                     get a Beam Python pipeline to work; the pipeline
>>>>> >                     will use fancy requirements and native
>>>>> >                     dependencies, and we plan to run it on Cloud
>>>>> >                     Dataflow (so custom containers are not yet an
>>>>> >                     option), so I'm going straight for the direct
>>>>> >                     PortableRunner as per
>>>>> >                     https://beam.apache.org/documentation/runtime/environments/.
>>>>> >
>>>>> >                     Basically, I can't get a minimal Beam program
>>>>> >                     with a minimal requirements.txt file to work -
>>>>> >                     the .tar.gz of the dependency mysteriously ends
>>>>> >                     up being ungzipped and non-installable inside
>>>>> >                     the Docker container running the worker. Details
>>>>> >                     below.
>>>>> >
>>>>> >                     === main.py ===
>>>>> >                     import argparse
>>>>> >                     import logging
>>>>> >
>>>>> >                     import apache_beam as beam
>>>>> >                     from apache_beam.options.pipeline_options import PipelineOptions
>>>>> >                     from apache_beam.options.pipeline_options import SetupOptions
>>>>> >
>>>>> >
>>>>> >                     def run(argv=None):
>>>>> >                         parser = argparse.ArgumentParser()
>>>>> >                         known_args, pipeline_args = parser.parse_known_args(argv)
>>>>> >
>>>>> >                         pipeline_options = PipelineOptions(pipeline_args)
>>>>> >                         pipeline_options.view_as(SetupOptions).save_main_session = True
>>>>> >
>>>>> >                         with beam.Pipeline(options=pipeline_options) as p:
>>>>> >                             (p | 'Create' >> beam.Create(['Hello'])
>>>>> >                                | 'Write' >> beam.io.WriteToText('/tmp'))
>>>>> >
>>>>> >
>>>>> >                     if __name__ == '__main__':
>>>>> >                         logging.getLogger().setLevel(logging.INFO)
>>>>> >                         run()
>>>>> >
>>>>> >                     === requirements.txt ===
>>>>> >                     alembic
>>>>> >
>>>>> >                     When I run the program:
>>>>> >
>>>>> >                     $ python3 main.py --runner=PortableRunner \
>>>>> >                           --job_endpoint=embed \
>>>>> >                           --requirements_file=requirements.txt
>>>>> >
>>>>> >                     I get some normal output and then:
>>>>> >
>>>>> >                     INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:
>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file
>>>>> >                         untar_file(filename, location)
>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file
>>>>> >                         tar = tarfile.open(filename, mode)
>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open
>>>>> >                         return func(name, filemode, fileobj, **kwargs)
>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen
>>>>> >                         raise ReadError("not a gzip file")
>>>>> >                     tarfile.ReadError: not a gzip file
>>>>> >                     2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2
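>>>>> >
>>>>> >                     (A minimal sketch of that failure mode: if a
>>>>> >                     plain tar keeps a .tar.gz name, tarfile's gzip
>>>>> >                     mode - which pip selects from the extension -
>>>>> >                     raises exactly this error. The filename is
>>>>> >                     hypothetical:)
>>>>> >
>>>>> >                     import tarfile
>>>>> >
>>>>> >                     try:
>>>>> >                         # A bare tar misnamed *.tar.gz.
>>>>> >                         tarfile.open('alembic-1.4.2.tar.gz', 'r:gz')
>>>>> >                     except tarfile.ReadError as e:
>>>>> >                         print(e)  # -> not a gzip file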
>>>>> >
>>>>> >                     This greatly puzzled me and, after some
>>>>> >                     looking, I found something really surprising.
>>>>> >                     Here is the package in the /directory to be
>>>>> >                     staged/:
>>>>> >
>>>>> >                     $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>> >                     ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
>>>>> >                     $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>>> >
>>>>> >                     So far so good. But here is the same file inside
>>>>> >                     the Docker container (I ssh'd into the dead
>>>>> >                     container
>>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>>> >
>>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>>>> >
>>>>> >                     The file has clearly been unzipped - its size in
>>>>> >                     the container (4730880 bytes) exactly matches
>>>>> >                     the uncompressed size reported by file(1) above
>>>>> >                     - and now of course pip can't install it! What's
>>>>> >                     going on here? Am I using the direct/portable
>>>>> >                     runner combination wrong?
>>>>> >
>>>>> >                     Thanks!
>>>>> >
>>>>> >                     --
>>>>> >                     Eugene Kirpichov
>>>>> >                     http://www.linkedin.com/in/eugenekirpichov
>>>>> >
>>>>> >
>>>>> >
>>>>> >             --
>>>>> >             Eugene Kirpichov
>>>>> >             http://www.linkedin.com/in/eugenekirpichov
>>>>> >
>>>>> >
>>>>> >
>>>>> >         --
>>>>> >         Eugene Kirpichov
>>>>> >         http://www.linkedin.com/in/eugenekirpichov
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Eugene Kirpichov
>>>>> > http://www.linkedin.com/in/eugenekirpichov
>>>>>
>>>>
>>>>
>>>> --
>>>> Eugene Kirpichov
>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
