(FYI Sam +sbrot...@gmail.com <sbrot...@gmail.com>)

On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com>
wrote:

> Ok I found the bug, and now I don't understand how it could have possibly
> ever worked. And if this was never tested, then I don't understand why it
> works after fixing this one bug :)
>
> Basically the Python ArtifactStaging/RetrievalService uses
> FileSystems.open() to read the artifacts to be staged, and
> FileSystems.open() by default decompresses compressed files based on their
> extension.
> I found two of such services - in Python and in Java. Is the Python used
> with an embedded job endpoint and the java one otherwise? I haven't
> inspected the Java one, but fixing Python does the trick.
>
> The fix is this patch:
>
> diff --git
> a/sdks/python/apache_beam/runners/portability/artifact_service.py
> b/sdks/python/apache_beam/runners/portability/artifact_service.py
> index f2bbf534c3..1f3ec1c0b0 100644
> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
> @@ -41,6 +41,7 @@ import grpc
>  from future.moves.urllib.request import urlopen
>
>  from apache_beam.io import filesystems
> +from apache_beam.io.filesystems import CompressionTypes
>  from apache_beam.portability import common_urns
>  from apache_beam.portability.api import beam_artifact_api_pb2
>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>      self._root = root
>
>    def file_reader(self, path):
> -    return filesystems.FileSystems.open(path)
> +    return filesystems.FileSystems.open(
> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>
>    def file_writer(self, name=None):
>      full_path = filesystems.FileSystems.join(self._root, name)
> diff --git
> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> index 5bf3282250..2684235be0 100644
> ---
> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> +++
> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> @@ -45,6 +45,7 @@ from typing import overload
>  import grpc
>
>  from apache_beam.io import filesystems
> +from apache_beam.io.filesystems import CompressionTypes
>  from apache_beam.portability import common_urns
>  from apache_beam.portability import python_urns
>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> @@ -464,9 +465,13 @@ class GrpcServer(object):
>                  self.provision_info.provision_info, worker_manager),
>              self.control_server)
>
> +      def open_uncompressed(f):
> +        return filesystems.FileSystems.open(
> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
> +
>
>  beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>            artifact_service.ArtifactRetrievalService(
> -              file_reader=filesystems.FileSystems.open),
> +              file_reader=open_uncompressed),
>            self.control_server)
>
>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>
>
>
> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> Hi Maximilian,
>>
>> Thank you - it works fine with the embedded Flink runner (per below,
>> seems like it's not using Docker for running Python code? What is it using
>> then?).
>>
>> However, the original bug appears to be wider than I thought - it is also
>> present if I run --runner=FlinkRunner --environment_type=DOCKER. Seems like
>> something is very broken in local Docker execution in general - I haven't
>> yet verified whether the same error will happen when running on a remote
>> Flink cluster.
>>
>> Trying to build my own SDK containers with some more debugging so I can
>> figure out what's going on...
>>
>>
>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Looks like you ran into a bug.
>>>
>>> You could just run your program without specifying any arguments, since
>>> running with Python's FnApiRunner should be enough.
>>>
>>> Alternatively, how about trying to run the same pipeline with the
>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>>> It will run the Python code embedded (loopback environment) without
>>> additional containers.
>>>
>>> Cheers,
>>> Max
>>>
>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>> > Thanks Valentyn!
>>> >
>>> > Good to know that this is a bug (I'll file a bug), and that Dataflow
>>> has
>>> > an experimental way to use custom containers. I'll try that.
>>> >
>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>> > <valen...@google.com <mailto:valen...@google.com>> wrote:
>>> >
>>> >     Hi Eugene,
>>> >
>>> >     Good to hear from you. The experience you are describing on
>>> Portable
>>> >     Runner + Docker container in local execution mode is most certainly
>>> >     a bug, if you have not opened an issue on it, please do so and feel
>>> >     free to cc me.
>>> >
>>> >     I can also reproduce the bug and likewise didn't see anything
>>> >     obvious immediately, this needs some debugging.
>>> >
>>> >     cc: +Ankur Goenka <mailto:goe...@google.com> +Kyle Weaver
>>> >     <mailto:kcwea...@google.com> who recently worked on Portable
>>> Runner
>>> >     and may be interested.
>>> >
>>> >     By the way, you should be able to use custom containers with
>>> >     Dataflow, if you set --experiments=use_runner_v2.
>>> >
>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>> >     <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>> >
>>> >         (cc'ing Sam with whom I'm working on this atm)
>>> >
>>> >         FWIW I'm still stumped. I've looked through Python, Go and Java
>>> >         code in the Beam repo having anything to do with
>>> >         gzipping/unzipping, and none of it appears to be used in the
>>> >         artifact staging/retrieval codepaths. I also can't find any
>>> >         mention of compression/decompression in the container boot
>>> code.
>>> >         My next step will be to add a bunch of debugging, rebuild the
>>> >         containers, and see what the artifact services think they're
>>> >         serving.
>>> >
>>> >
>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>> >         <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>> >
>>> >             Thanks Austin! Good stuff - though note that I am
>>> >             /not/ using custom containers, I'm just trying to get the
>>> >             basic stuff to work, a Python pipeline with a simple
>>> >             requirements.txt file. Feels like this should work
>>> >             out-of-the-box, I must be doing something wrong.
>>> >
>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>> >             <whatwouldausti...@gmail.com
>>> >             <mailto:whatwouldausti...@gmail.com>> wrote:
>>> >
>>> >                 I only believe @OrielResearch Eila Arich-Landkof
>>> >                 <mailto:e...@orielresearch.org> potentially doing
>>> >                 applied work with custom containers (there must be
>>> others)!
>>> >
>>> >                 For a plug for her and @BeamSummit --  I think enough
>>> >                 related will be talked about in (with Conda specifics)
>>> >                 -->
>>> >
>>> https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>> >
>>> >                 I'm sure others will have more things to say that are
>>> >                 actually helpful, on-list, before that occurs (~3
>>> weeks).
>>> >
>>> >
>>> >
>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>> >                 <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>>
>>> wrote:
>>> >
>>> >                     Hi old Beam friends,
>>> >
>>> >                     I left Google to work on climate change
>>> >                     <
>>> https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U
>>> >
>>> >                     and am now doing a short engagement with Pachama
>>> >                     <https://pachama.com/>. Right now I'm trying to
>>> get
>>> >                     a Beam Python pipeline to work; the pipeline will
>>> >                     use fancy requirements and native dependencies, and
>>> >                     we plan to run it on Cloud Dataflow (so custom
>>> >                     containers are not yet an option), so I'm going
>>> >                     straight for the direct PortableRunner as per
>>> >
>>> https://beam.apache.org/documentation/runtime/environments/.
>>> >
>>> >                     Basically I can't get a minimal Beam program with a
>>> >                     minimal requirements.txt file to work - the .tar.gz
>>> >                     of the dependency mysteriously ends up being
>>> >                     ungzipped and non-installable inside the Docker
>>> >                     container running the worker. Details below.
>>> >
>>> >                     === main.py ===
>>> >                     import argparse
>>> >                     import logging
>>> >
>>> >                     import apache_beam as beam
>>> >                     from apache_beam.options.pipeline_options import
>>> >                     PipelineOptions
>>> >                     from apache_beam.options.pipeline_options import
>>> >                     SetupOptions
>>> >
>>> >                     def run(argv=None):
>>> >                          parser = argparse.ArgumentParser()
>>> >                          known_args, pipeline_args =
>>> >                     parser.parse_known_args(argv)
>>> >
>>> >                          pipeline_options =
>>> PipelineOptions(pipeline_args)
>>> >
>>> >
>>>  pipeline_options.view_as(SetupOptions).save_main_session
>>> >                     = True
>>> >
>>> >                          with beam.Pipeline(options=pipeline_options)
>>> as p:
>>> >                              (p | 'Create' >> beam.Create(['Hello'])
>>> >                                 | 'Write' >>
>>> beam.io.WriteToText('/tmp'))
>>> >
>>> >
>>> >                     if __name__ == '__main__':
>>> >                          logging.getLogger().setLevel(logging.INFO)
>>> >                          run()
>>> >
>>> >                     === requirements.txt ===
>>> >                     alembic
>>> >
>>> >                     When I run the program:
>>> >                     $ python3 main.py
>>> >
>>>  --runner=PortableRunner --job_endpoint=embed 
>>> --requirements_file=requirements.txt
>>> >
>>> >
>>> >                     I get some normal output and then:
>>> >
>>> >
>>>  INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>>> >                       File
>>> >
>>>  "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>>> >                     line 261, in unpack_file\n    untar_file(filename,
>>> >                     location)\n  File
>>> >
>>>  "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>>> >                     line 177, in untar_file\n    tar =
>>> >                     tarfile.open(filename, mode)\n  File
>>> >                     "/usr/local/lib/python3.7/tarfile.py", line 1591,
>>> in
>>> >                     open\n    return func(name, filemode, fileobj,
>>> >                     **kwargs)\n  File
>>> >                     "/usr/local/lib/python3.7/tarfile.py", line 1648,
>>> in
>>> >                     gzopen\n    raise ReadError("not a gzip
>>> >                     file")\ntarfile.ReadError: not a gzip
>>> >                     file\n2020/08/08 01:17:07 Failed to install
>>> required
>>> >                     packages: failed to install requirements: exit
>>> >                     status 2\n'
>>> >
>>> >                     This greatly puzzled me and, after some looking, I
>>> >                     found something really surprising. Here is the
>>> >                     package in the /directory to be staged/:
>>> >
>>> >                     $ file
>>> >
>>>  
>>> /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>> >                     ...: gzip compressed data, was
>>> >                     "dist/alembic-1.4.2.tar", last modified: Thu Mar 19
>>> >                     21:48:31 2020, max compression, original size
>>> modulo
>>> >                     2^32 4730880
>>> >                     $ ls -l
>>> >
>>>  
>>> /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>> >
>>> >                     So far so good. But here is the same file inside
>>> the
>>> >                     Docker container (I ssh'd into the dead container
>>> >                     <
>>> https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers
>>> >):
>>> >
>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive
>>> >                     (GNU)
>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17
>>> >                     /tmp/staged/alembic-1.4.2.tar.gz
>>> >
>>> >                     The file has clearly been unzipped and now of
>>> course
>>> >                     pip can't install it! What's going on here? Am I
>>> >                     using the direct/portable runner combination wrong?
>>> >
>>> >                     Thanks!
>>> >
>>> >                     --
>>> >                     Eugene Kirpichov
>>> >                     http://www.linkedin.com/in/eugenekirpichov
>>> >
>>> >
>>> >
>>> >             --
>>> >             Eugene Kirpichov
>>> >             http://www.linkedin.com/in/eugenekirpichov
>>> >
>>> >
>>> >
>>> >         --
>>> >         Eugene Kirpichov
>>> >         http://www.linkedin.com/in/eugenekirpichov
>>> >
>>> >
>>> >
>>> > --
>>> > Eugene Kirpichov
>>> > http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Reply via email to