Normally I'd say not to cherry-pick this since the issue only affects one runner and isn't really a regression, but given that this is the last Py2 release and no follow-up release will be able to include this fix, I think it's worth making an exception this time. There should be at least one release with a working portable runner for Py2 users, given that the portable runner is included in examples on the website. Plus, I'm already waiting for some other cherry-picks, so it won't even delay anything.
So yes, Eugene if you could create a cherry-pick of this change into the release-2.24.0 branch, I'll review and merge it. On Mon, Aug 17, 2020 at 11:27 AM Valentyn Tymofieiev <valen...@google.com> wrote: > Will defer to the release manager; one reason to cherry-pick is > that 2.24.0 will be the last release with Python 2 support, so Py2 users of > Portable Python Local Runner might appreciate the fix, since they won't be > able to use the next release. > > On Thu, Aug 13, 2020 at 6:28 PM Eugene Kirpichov <ekirpic...@gmail.com> > wrote: > >> +Daniel as in charge of 2.24 per dev@ thread. >> >> On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <ekirpic...@gmail.com> >> wrote: >> >>> The PR is merged. >>> >>> Do folks think this warrants being cherrypicked into v2.24? My hunch is >>> yes, cause basically one of the runners (local portable python runner) is >>> broken for any production workload (works only if your pipeline has no >>> dependencies). >>> >>> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ekirpic...@gmail.com> >>> wrote: >>> >>>> FWIW I sent a PR to fix this https://github.com/apache/beam/pull/12571 >>>> >>>> However, I'm not up to date on the portable test infrastructure and >>>> would appreciate guidance on what tests I can add for this. >>>> >>>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com> >>>> wrote: >>>> >>>>> (FYI Sam +sbrot...@gmail.com <sbrot...@gmail.com>) >>>>> >>>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com> >>>>> wrote: >>>>> >>>>>> Ok I found the bug, and now I don't understand how it could have >>>>>> possibly ever worked. And if this was never tested, then I don't >>>>>> understand >>>>>> why it works after fixing this one bug :) >>>>>> >>>>>> Basically the Python ArtifactStaging/RetrievalService uses >>>>>> FileSystems.open() to read the artifacts to be staged, and >>>>>> FileSystems.open() by default decompresses compressed files based on >>>>>> their >>>>>> extension. 
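[A minimal stdlib-only sketch of that failure mode — this simulates the behavior with `gzip`/`tarfile`, it is not Beam's actual code, and the in-memory tarball and names are invented for illustration:]

```python
import gzip
import io
import tarfile

# Build a tiny "sdist"-like .tar.gz in memory.
raw_tar = io.BytesIO()
with tarfile.open(fileobj=raw_tar, mode="w") as tar:
    data = b"print('hello')\n"
    info = tarfile.TarInfo(name="pkg/setup.py")
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))
gz_bytes = gzip.compress(raw_tar.getvalue())

# A faithful artifact reader stages the gzip bytes unchanged.
staged_ok = gz_bytes

# An extension-aware reader (like FileSystems.open() by default)
# transparently decompresses on read, so the staged copy is the inner
# bare tar -- exactly what `file` reported inside the container.
staged_broken = gzip.decompress(gz_bytes)

GZIP_MAGIC = b"\x1f\x8b"
print(staged_ok[:2] == GZIP_MAGIC)      # True: still gzip-compressed
print(staged_broken[:2] == GZIP_MAGIC)  # False: now a bare tar

# pip's untar_file() opens the .tar.gz with mode "r:gz" and fails:
try:
    tarfile.open(fileobj=io.BytesIO(staged_broken), mode="r:gz")
except tarfile.ReadError as e:
    print(e)  # not a gzip file
```

[This matches the "tarfile.ReadError: not a gzip file" traceback quoted later in the thread.]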
>>>>>> I found two of such services - in Python and in Java. Is the Python >>>>>> used with an embedded job endpoint and the java one otherwise? I haven't >>>>>> inspected the Java one, but fixing Python does the trick. >>>>>> >>>>>> The fix is this patch: >>>>>> >>>>>> diff --git >>>>>> a/sdks/python/apache_beam/runners/portability/artifact_service.py >>>>>> b/sdks/python/apache_beam/runners/portability/artifact_service.py >>>>>> index f2bbf534c3..1f3ec1c0b0 100644 >>>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py >>>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py >>>>>> @@ -41,6 +41,7 @@ import grpc >>>>>> from future.moves.urllib.request import urlopen >>>>>> >>>>>> from apache_beam.io import filesystems >>>>>> +from apache_beam.io.filesystems import CompressionTypes >>>>>> from apache_beam.portability import common_urns >>>>>> from apache_beam.portability.api import beam_artifact_api_pb2 >>>>>> from apache_beam.portability.api import beam_artifact_api_pb2_grpc >>>>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object): >>>>>> self._root = root >>>>>> >>>>>> def file_reader(self, path): >>>>>> - return filesystems.FileSystems.open(path) >>>>>> + return filesystems.FileSystems.open( >>>>>> + path, compression_type=CompressionTypes.UNCOMPRESSED) >>>>>> >>>>>> def file_writer(self, name=None): >>>>>> full_path = filesystems.FileSystems.join(self._root, name) >>>>>> diff --git >>>>>> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py >>>>>> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py >>>>>> index 5bf3282250..2684235be0 100644 >>>>>> --- >>>>>> a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py >>>>>> +++ >>>>>> b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py >>>>>> @@ -45,6 +45,7 @@ from typing import overload >>>>>> import grpc >>>>>> >>>>>> from apache_beam.io import filesystems >>>>>> 
+from apache_beam.io.filesystems import CompressionTypes >>>>>> from apache_beam.portability import common_urns >>>>>> from apache_beam.portability import python_urns >>>>>> from apache_beam.portability.api import beam_artifact_api_pb2_grpc >>>>>> @@ -464,9 +465,13 @@ class GrpcServer(object): >>>>>> self.provision_info.provision_info, worker_manager), >>>>>> self.control_server) >>>>>> >>>>>> + def open_uncompressed(f): >>>>>> + return filesystems.FileSystems.open( >>>>>> + f, compression_type=CompressionTypes.UNCOMPRESSED) >>>>>> + >>>>>> >>>>>> >>>>>> beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server( >>>>>> artifact_service.ArtifactRetrievalService( >>>>>> - file_reader=filesystems.FileSystems.open), >>>>>> + file_reader=open_uncompressed), >>>>>> self.control_server) >>>>>> >>>>>> self.data_plane_handler = data_plane.BeamFnDataServicer( >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov < >>>>>> ekirpic...@gmail.com> wrote: >>>>>> >>>>>>> Hi Maximilian, >>>>>>> >>>>>>> Thank you - it works fine with the embedded Flink runner (per below, >>>>>>> seems like it's not using Docker for running Python code? What is it >>>>>>> using >>>>>>> then?). >>>>>>> >>>>>>> However, the original bug appears to be wider than I thought - it is >>>>>>> also present if I run --runner=FlinkRunner --environment_type=DOCKER. >>>>>>> Seems >>>>>>> like something is very broken in local Docker execution in general - I >>>>>>> haven't yet verified whether the same error will happen when running on >>>>>>> a >>>>>>> remote Flink cluster. >>>>>>> >>>>>>> Trying to build my own SDK containers with some more debugging so I >>>>>>> can figure out what's going on... >>>>>>> >>>>>>> >>>>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org> >>>>>>> wrote: >>>>>>> >>>>>>>> Looks like you ran into a bug. 
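[One way to confirm the patch above really stages artifacts byte-for-byte is to compare checksums of the local cache copy and the staged copy inside the container. A small hypothetical helper, not part of Beam; the paths in the comment are the ones from this thread:]

```python
import hashlib

def sha256_of(path, chunk_size=1 << 20):
    """Hash a file in binary mode so no layer can silently transform it."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

# Compare the local dependency-cache copy against the staged copy, e.g.:
#   sha256_of(".../dataflow-requirements-cache/alembic-1.4.2.tar.gz")
#   sha256_of("/tmp/staged/alembic-1.4.2.tar.gz")
# With the fix applied, the two digests should match.
```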
>>>>>>>> >>>>>>>> You could just run your program without specifying any arguments, >>>>>>>> since >>>>>>>> running with Python's FnApiRunner should be enough. >>>>>>>> >>>>>>>> Alternatively, how about trying to run the same pipeline with the >>>>>>>> FlinkRunner? Use: --runner=FlinkRunner and do not specify an >>>>>>>> endpoint. >>>>>>>> It will run the Python code embedded (loopback environment) without >>>>>>>> additional containers. >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Max >>>>>>>> >>>>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote: >>>>>>>> > Thanks Valentyn! >>>>>>>> > >>>>>>>> > Good to know that this is a bug (I'll file a bug), and that >>>>>>>> Dataflow has >>>>>>>> > an experimental way to use custom containers. I'll try that. >>>>>>>> > >>>>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev >>>>>>>> > <valen...@google.com <mailto:valen...@google.com>> wrote: >>>>>>>> > >>>>>>>> > Hi Eugene, >>>>>>>> > >>>>>>>> > Good to hear from you. The experience you are describing on >>>>>>>> Portable >>>>>>>> > Runner + Docker container in local execution mode is most >>>>>>>> certainly >>>>>>>> > a bug, if you have not opened an issue on it, please do so >>>>>>>> and feel >>>>>>>> > free to cc me. >>>>>>>> > >>>>>>>> > I can also reproduce the bug and likewise didn't see anything >>>>>>>> > obvious immediately, this needs some debugging. >>>>>>>> > >>>>>>>> > cc: +Ankur Goenka <mailto:goe...@google.com> +Kyle Weaver >>>>>>>> > <mailto:kcwea...@google.com> who recently worked on Portable >>>>>>>> Runner >>>>>>>> > and may be interested. >>>>>>>> > >>>>>>>> > By the way, you should be able to use custom containers with >>>>>>>> > Dataflow, if you set --experiments=use_runner_v2. >>>>>>>> > >>>>>>>> > On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov >>>>>>>> > <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote: >>>>>>>> > >>>>>>>> > (cc'ing Sam with whom I'm working on this atm) >>>>>>>> > >>>>>>>> > FWIW I'm still stumped. 
I've looked through Python, Go >>>>>>>> and Java >>>>>>>> > code in the Beam repo having anything to do with >>>>>>>> > gzipping/unzipping, and none of it appears to be used in >>>>>>>> the >>>>>>>> > artifact staging/retrieval codepaths. I also can't find >>>>>>>> any >>>>>>>> > mention of compression/decompression in the container >>>>>>>> boot code. >>>>>>>> > My next step will be to add a bunch of debugging, rebuild >>>>>>>> the >>>>>>>> > containers, and see what the artifact services think >>>>>>>> they're >>>>>>>> > serving. >>>>>>>> > >>>>>>>> > >>>>>>>> > On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov >>>>>>>> > <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> >>>>>>>> wrote: >>>>>>>> > >>>>>>>> > Thanks Austin! Good stuff - though note that I am >>>>>>>> > /not/ using custom containers, I'm just trying to get >>>>>>>> the >>>>>>>> > basic stuff to work, a Python pipeline with a simple >>>>>>>> > requirements.txt file. Feels like this should work >>>>>>>> > out-of-the-box, I must be doing something wrong. >>>>>>>> > >>>>>>>> > On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett >>>>>>>> > <whatwouldausti...@gmail.com >>>>>>>> > <mailto:whatwouldausti...@gmail.com>> wrote: >>>>>>>> > >>>>>>>> > I only believe @OrielResearch Eila Arich-Landkof >>>>>>>> > <mailto:e...@orielresearch.org> potentially doing >>>>>>>> > applied work with custom containers (there must >>>>>>>> be others)! >>>>>>>> > >>>>>>>> > For a plug for her and @BeamSummit -- I think >>>>>>>> enough >>>>>>>> > related will be talked about in (with Conda >>>>>>>> specifics) >>>>>>>> > --> >>>>>>>> > >>>>>>>> https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/ >>>>>>>> > >>>>>>>> > I'm sure others will have more things to say that >>>>>>>> are >>>>>>>> > actually helpful, on-list, before that occurs (~3 >>>>>>>> weeks). 
>>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov >>>>>>>> > <ekirpic...@gmail.com <mailto: >>>>>>>> ekirpic...@gmail.com>> wrote: >>>>>>>> > >>>>>>>> > Hi old Beam friends, >>>>>>>> > >>>>>>>> > I left Google to work on climate change >>>>>>>> > < >>>>>>>> https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U >>>>>>>> > >>>>>>>> > and am now doing a short engagement with >>>>>>>> Pachama >>>>>>>> > <https://pachama.com/>. Right now I'm trying >>>>>>>> to get >>>>>>>> > a Beam Python pipeline to work; the pipeline >>>>>>>> will >>>>>>>> > use fancy requirements and native >>>>>>>> dependencies, and >>>>>>>> > we plan to run it on Cloud Dataflow (so custom >>>>>>>> > containers are not yet an option), so I'm >>>>>>>> going >>>>>>>> > straight for the direct PortableRunner as per >>>>>>>> > >>>>>>>> https://beam.apache.org/documentation/runtime/environments/. >>>>>>>> > >>>>>>>> > Basically I can't get a minimal Beam program >>>>>>>> with a >>>>>>>> > minimal requirements.txt file to work - the >>>>>>>> .tar.gz >>>>>>>> > of the dependency mysteriously ends up being >>>>>>>> > ungzipped and non-installable inside the >>>>>>>> Docker >>>>>>>> > container running the worker. Details below. 
>>>>>>>> > >>>>>>>> > === main.py === >>>>>>>> > import argparse >>>>>>>> > import logging >>>>>>>> > >>>>>>>> > import apache_beam as beam >>>>>>>> > from apache_beam.options.pipeline_options >>>>>>>> import >>>>>>>> > PipelineOptions >>>>>>>> > from apache_beam.options.pipeline_options >>>>>>>> import >>>>>>>> > SetupOptions >>>>>>>> > >>>>>>>> > def run(argv=None): >>>>>>>> > parser = argparse.ArgumentParser() >>>>>>>> > known_args, pipeline_args = >>>>>>>> > parser.parse_known_args(argv) >>>>>>>> > >>>>>>>> > pipeline_options = >>>>>>>> PipelineOptions(pipeline_args) >>>>>>>> > >>>>>>>> > >>>>>>>> pipeline_options.view_as(SetupOptions).save_main_session >>>>>>>> > = True >>>>>>>> > >>>>>>>> > with >>>>>>>> beam.Pipeline(options=pipeline_options) as p: >>>>>>>> > (p | 'Create' >> >>>>>>>> beam.Create(['Hello']) >>>>>>>> > | 'Write' >> >>>>>>>> beam.io.WriteToText('/tmp')) >>>>>>>> > >>>>>>>> > >>>>>>>> > if __name__ == '__main__': >>>>>>>> > >>>>>>>> logging.getLogger().setLevel(logging.INFO) >>>>>>>> > run() >>>>>>>> > >>>>>>>> > === requirements.txt === >>>>>>>> > alembic >>>>>>>> > >>>>>>>> > When I run the program: >>>>>>>> > $ python3 main.py >>>>>>>> > >>>>>>>> --runner=PortableRunner --job_endpoint=embed >>>>>>>> --requirements_file=requirements.txt >>>>>>>> > >>>>>>>> > >>>>>>>> > I get some normal output and then: >>>>>>>> > >>>>>>>> > >>>>>>>> INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b' >>>>>>>> > File >>>>>>>> > >>>>>>>> >>>>>>>> "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", >>>>>>>> > line 261, in unpack_file\n >>>>>>>> untar_file(filename, >>>>>>>> > location)\n File >>>>>>>> > >>>>>>>> >>>>>>>> "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", >>>>>>>> > line 177, in untar_file\n tar = >>>>>>>> > tarfile.open(filename, mode)\n File >>>>>>>> > "/usr/local/lib/python3.7/tarfile.py", line >>>>>>>> 1591, in >>>>>>>> > open\n return func(name, filemode, fileobj, >>>>>>>> > 
**kwargs)\n File >>>>>>>> > "/usr/local/lib/python3.7/tarfile.py", line >>>>>>>> 1648, in >>>>>>>> > gzopen\n raise ReadError("not a gzip >>>>>>>> > file")\ntarfile.ReadError: not a gzip >>>>>>>> > file\n2020/08/08 01:17:07 Failed to install >>>>>>>> required >>>>>>>> > packages: failed to install requirements: exit >>>>>>>> > status 2\n' >>>>>>>> > >>>>>>>> > This greatly puzzled me and, after some >>>>>>>> looking, I >>>>>>>> > found something really surprising. Here is the >>>>>>>> > package in the /directory to be staged/: >>>>>>>> > >>>>>>>> > $ file >>>>>>>> > >>>>>>>> >>>>>>>> /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz >>>>>>>> > ...: gzip compressed data, was >>>>>>>> > "dist/alembic-1.4.2.tar", last modified: Thu >>>>>>>> Mar 19 >>>>>>>> > 21:48:31 2020, max compression, original size >>>>>>>> modulo >>>>>>>> > 2^32 4730880 >>>>>>>> > $ ls -l >>>>>>>> > >>>>>>>> >>>>>>>> /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz >>>>>>>> > -rw-r--r-- 1 jkff staff 1092045 Aug 7 >>>>>>>> 16:56 ... >>>>>>>> > >>>>>>>> > So far so good. But here is the same file >>>>>>>> inside the >>>>>>>> > Docker container (I ssh'd into the dead >>>>>>>> container >>>>>>>> > < >>>>>>>> https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers >>>>>>>> >): >>>>>>>> > >>>>>>>> > # file /tmp/staged/alembic-1.4.2.tar.gz >>>>>>>> > /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar >>>>>>>> archive >>>>>>>> > (GNU) >>>>>>>> > # ls -l /tmp/staged/alembic-1.4.2.tar.gz >>>>>>>> > -rwxr-xr-x 1 root root 4730880 Aug 8 01:17 >>>>>>>> > /tmp/staged/alembic-1.4.2.tar.gz >>>>>>>> > >>>>>>>> > The file has clearly been unzipped and now of >>>>>>>> course >>>>>>>> > pip can't install it! What's going on here? >>>>>>>> Am I >>>>>>>> > using the direct/portable runner combination >>>>>>>> wrong? >>>>>>>> > >>>>>>>> > Thanks! 
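[For anyone debugging the same thing without the `file` utility in the container: the check above can be reproduced from Python by sniffing the two-byte gzip magic number. A hypothetical helper, not part of Beam:]

```python
import gzip
import tempfile

GZIP_MAGIC = b"\x1f\x8b"  # gzip magic bytes per RFC 1952

def looks_gzipped(path):
    """Cheap stand-in for `file`: check the leading gzip magic bytes."""
    with open(path, "rb") as f:
        return f.read(2) == GZIP_MAGIC

# A real .tar.gz passes; a silently-decompressed (bare tar) copy fails.
with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as tmp:
    tmp.write(gzip.compress(b"payload"))
print(looks_gzipped(tmp.name))  # True
```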
>>>>>>>> > >>>>>>>> > -- >>>>>>>> > Eugene Kirpichov >>>>>>>> > http://www.linkedin.com/in/eugenekirpichov >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > -- >>>>>>>> > Eugene Kirpichov >>>>>>>> > http://www.linkedin.com/in/eugenekirpichov >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > -- >>>>>>>> > Eugene Kirpichov >>>>>>>> > http://www.linkedin.com/in/eugenekirpichov >>>>>>>> > >>>>>>>> > >>>>>>>> > >>>>>>>> > -- >>>>>>>> > Eugene Kirpichov >>>>>>>> > http://www.linkedin.com/in/eugenekirpichov >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Eugene Kirpichov >>>>>>> http://www.linkedin.com/in/eugenekirpichov >>>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> Eugene Kirpichov >>>>>> http://www.linkedin.com/in/eugenekirpichov >>>>>> >>>>> >>>>> >>>>> -- >>>>> Eugene Kirpichov >>>>> http://www.linkedin.com/in/eugenekirpichov >>>>> >>>> >>>> >>>> -- >>>> Eugene Kirpichov >>>> http://www.linkedin.com/in/eugenekirpichov >>>> >>> >>> >>> -- >>> Eugene Kirpichov >>> http://www.linkedin.com/in/eugenekirpichov >>> >> >> >> -- >> Eugene Kirpichov >> http://www.linkedin.com/in/eugenekirpichov >> >