FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571

However, I'm not up to date on the portable test infrastructure and would
appreciate guidance on what tests I can add for this.
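
In the meantime, here's roughly the shape of the unit test I have in mind (a
sketch only - it uses BeamFilesystemHandler from the patch in my earlier
message below, and I haven't checked where it belongs in the portable test
infra):

    import gzip
    import os
    import tempfile
    import unittest

    from apache_beam.runners.portability.artifact_service import (
        BeamFilesystemHandler)


    class FileReaderCompressionTest(unittest.TestCase):
        def test_gz_artifact_is_read_verbatim(self):
            # A .gz artifact must round-trip byte-for-byte; it must not be
            # transparently gunzipped by FileSystems.open().
            root = tempfile.mkdtemp()
            path = os.path.join(root, 'dep.tar.gz')
            payload = gzip.compress(b'pretend tarball contents')
            with open(path, 'wb') as f:
                f.write(payload)
            handler = BeamFilesystemHandler(root)
            self.assertEqual(handler.file_reader(path).read(), payload)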

On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com>
wrote:

> (FYI Sam +sbrot...@gmail.com <sbrot...@gmail.com>)
>
> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> OK, I found the bug - and now I don't understand how it could possibly
>> ever have worked. And if this was never tested, then I don't understand
>> why it works after fixing just this one bug :)
>>
>> Basically, the Python ArtifactStaging/RetrievalService uses
>> FileSystems.open() to read the artifacts to be staged, and
>> FileSystems.open() by default decompresses compressed files based on their
>> extension.
>> I found two such services - one in Python and one in Java. Is the Python
>> one used with an embedded job endpoint and the Java one otherwise? I
>> haven't inspected the Java one, but fixing the Python one does the trick.
>>
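>> To illustrate the default (a minimal sketch; the path is just an example):
>>
>>   from apache_beam.io import filesystems
>>   from apache_beam.io.filesystems import CompressionTypes
>>
>>   # The default compression_type is CompressionTypes.AUTO, which infers
>>   # compression from the file extension - so reading a .tar.gz through
>>   # this handle silently gunzips it:
>>   f = filesystems.FileSystems.open('/tmp/alembic-1.4.2.tar.gz')
>>
>>   # Passing UNCOMPRESSED yields the raw gzipped bytes, which is what an
>>   # artifact service should be serving:
>>   f = filesystems.FileSystems.open(
>>       '/tmp/alembic-1.4.2.tar.gz',
>>       compression_type=CompressionTypes.UNCOMPRESSED)
>>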
>> The fix is this patch:
>>
>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
>> index f2bbf534c3..1f3ec1c0b0 100644
>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>> @@ -41,6 +41,7 @@ import grpc
>>  from future.moves.urllib.request import urlopen
>>
>>  from apache_beam.io import filesystems
>> +from apache_beam.io.filesystems import CompressionTypes
>>  from apache_beam.portability import common_urns
>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>      self._root = root
>>
>>    def file_reader(self, path):
>> -    return filesystems.FileSystems.open(path)
>> +    return filesystems.FileSystems.open(
>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>>
>>    def file_writer(self, name=None):
>>      full_path = filesystems.FileSystems.join(self._root, name)
>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>> index 5bf3282250..2684235be0 100644
>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>> @@ -45,6 +45,7 @@ from typing import overload
>>  import grpc
>>
>>  from apache_beam.io import filesystems
>> +from apache_beam.io.filesystems import CompressionTypes
>>  from apache_beam.portability import common_urns
>>  from apache_beam.portability import python_urns
>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>> @@ -464,9 +465,13 @@ class GrpcServer(object):
>>                  self.provision_info.provision_info, worker_manager),
>>              self.control_server)
>>
>> +      def open_uncompressed(f):
>> +        return filesystems.FileSystems.open(
>> +            f, compression_type=CompressionTypes.UNCOMPRESSED)
>> +
>>        beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>>            artifact_service.ArtifactRetrievalService(
>> -              file_reader=filesystems.FileSystems.open),
>> +              file_reader=open_uncompressed),
>>            self.control_server)
>>
>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>>
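>> To double-check the fix end-to-end, the same inspection from my earlier
>> message (quoted below) should now show gzip data inside the worker
>> container rather than a plain tar:
>>
>>   # inside the worker container
>>   file /tmp/staged/alembic-1.4.2.tar.gz
>>   # expected: ... gzip compressed data ... (not "POSIX tar archive")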
>>
>>
>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com>
>> wrote:
>>
>>> Hi Maximilian,
>>>
>>> Thank you - it works fine with the embedded Flink runner (though, per
>>> below, it seems it's not using Docker to run the Python code? What is it
>>> using, then?).
>>>
>>> However, the original bug appears to be wider than I thought - it is also
>>> present if I run with --runner=FlinkRunner --environment_type=DOCKER. It
>>> seems something is quite broken in local Docker execution in general - I
>>> haven't yet verified whether the same error happens when running on a
>>> remote Flink cluster.
>>>
>>> I'm trying to build my own SDK containers with some more debugging so I
>>> can figure out what's going on...
>>>
>>>
>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Looks like you ran into a bug.
>>>>
>>>> You could just run your program without specifying any arguments, since
>>>> running with Python's FnApiRunner should be enough.
>>>>
>>>> Alternatively, how about trying to run the same pipeline with the
>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an endpoint.
>>>> It will run the Python code embedded (loopback environment) without
>>>> additional containers.
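>>>>
>>>> Concretely, something like this (a sketch - double-check the flags
>>>> against the Beam docs):
>>>>
>>>>   # Default runner (FnApiRunner-based), no runner flags needed:
>>>>   python3 main.py --requirements_file=requirements.txt
>>>>
>>>>   # Embedded Flink with the loopback environment:
>>>>   python3 main.py --runner=FlinkRunner --requirements_file=requirements.txt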
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>>> > Thanks Valentyn!
>>>> >
>>>> > Good to know that this is a bug (I'll file a bug), and that Dataflow has
>>>> > an experimental way to use custom containers. I'll try that.
>>>> >
>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>>> > <valen...@google.com> wrote:
>>>> >
>>>> >     Hi Eugene,
>>>> >
>>>> >     Good to hear from you. The experience you are describing with the
>>>> >     PortableRunner + Docker containers in local execution mode is most
>>>> >     certainly a bug; if you have not opened an issue for it, please do so
>>>> >     and feel free to cc me.
>>>> >
>>>> >     I can also reproduce the bug, and likewise didn't see anything
>>>> >     obvious immediately - this needs some debugging.
>>>> >
>>>> >     cc: +Ankur Goenka <goe...@google.com> and +Kyle Weaver
>>>> >     <kcwea...@google.com>, who recently worked on the Portable Runner and
>>>> >     may be interested.
>>>> >
>>>> >     By the way, you should be able to use custom containers with
>>>> >     Dataflow, if you set --experiments=use_runner_v2.
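>>>> >
>>>> >     For example (a sketch - the image flag name is as I recall it and
>>>> >     the values are placeholders, so please double-check):
>>>> >
>>>> >       python3 main.py --runner=DataflowRunner \
>>>> >           --experiments=use_runner_v2 \
>>>> >           --worker_harness_container_image=gcr.io/my-project/my-image \
>>>> >           --project=... --region=... --temp_location=gs://.../tmp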
>>>> >
>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>>> >     <ekirpic...@gmail.com> wrote:
>>>> >
>>>> >         (cc'ing Sam with whom I'm working on this atm)
>>>> >
>>>> >         FWIW I'm still stumped. I've looked through the Python, Go, and
>>>> >         Java code in the Beam repo that has anything to do with
>>>> >         gzipping/unzipping, and none of it appears to be used in the
>>>> >         artifact staging/retrieval codepaths. I also can't find any
>>>> >         mention of compression/decompression in the container boot code.
>>>> >         My next step will be to add a bunch of debugging, rebuild the
>>>> >         containers, and see what the artifact services think they're
>>>> >         serving.
>>>> >
>>>> >
>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>>> >         <ekirpic...@gmail.com> wrote:
>>>> >
>>>> >             Thanks Austin! Good stuff - though note that I am /not/
>>>> >             using custom containers; I'm just trying to get the basic
>>>> >             stuff to work: a Python pipeline with a simple
>>>> >             requirements.txt file. It feels like this should work
>>>> >             out of the box, so I must be doing something wrong.
>>>> >
>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>>> >             <whatwouldausti...@gmail.com> wrote:
>>>> >
>>>> >                 The only person I know of potentially doing applied work
>>>> >                 with custom containers is @OrielResearch Eila
>>>> >                 Arich-Landkof <e...@orielresearch.org> (there must be
>>>> >                 others)!
>>>> >
>>>> >                 As a plug for her and @BeamSummit: I think enough related
>>>> >                 material (with Conda specifics) will be covered in
>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>>> >
>>>> >                 I'm sure others on-list will have more immediately
>>>> >                 helpful things to say before that occurs (~3 weeks).
>>>> >
>>>> >
>>>> >
>>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>>> >                 <ekirpic...@gmail.com> wrote:
>>>> >
>>>> >                     Hi old Beam friends,
>>>> >
>>>> >                     I left Google to work on climate change
>>>> >                     <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>>> >                     and am now doing a short engagement with Pachama
>>>> >                     <https://pachama.com/>. Right now I'm trying to get a
>>>> >                     Beam Python pipeline to work; the pipeline will use
>>>> >                     fancy requirements and native dependencies, and we
>>>> >                     plan to run it on Cloud Dataflow (so custom
>>>> >                     containers are not yet an option), so I'm going
>>>> >                     straight for the direct PortableRunner as per
>>>> >                     https://beam.apache.org/documentation/runtime/environments/.
>>>> >
>>>> >                     Basically, I can't get a minimal Beam program with a
>>>> >                     minimal requirements.txt file to work - the .tar.gz
>>>> >                     of the dependency mysteriously ends up ungzipped and
>>>> >                     non-installable inside the Docker container running
>>>> >                     the worker. Details below.
>>>> >
>>>> >                     === main.py ===
>>>> >                     import argparse
>>>> >                     import logging
>>>> >
>>>> >                     import apache_beam as beam
>>>> >                     from apache_beam.options.pipeline_options import PipelineOptions
>>>> >                     from apache_beam.options.pipeline_options import SetupOptions
>>>> >
>>>> >
>>>> >                     def run(argv=None):
>>>> >                         parser = argparse.ArgumentParser()
>>>> >                         known_args, pipeline_args = parser.parse_known_args(argv)
>>>> >
>>>> >                         pipeline_options = PipelineOptions(pipeline_args)
>>>> >                         pipeline_options.view_as(SetupOptions).save_main_session = True
>>>> >
>>>> >                         with beam.Pipeline(options=pipeline_options) as p:
>>>> >                             (p | 'Create' >> beam.Create(['Hello'])
>>>> >                                | 'Write' >> beam.io.WriteToText('/tmp'))
>>>> >
>>>> >
>>>> >                     if __name__ == '__main__':
>>>> >                         logging.getLogger().setLevel(logging.INFO)
>>>> >                         run()
>>>> >
>>>> >                     === requirements.txt ===
>>>> >                     alembic
>>>> >
>>>> >                     When I run the program:
>>>> >                     $ python3 main.py --runner=PortableRunner \
>>>> >                           --job_endpoint=embed \
>>>> >                           --requirements_file=requirements.txt
>>>> >
>>>> >
>>>> >                     I get some normal output and then:
>>>> >
>>>> >                     INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n    untar_file(filename, location)\n
>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n    tar = tarfile.open(filename, mode)\n
>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n    return func(name, filemode, fileobj, **kwargs)\n
>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n    raise ReadError("not a gzip file")\n
>>>> >                     tarfile.ReadError: not a gzip file\n
>>>> >                     2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'
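>>>> >
>>>> >                     That last error is what tarfile raises when asked to
>>>> >                     gunzip data that isn't actually gzipped. A minimal
>>>> >                     repro of just the error (assuming a plain,
>>>> >                     non-gzipped tar named with a .gz extension):
>>>> >
>>>> >                       import tarfile
>>>> >                       # 'r:gz' is the mode pip picks for a .tar.gz name:
>>>> >                       tarfile.open('alembic-1.4.2.tar.gz', 'r:gz')
>>>> >                       # tarfile.ReadError: not a gzip file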
>>>> >
>>>> >                     This greatly puzzled me and, after some looking, I
>>>> >                     found something really surprising. Here is the
>>>> >                     package in the /directory to be staged/:
>>>> >
>>>> >                     $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>> >                     ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
>>>> >                     $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>> >
>>>> >                     So far so good. But here is the same file inside the
>>>> >                     Docker container (I ssh'd into the dead container
>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>> >
>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>>> >
>>>> >                     The file has clearly been ungzipped - note that its
>>>> >                     size, 4730880, exactly matches the uncompressed size
>>>> >                     that `file` reported above - and now of course pip
>>>> >                     can't install it! What's going on here? Am I using
>>>> >                     the direct/portable runner combination wrong?
>>>> >
>>>> >                     Thanks!
>>>> >
>>>> >                     --
>>>> >                     Eugene Kirpichov
>>>> >                     http://www.linkedin.com/in/eugenekirpichov
>>>> >
>>>> >
>>>> >
>>>> >             --
>>>> >             Eugene Kirpichov
>>>> >             http://www.linkedin.com/in/eugenekirpichov
>>>> >
>>>> >
>>>> >
>>>> >         --
>>>> >         Eugene Kirpichov
>>>> >         http://www.linkedin.com/in/eugenekirpichov
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > Eugene Kirpichov
>>>> > http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>


-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
