+Daniel, who is in charge of 2.24 per the dev@ thread.

On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <ekirpic...@gmail.com>
wrote:

> The PR is merged.
>
> Do folks think this warrants being cherry-picked into v2.24? My hunch is
> yes, because basically one of the runners (the local portable Python
> runner) is broken for any production workload (it works only if your
> pipeline has no dependencies).
>
> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571
>>
>> However, I'm not up to date on the portable test infrastructure and would
>> appreciate guidance on what tests I can add for this.
>>
>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com>
>> wrote:
>>
>>> (FYI Sam +sbrot...@gmail.com <sbrot...@gmail.com>)
>>>
>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>> wrote:
>>>
>>>> Ok I found the bug, and now I don't understand how it could have
>>>> possibly ever worked. And if this was never tested, then I don't understand
>>>> why it works after fixing this one bug :)
>>>>
>>>> Basically the Python ArtifactStaging/RetrievalService uses
>>>> FileSystems.open() to read the artifacts to be staged, and
>>>> FileSystems.open() by default decompresses compressed files based on their
>>>> extension.
>>>> I found two such services, one in Python and one in Java. Is the Python
>>>> one used with an embedded job endpoint and the Java one otherwise? I
>>>> haven't inspected the Java one, but fixing the Python one does the trick.
>>>>
>>>> The fix is this patch:
>>>>
>>>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> index f2bbf534c3..1f3ec1c0b0 100644
>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> @@ -41,6 +41,7 @@ import grpc
>>>>  from future.moves.urllib.request import urlopen
>>>>
>>>>  from apache_beam.io import filesystems
>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>  from apache_beam.portability import common_urns
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>>>      self._root = root
>>>>
>>>>    def file_reader(self, path):
>>>> -    return filesystems.FileSystems.open(path)
>>>> +    return filesystems.FileSystems.open(
>>>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>>>>
>>>>    def file_writer(self, name=None):
>>>>      full_path = filesystems.FileSystems.join(self._root, name)
>>>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> index 5bf3282250..2684235be0 100644
>>>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> @@ -45,6 +45,7 @@ from typing import overload
>>>>  import grpc
>>>>
>>>>  from apache_beam.io import filesystems
>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>  from apache_beam.portability import common_urns
>>>>  from apache_beam.portability import python_urns
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>> @@ -464,9 +465,13 @@ class GrpcServer(object):
>>>>                  self.provision_info.provision_info, worker_manager),
>>>>              self.control_server)
>>>>
>>>> +      def open_uncompressed(f):
>>>> +        return filesystems.FileSystems.open(
>>>> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
>>>> +
>>>>        beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>>>>            artifact_service.ArtifactRetrievalService(
>>>> -              file_reader=filesystems.FileSystems.open),
>>>> +              file_reader=open_uncompressed),
>>>>            self.control_server)
>>>>
>>>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>>>>
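The failure mode described above (an opener that decompresses based on the file extension, used to copy an artifact that must stay compressed) can be reproduced with the standard library alone. This is only a sketch under stated assumptions: open_auto, open_raw, stage, is_installable_sdist, make_sample_sdist and demo are hypothetical names made up for illustration, and none of this is Beam code. open_auto stands in for the default behaviour, open_raw for opening with compression disabled as the patch does.

```python
import gzip
import io
import os
import tarfile
import tempfile


def open_auto(path):
    """Hypothetical opener: decompresses when the name ends in .gz."""
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')
    return open(path, 'rb')


def open_raw(path):
    """Hypothetical opener: always returns the bytes exactly as stored."""
    return open(path, 'rb')


def stage(src, dst, opener):
    """Copy src to dst, reading the source through the given opener."""
    with opener(src) as reader, open(dst, 'wb') as writer:
        writer.write(reader.read())


def is_installable_sdist(path):
    """True if path can be opened as a gzip-compressed tar archive."""
    try:
        with tarfile.open(path, 'r:gz'):
            return True
    except tarfile.ReadError:
        return False


def make_sample_sdist(path):
    """Write a tiny .tar.gz containing a single file."""
    payload = b'print("hello")\n'
    with tarfile.open(path, 'w:gz') as tar:
        info = tarfile.TarInfo('pkg/setup.py')
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))


def demo():
    """Stage the same archive through both openers and check each copy."""
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, 'pkg-0.1.tar.gz')
        make_sample_sdist(src)
        broken = os.path.join(tmp, 'broken.tar.gz')
        fixed = os.path.join(tmp, 'fixed.tar.gz')
        stage(src, broken, open_auto)  # copy silently becomes a plain tar
        stage(src, fixed, open_raw)    # copy stays byte-identical
        return is_installable_sdist(broken), is_installable_sdist(fixed)


if __name__ == '__main__':
    print(demo())
```

The copy made through open_auto keeps its .tar.gz name but holds plain tar bytes, which is the state in which tarfile reports "not a gzip file".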
>>>>
>>>>
>>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Maximilian,
>>>>>
>>>>> Thank you - it works fine with the embedded Flink runner (per below,
>>>>> seems like it's not using Docker for running Python code? What is it using
>>>>> then?).
>>>>>
>>>>> However, the original bug appears to be wider than I thought - it is
>>>>> also present if I run --runner=FlinkRunner --environment_type=DOCKER.
>>>>> Seems like something is very broken in local Docker execution in
>>>>> general - I haven't yet verified whether the same error will happen
>>>>> when running on a remote Flink cluster.
>>>>>
>>>>> Trying to build my own SDK containers with some more debugging so I
>>>>> can figure out what's going on...
>>>>>
>>>>>
>>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Looks like you ran into a bug.
>>>>>>
>>>>>> You could just run your program without specifying any arguments,
>>>>>> since running with Python's FnApiRunner should be enough.
>>>>>>
>>>>>> Alternatively, how about trying to run the same pipeline with the
>>>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>>>>>> It will run the Python code embedded (loopback environment) without
>>>>>> additional containers.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>>>>> > Thanks Valentyn!
>>>>>> >
>>>>>> > Good to know that this is a bug (I'll file a bug), and that Dataflow
>>>>>> > has an experimental way to use custom containers. I'll try that.
>>>>>> >
>>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>>>>> > <valen...@google.com <mailto:valen...@google.com>> wrote:
>>>>>> >
>>>>>> >     Hi Eugene,
>>>>>> >
>>>>>> >     Good to hear from you. The experience you are describing on
>>>>>> >     Portable Runner + Docker container in local execution mode is
>>>>>> >     most certainly a bug. If you have not opened an issue for it,
>>>>>> >     please do so and feel free to cc me.
>>>>>> >
>>>>>> >     I can also reproduce the bug and likewise didn't see anything
>>>>>> >     obvious immediately, this needs some debugging.
>>>>>> >
>>>>>> >     cc: +Ankur Goenka <mailto:goe...@google.com> +Kyle Weaver
>>>>>> >     <mailto:kcwea...@google.com> who recently worked on Portable
>>>>>> >     Runner and may be interested.
>>>>>> >
>>>>>> >     By the way, you should be able to use custom containers with
>>>>>> >     Dataflow, if you set --experiments=use_runner_v2.
>>>>>> >
>>>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>>>>> >     <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>>>>> >
>>>>>> >         (cc'ing Sam with whom I'm working on this atm)
>>>>>> >
>>>>>> >         FWIW I'm still stumped. I've looked through Python, Go and
>>>>>> >         Java code in the Beam repo having anything to do with
>>>>>> >         gzipping/unzipping, and none of it appears to be used in the
>>>>>> >         artifact staging/retrieval codepaths. I also can't find any
>>>>>> >         mention of compression/decompression in the container boot
>>>>>> >         code. My next step will be to add a bunch of debugging,
>>>>>> >         rebuild the containers, and see what the artifact services
>>>>>> >         think they're serving.
>>>>>> >
>>>>>> >
>>>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>>>>> >         <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>>>>> >
>>>>>> >             Thanks Austin! Good stuff - though note that I am
>>>>>> >             /not/ using custom containers, I'm just trying to get
>>>>>> >             the basic stuff to work, a Python pipeline with a simple
>>>>>> >             requirements.txt file. Feels like this should work
>>>>>> >             out-of-the-box, I must be doing something wrong.
>>>>>> >
>>>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>>>>> >             <whatwouldausti...@gmail.com
>>>>>> >             <mailto:whatwouldausti...@gmail.com>> wrote:
>>>>>> >
>>>>>> >                 I believe only @OrielResearch Eila Arich-Landkof
>>>>>> >                 <mailto:e...@orielresearch.org> is potentially doing
>>>>>> >                 applied work with custom containers (there must be
>>>>>> >                 others)!
>>>>>> >
>>>>>> >                 As a plug for her and @BeamSummit: I think enough
>>>>>> >                 related material will be covered (with Conda
>>>>>> >                 specifics) in -->
>>>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>>>>> >
>>>>>> >                 I'm sure others will have more things to say that
>>>>>> >                 are actually helpful, on-list, before that occurs
>>>>>> >                 (~3 weeks).
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>>>>> >                 <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>>
>>>>>> wrote:
>>>>>> >
>>>>>> >                     Hi old Beam friends,
>>>>>> >
>>>>>> >                     I left Google to work on climate change
>>>>>> >                     <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>>>>> >                     and am now doing a short engagement with Pachama
>>>>>> >                     <https://pachama.com/>. Right now I'm trying to
>>>>>> >                     get a Beam Python pipeline to work; the pipeline
>>>>>> >                     will use fancy requirements and native
>>>>>> >                     dependencies, and we plan to run it on Cloud
>>>>>> >                     Dataflow (so custom containers are not yet an
>>>>>> >                     option), so I'm going straight for the direct
>>>>>> >                     PortableRunner as per
>>>>>> >                     https://beam.apache.org/documentation/runtime/environments/.
>>>>>> >
>>>>>> >                     Basically I can't get a minimal Beam program
>>>>>> >                     with a minimal requirements.txt file to work -
>>>>>> >                     the .tar.gz of the dependency mysteriously ends
>>>>>> >                     up being ungzipped and non-installable inside
>>>>>> >                     the Docker container running the worker. Details
>>>>>> >                     below.
>>>>>> >
>>>>>> >                     === main.py ===
>>>>>> >                     import argparse
>>>>>> >                     import logging
>>>>>> >
>>>>>> >                     import apache_beam as beam
>>>>>> >                     from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> >                     from apache_beam.options.pipeline_options import SetupOptions
>>>>>> >
>>>>>> >                     def run(argv=None):
>>>>>> >                         parser = argparse.ArgumentParser()
>>>>>> >                         known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>> >
>>>>>> >                         pipeline_options = PipelineOptions(pipeline_args)
>>>>>> >                         pipeline_options.view_as(SetupOptions).save_main_session = True
>>>>>> >
>>>>>> >                         with beam.Pipeline(options=pipeline_options) as p:
>>>>>> >                             (p | 'Create' >> beam.Create(['Hello'])
>>>>>> >                                | 'Write' >> beam.io.WriteToText('/tmp'))
>>>>>> >
>>>>>> >
>>>>>> >                     if __name__ == '__main__':
>>>>>> >                         logging.getLogger().setLevel(logging.INFO)
>>>>>> >                         run()
>>>>>> >
>>>>>> >                     === requirements.txt ===
>>>>>> >                     alembic
>>>>>> >
>>>>>> >                     When I run the program:
>>>>>> >                     $ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt
>>>>>> >
>>>>>> >
>>>>>> >                     I get some normal output and then:
>>>>>> >
>>>>>> >
>>>>>> >                     INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n
>>>>>> >                         untar_file(filename, location)\n
>>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n
>>>>>> >                         tar = tarfile.open(filename, mode)\n
>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n
>>>>>> >                         return func(name, filemode, fileobj, **kwargs)\n
>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n
>>>>>> >                         raise ReadError("not a gzip file")\n
>>>>>> >                     tarfile.ReadError: not a gzip file\n
>>>>>> >                     2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'
>>>>>> >
>>>>>> >                     This greatly puzzled me and, after some looking,
>>>>>> >                     I found something really surprising. Here is the
>>>>>> >                     package in the /directory to be staged/:
>>>>>> >
>>>>>> >                     $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>> >                     ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
>>>>>> >                     $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>>>> >
>>>>>> >                     So far so good. But here is the same file inside
>>>>>> >                     the Docker container (I ssh'd into the dead
>>>>>> >                     container
>>>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>>>> >
>>>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >
>>>>>> >                     The file has clearly been unzipped and now of
>>>>>> >                     course pip can't install it! What's going on
>>>>>> >                     here? Am I using the direct/portable runner
>>>>>> >                     combination wrong?
>>>>>> >
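The observation above (the staged file is a plain tar whose size, 4730880 bytes, equals the "original size modulo 2^32" that file reports for the .tar.gz) can also be checked programmatically. A minimal sketch, assuming a single-member gzip file: the gzip format stores the uncompressed size modulo 2^32 in the last four bytes of the stream (the ISIZE field), and every gzip stream starts with the bytes 1f 8b. The function names gzip_isize and looks_decompressed are hypothetical, and the in-memory payload merely stands in for the two files.

```python
import gzip
import struct

GZIP_MAGIC = b'\x1f\x8b'


def gzip_isize(data):
    """Return the ISIZE trailer of a gzip stream: the uncompressed size
    modulo 2**32, stored little-endian in the last four bytes."""
    if data[:2] != GZIP_MAGIC:
        raise ValueError('not a gzip stream')
    return struct.unpack('<I', data[-4:])[0]


def looks_decompressed(original_gz, staged):
    """True if `staged` is not gzip data and its length matches the
    uncompressed size recorded in `original_gz`."""
    staged_is_gzip = staged[:2] == GZIP_MAGIC
    same_size = len(staged) % 2**32 == gzip_isize(original_gz)
    return (not staged_is_gzip) and same_size


# Usage with in-memory data standing in for the two files.
payload = b'x' * 5000
compressed = gzip.compress(payload)
print(looks_decompressed(compressed, payload))     # staged copy was decompressed
print(looks_decompressed(compressed, compressed))  # staged copy is intact
```

For real files the same check would read the original and staged archives in binary mode and pass their contents to looks_decompressed.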
>>>>>> >                     Thanks!
>>>>>> >
>>>>>> >                     --
>>>>>> >                     Eugene Kirpichov
>>>>>> >                     http://www.linkedin.com/in/eugenekirpichov


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
