+Daniel, as he is in charge of 2.24 per the dev@ thread.

On Thu, Aug 13, 2020 at 6:24 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
> The PR is merged.
>
> Do folks think this warrants being cherry-picked into v2.24? My hunch is
> yes, because basically one of the runners (the local portable Python
> runner) is broken for any production workload (it works only if your
> pipeline has no dependencies).
>
> On Thu, Aug 13, 2020 at 12:56 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>
>> FWIW I sent a PR to fix this: https://github.com/apache/beam/pull/12571
>>
>> However, I'm not up to date on the portable test infrastructure and would
>> appreciate guidance on what tests I can add for this.
>>
>> On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>
>>> (FYI Sam +sbrot...@gmail.com)
>>>
>>> On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>>
>>>> OK, I found the bug, and now I don't understand how it could have
>>>> possibly ever worked. And if this was never tested, then I don't
>>>> understand why it works after fixing this one bug :)
>>>>
>>>> Basically, the Python ArtifactStaging/RetrievalService uses
>>>> FileSystems.open() to read the artifacts to be staged, and
>>>> FileSystems.open() by default decompresses compressed files based on
>>>> their extension.
>>>> I found two such services - one in Python and one in Java. Is the
>>>> Python one used with an embedded job endpoint and the Java one
>>>> otherwise? I haven't inspected the Java one, but fixing Python does
>>>> the trick.
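[Editor's note] The extension-based auto-decompression described above can be illustrated with a small stdlib-only sketch. The helpers `auto_open` and `uncompressed_open` are toy stand-ins I made up to mimic `FileSystems.open()` with `CompressionTypes.AUTO` vs. `CompressionTypes.UNCOMPRESSED`; they are not Beam APIs:

```python
import gzip
import os
import tempfile

def auto_open(path):
    # Toy stand-in for FileSystems.open() with the default
    # CompressionTypes.AUTO: picks a decompressor from the extension.
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')  # transparently decompresses
    return open(path, 'rb')

def uncompressed_open(path):
    # Toy stand-in for compression_type=CompressionTypes.UNCOMPRESSED:
    # returns the stored bytes untouched, regardless of extension.
    return open(path, 'rb')

payload = b'pretend this is a tar archive'
compressed = gzip.compress(payload)
path = os.path.join(tempfile.mkdtemp(), 'alembic-1.4.2.tar.gz')
with open(path, 'wb') as f:
    f.write(compressed)

# AUTO silently strips the gzip layer: a byte-for-byte staging service
# built on it ships a bare tar, and pip later fails with "not a gzip file".
assert auto_open(path).read() == payload
# UNCOMPRESSED preserves the artifact exactly as stored on disk.
assert uncompressed_open(path).read() == compressed
```

This is why a byte-for-byte artifact copier must opt out of auto-decompression explicitly, as the patch below does.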
>>>>
>>>> The fix is this patch:
>>>>
>>>> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> index f2bbf534c3..1f3ec1c0b0 100644
>>>> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
>>>> @@ -41,6 +41,7 @@ import grpc
>>>>  from future.moves.urllib.request import urlopen
>>>>
>>>>  from apache_beam.io import filesystems
>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>  from apache_beam.portability import common_urns
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>>>>      self._root = root
>>>>
>>>>    def file_reader(self, path):
>>>> -    return filesystems.FileSystems.open(path)
>>>> +    return filesystems.FileSystems.open(
>>>> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>>>>
>>>>    def file_writer(self, name=None):
>>>>      full_path = filesystems.FileSystems.join(self._root, name)
>>>> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> index 5bf3282250..2684235be0 100644
>>>> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
>>>> @@ -45,6 +45,7 @@ from typing import overload
>>>>  import grpc
>>>>
>>>>  from apache_beam.io import filesystems
>>>> +from apache_beam.io.filesystems import CompressionTypes
>>>>  from apache_beam.portability import common_urns
>>>>  from apache_beam.portability import python_urns
>>>>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
>>>> @@ -464,9 +465,13 @@ class GrpcServer(object):
>>>>              self.provision_info.provision_info, worker_manager),
>>>>          self.control_server)
>>>>
>>>> +    def open_uncompressed(f):
>>>> +      return filesystems.FileSystems.open(
>>>> +          f, compression_type=CompressionTypes.UNCOMPRESSED)
>>>> +
>>>>      beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>>>>          artifact_service.ArtifactRetrievalService(
>>>> -            file_reader=filesystems.FileSystems.open),
>>>> +            file_reader=open_uncompressed),
>>>>          self.control_server)
>>>>
>>>>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>>>>
>>>> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>>>
>>>>> Hi Maximilian,
>>>>>
>>>>> Thank you - it works fine with the embedded Flink runner (per below,
>>>>> it seems it's not using Docker for running Python code? What is it
>>>>> using then?).
>>>>>
>>>>> However, the original bug appears to be wider than I thought - it is
>>>>> also present if I run --runner=FlinkRunner --environment_type=DOCKER.
>>>>> It seems like something is very broken in local Docker execution in
>>>>> general - I haven't yet verified whether the same error happens when
>>>>> running on a remote Flink cluster.
>>>>>
>>>>> Trying to build my own SDK containers with some more debugging so I
>>>>> can figure out what's going on...
>>>>>
>>>>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>
>>>>>> Looks like you ran into a bug.
>>>>>>
>>>>>> You could just run your program without specifying any arguments,
>>>>>> since running with Python's FnApiRunner should be enough.
>>>>>>
>>>>>> Alternatively, how about trying to run the same pipeline with the
>>>>>> FlinkRunner? Use --runner=FlinkRunner and do not specify an
>>>>>> endpoint. It will run the Python code embedded (loopback
>>>>>> environment) without additional containers.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>>>>> > Thanks Valentyn!
>>>>>> >
>>>>>> > Good to know that this is a bug (I'll file a bug), and that
>>>>>> > Dataflow has an experimental way to use custom containers. I'll
>>>>>> > try that.
>>>>>> >
>>>>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>>>>> > <valen...@google.com> wrote:
>>>>>> >
>>>>>> >     Hi Eugene,
>>>>>> >
>>>>>> >     Good to hear from you. The experience you are describing with
>>>>>> >     the Portable Runner + Docker containers in local execution
>>>>>> >     mode is most certainly a bug; if you have not opened an issue
>>>>>> >     on it, please do so and feel free to cc me.
>>>>>> >
>>>>>> >     I can also reproduce the bug and likewise didn't see anything
>>>>>> >     obvious immediately; this needs some debugging.
>>>>>> >
>>>>>> >     cc: +Ankur Goenka <goe...@google.com> and +Kyle Weaver
>>>>>> >     <kcwea...@google.com>, who recently worked on the Portable
>>>>>> >     Runner and may be interested.
>>>>>> >
>>>>>> >     By the way, you should be able to use custom containers with
>>>>>> >     Dataflow if you set --experiments=use_runner_v2.
>>>>>> >
>>>>>> >     On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>>>>> >     <ekirpic...@gmail.com> wrote:
>>>>>> >
>>>>>> >         (cc'ing Sam, with whom I'm working on this atm)
>>>>>> >
>>>>>> >         FWIW I'm still stumped. I've looked through the Python, Go
>>>>>> >         and Java code in the Beam repo having anything to do with
>>>>>> >         gzipping/unzipping, and none of it appears to be used in
>>>>>> >         the artifact staging/retrieval codepaths. I also can't
>>>>>> >         find any mention of compression/decompression in the
>>>>>> >         container boot code. My next step will be to add a bunch
>>>>>> >         of debugging, rebuild the containers, and see what the
>>>>>> >         artifact services think they're serving.
>>>>>> >
>>>>>> >         On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>>>>> >         <ekirpic...@gmail.com> wrote:
>>>>>> >
>>>>>> >             Thanks Austin! Good stuff - though note that I am
>>>>>> >             /not/ using custom containers; I'm just trying to get
>>>>>> >             the basic stuff to work: a Python pipeline with a
>>>>>> >             simple requirements.txt file. It feels like this
>>>>>> >             should work out of the box, so I must be doing
>>>>>> >             something wrong.
>>>>>> >
>>>>>> >             On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>>>>> >             <whatwouldausti...@gmail.com> wrote:
>>>>>> >
>>>>>> >                 I only know of @OrielResearch Eila Arich-Landkof
>>>>>> >                 <e...@orielresearch.org> potentially doing applied
>>>>>> >                 work with custom containers (there must be
>>>>>> >                 others)!
>>>>>> >
>>>>>> >                 As a plug for her and @BeamSummit -- I think
>>>>>> >                 enough related material (with Conda specifics)
>>>>>> >                 will be covered in
>>>>>> >                 https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>>>>> >
>>>>>> >                 I'm sure others will have more things to say that
>>>>>> >                 are actually helpful, on-list, before that occurs
>>>>>> >                 (~3 weeks).
>>>>>> >
>>>>>> >                 On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>>>>> >                 <ekirpic...@gmail.com> wrote:
>>>>>> >
>>>>>> >                     Hi old Beam friends,
>>>>>> >
>>>>>> >                     I left Google to work on climate change
>>>>>> >                     <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>>>>> >                     and am now doing a short engagement with
>>>>>> >                     Pachama <https://pachama.com/>.
>>>>>> >                     Right now I'm trying to get a Beam Python
>>>>>> >                     pipeline to work; the pipeline will use fancy
>>>>>> >                     requirements and native dependencies, and we
>>>>>> >                     plan to run it on Cloud Dataflow (so custom
>>>>>> >                     containers are not yet an option), so I'm
>>>>>> >                     going straight for the direct PortableRunner
>>>>>> >                     as per
>>>>>> >                     https://beam.apache.org/documentation/runtime/environments/.
>>>>>> >
>>>>>> >                     Basically I can't get a minimal Beam program
>>>>>> >                     with a minimal requirements.txt file to work -
>>>>>> >                     the .tar.gz of the dependency mysteriously
>>>>>> >                     ends up being ungzipped and non-installable
>>>>>> >                     inside the Docker container running the
>>>>>> >                     worker. Details below.
>>>>>> >
>>>>>> >                     === main.py ===
>>>>>> >                     import argparse
>>>>>> >                     import logging
>>>>>> >
>>>>>> >                     import apache_beam as beam
>>>>>> >                     from apache_beam.options.pipeline_options import PipelineOptions
>>>>>> >                     from apache_beam.options.pipeline_options import SetupOptions
>>>>>> >
>>>>>> >                     def run(argv=None):
>>>>>> >                       parser = argparse.ArgumentParser()
>>>>>> >                       known_args, pipeline_args = parser.parse_known_args(argv)
>>>>>> >
>>>>>> >                       pipeline_options = PipelineOptions(pipeline_args)
>>>>>> >                       pipeline_options.view_as(SetupOptions).save_main_session = True
>>>>>> >
>>>>>> >                       with beam.Pipeline(options=pipeline_options) as p:
>>>>>> >                         (p | 'Create' >> beam.Create(['Hello'])
>>>>>> >                            | 'Write' >> beam.io.WriteToText('/tmp'))
>>>>>> >
>>>>>> >                     if __name__ == '__main__':
>>>>>> >                       logging.getLogger().setLevel(logging.INFO)
>>>>>> >                       run()
>>>>>> >
>>>>>> >                     === requirements.txt ===
>>>>>> >                     alembic
>>>>>> >
>>>>>> >                     When I run the program:
>>>>>> >                     $ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt
>>>>>> >
>>>>>> >                     I get some normal output and then:
>>>>>> >
>>>>>> >                     INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'  File
>>>>>> >                       "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n
>>>>>> >                         untar_file(filename, location)\n
>>>>>> >                       File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n
>>>>>> >                         tar = tarfile.open(filename, mode)\n
>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n
>>>>>> >                         return func(name, filemode, fileobj, **kwargs)\n
>>>>>> >                       File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n
>>>>>> >                         raise ReadError("not a gzip file")\n
>>>>>> >                     tarfile.ReadError: not a gzip file\n
>>>>>> >                     2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'
>>>>>> >
>>>>>> >                     This greatly puzzled me and, after some
>>>>>> >                     looking, I found something really surprising.
>>>>>> >                     Here is the package in the /directory to be
>>>>>> >                     staged/:
>>>>>> >
>>>>>> >                     $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>> >                     ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
>>>>>> >                     $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>>>>> >                     -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>>>>> >
>>>>>> >                     So far so good.
>>>>>> >                     But here is the same file inside the Docker
>>>>>> >                     container (I ssh'd into the dead container
>>>>>> >                     <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>>>>> >
>>>>>> >                     # file /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >                     /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>>>>> >                     # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >                     -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>>>>> >
>>>>>> >                     The file has clearly been unzipped, and now of
>>>>>> >                     course pip can't install it! What's going on
>>>>>> >                     here? Am I using the direct/portable runner
>>>>>> >                     combination wrong?
>>>>>> >
>>>>>> >                     Thanks!
>>>>>> >
>>>>>> >                     --
>>>>>> >                     Eugene Kirpichov
>>>>>> >                     http://www.linkedin.com/in/eugenekirpichov

--
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
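[Editor's note] The `file` output above ("gzip compressed data" on the host vs. "POSIX tar archive" in the container) can be reproduced with a two-byte check, since every gzip stream starts with the magic bytes 0x1f 0x8b. A minimal stdlib sketch; the helper `looks_gzipped` and the file names are made up for illustration:

```python
import gzip
import os
import tempfile

GZIP_MAGIC = b'\x1f\x8b'  # first two bytes of every gzip stream (RFC 1952)

def looks_gzipped(path):
    # True if the file still carries the gzip magic bytes.
    with open(path, 'rb') as f:
        return f.read(2) == GZIP_MAGIC

tmp = tempfile.mkdtemp()
good = os.path.join(tmp, 'good.tar.gz')  # artifact staged byte-for-byte
bad = os.path.join(tmp, 'bad.tar.gz')    # same name, contents were unzipped
with open(good, 'wb') as f:
    f.write(gzip.compress(b'tar bytes'))
with open(bad, 'wb') as f:
    f.write(b'tar bytes')

assert looks_gzipped(good)      # healthy: "gzip compressed data"
assert not looks_gzipped(bad)   # the "not a gzip file" failure mode above
```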