(FYI Sam +sbrot...@gmail.com <sbrot...@gmail.com>)

On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
> Ok, I found the bug, and now I don't understand how it could have possibly
> ever worked. And if this was never tested, then I don't understand why it
> works after fixing this one bug :)
>
> Basically, the Python ArtifactStaging/RetrievalService uses
> FileSystems.open() to read the artifacts to be staged, and
> FileSystems.open() by default decompresses compressed files based on their
> extension.
>
> I found two such services - one in Python and one in Java. Is the Python
> one used with an embedded job endpoint, and the Java one otherwise? I
> haven't inspected the Java one, but fixing Python does the trick.
>
> The fix is this patch:
>
> diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
> index f2bbf534c3..1f3ec1c0b0 100644
> --- a/sdks/python/apache_beam/runners/portability/artifact_service.py
> +++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
> @@ -41,6 +41,7 @@ import grpc
>  from future.moves.urllib.request import urlopen
>
>  from apache_beam.io import filesystems
> +from apache_beam.io.filesystems import CompressionTypes
>  from apache_beam.portability import common_urns
>  from apache_beam.portability.api import beam_artifact_api_pb2
>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> @@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
>      self._root = root
>
>    def file_reader(self, path):
> -    return filesystems.FileSystems.open(path)
> +    return filesystems.FileSystems.open(
> +        path, compression_type=CompressionTypes.UNCOMPRESSED)
>
>    def file_writer(self, name=None):
>      full_path = filesystems.FileSystems.join(self._root, name)
> diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> index 5bf3282250..2684235be0 100644
> --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
> @@ -45,6 +45,7 @@ from typing import overload
>  import grpc
>
>  from apache_beam.io import filesystems
> +from apache_beam.io.filesystems import CompressionTypes
>  from apache_beam.portability import common_urns
>  from apache_beam.portability import python_urns
>  from apache_beam.portability.api import beam_artifact_api_pb2_grpc
> @@ -464,9 +465,13 @@ class GrpcServer(object):
>              self.provision_info.provision_info, worker_manager),
>          self.control_server)
>
> +    def open_uncompressed(f):
> +      return filesystems.FileSystems.open(
> +          f, compression_type=CompressionTypes.UNCOMPRESSED)
> +
>      beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
>          artifact_service.ArtifactRetrievalService(
> -            file_reader=filesystems.FileSystems.open),
> +            file_reader=open_uncompressed),
>          self.control_server)
>
>      self.data_plane_handler = data_plane.BeamFnDataServicer(
>
> On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>
>> Hi Maximilian,
>>
>> Thank you - it works fine with the embedded Flink runner (per below, it
>> seems it's not using Docker for running the Python code? What is it
>> using then?).
>>
>> However, the original bug appears to be wider than I thought - it is
>> also present if I run --runner=FlinkRunner --environment_type=DOCKER.
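(To make the root cause above concrete: FileSystems.open() defaults to
compression_type=CompressionTypes.AUTO, which picks a codec from the file
extension. Below is a minimal sketch of that behavior, assuming only a local
filesystem and an installed apache_beam; the /tmp paths are hypothetical.)

=== sketch_compression.py ===
import gzip

from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems

# Write a small gzipped file, standing in for a staged .tar.gz artifact.
# The /tmp path is hypothetical, for illustration only.
with open('/tmp/demo.txt.gz', 'wb') as f:
  f.write(gzip.compress(b'hello'))

# Default (AUTO): the .gz extension triggers transparent decompression,
# so the reader sees the payload, not the gzip stream.
with FileSystems.open('/tmp/demo.txt.gz') as f:
  print(f.read(5))  # b'hello' - already decompressed

# With UNCOMPRESSED we get the raw bytes back - note the gzip magic
# number 1f 8b. This is what an artifact service must serve so that pip
# still receives a valid .tar.gz on the worker side.
with FileSystems.open(
    '/tmp/demo.txt.gz',
    compression_type=CompressionTypes.UNCOMPRESSED) as f:
  print(f.read(2))  # b'\x1f\x8b'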
>> Seems like something is very broken in local Docker execution in
>> general - I haven't yet verified whether the same error happens when
>> running on a remote Flink cluster.
>>
>> Trying to build my own SDK containers with some more debugging so I can
>> figure out what's going on...
>>
>> On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org> wrote:
>>
>>> Looks like you ran into a bug.
>>>
>>> You could just run your program without specifying any arguments, since
>>> running with Python's FnApiRunner should be enough.
>>>
>>> Alternatively, how about trying to run the same pipeline with the
>>> FlinkRunner? Use --runner=FlinkRunner and do not specify an endpoint.
>>> It will run the Python code embedded (loopback environment) without
>>> additional containers.
>>>
>>> Cheers,
>>> Max
>>>
>>> On 10.08.20 21:59, Eugene Kirpichov wrote:
>>> > Thanks Valentyn!
>>> >
>>> > Good to know that this is a bug (I'll file one), and that Dataflow
>>> > has an experimental way to use custom containers. I'll try that.
>>> >
>>> > On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev
>>> > <valen...@google.com <mailto:valen...@google.com>> wrote:
>>> >
>>> > Hi Eugene,
>>> >
>>> > Good to hear from you. The experience you are describing with the
>>> > portable runner + Docker containers in local execution mode is most
>>> > certainly a bug; if you have not opened an issue on it, please do so
>>> > and feel free to cc me.
>>> >
>>> > I can also reproduce the bug, and likewise didn't see anything
>>> > obvious immediately - this needs some debugging.
>>> >
>>> > cc: +Ankur Goenka <mailto:goe...@google.com> +Kyle Weaver
>>> > <mailto:kcwea...@google.com>, who recently worked on the portable
>>> > runner and may be interested.
>>> >
>>> > By the way, you should be able to use custom containers with
>>> > Dataflow if you set --experiments=use_runner_v2.
>>> >
>>> > On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov
>>> > <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>> >
>>> > (cc'ing Sam, with whom I'm working on this atm)
>>> >
>>> > FWIW I'm still stumped. I've looked through the Python, Go, and Java
>>> > code in the Beam repo having anything to do with gzipping/unzipping,
>>> > and none of it appears to be used in the artifact staging/retrieval
>>> > codepaths. I also can't find any mention of compression/decompression
>>> > in the container boot code. My next step will be to add a bunch of
>>> > debugging, rebuild the containers, and see what the artifact services
>>> > think they're serving.
>>> >
>>> > On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov
>>> > <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>> >
>>> > Thanks Austin! Good stuff - though note that I am /not/ using custom
>>> > containers; I'm just trying to get the basic stuff to work: a Python
>>> > pipeline with a simple requirements.txt file. Feels like this should
>>> > work out of the box - I must be doing something wrong.
>>> >
>>> > On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett
>>> > <whatwouldausti...@gmail.com <mailto:whatwouldausti...@gmail.com>> wrote:
>>> >
>>> > I only believe @OrielResearch Eila Arich-Landkof
>>> > <mailto:e...@orielresearch.org> is potentially doing applied work
>>> > with custom containers (there must be others)!
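(For reference, Max's two suggestions above amount to these invocations of
the same main.py - the flags are exactly as named in the thread, and the
loopback behavior of a local FlinkRunner is as Max describes it, not
separately verified here:)

$ python3 main.py
(runs on Python's FnApiRunner - no SDK containers involved)

$ python3 main.py --runner=FlinkRunner
(embedded Flink; the Python code runs in a loopback environment instead of
a Docker container)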
>>> >
>>> > For a plug for her and @BeamSummit - I think enough related material
>>> > will be talked about there (with Conda specifics):
>>> > https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/
>>> >
>>> > I'm sure others will have more things to say that are actually
>>> > helpful, on-list, before that occurs (~3 weeks).
>>> >
>>> > On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov
>>> > <ekirpic...@gmail.com <mailto:ekirpic...@gmail.com>> wrote:
>>> >
>>> > Hi old Beam friends,
>>> >
>>> > I left Google to work on climate change
>>> > <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U>
>>> > and am now doing a short engagement with Pachama <https://pachama.com/>.
>>> > Right now I'm trying to get a Beam Python pipeline to work. The
>>> > pipeline will use fancy requirements and native dependencies, and we
>>> > plan to run it on Cloud Dataflow (so custom containers are not yet an
>>> > option), so I'm going straight for the direct PortableRunner as per
>>> > https://beam.apache.org/documentation/runtime/environments/.
>>> >
>>> > Basically, I can't get a minimal Beam program with a minimal
>>> > requirements.txt file to work - the .tar.gz of the dependency
>>> > mysteriously ends up ungzipped and non-installable inside the Docker
>>> > container running the worker. Details below.
>>> >
>>> > === main.py ===
>>> > import argparse
>>> > import logging
>>> >
>>> > import apache_beam as beam
>>> > from apache_beam.options.pipeline_options import PipelineOptions
>>> > from apache_beam.options.pipeline_options import SetupOptions
>>> >
>>> >
>>> > def run(argv=None):
>>> >   parser = argparse.ArgumentParser()
>>> >   known_args, pipeline_args = parser.parse_known_args(argv)
>>> >
>>> >   pipeline_options = PipelineOptions(pipeline_args)
>>> >   pipeline_options.view_as(SetupOptions).save_main_session = True
>>> >
>>> >   with beam.Pipeline(options=pipeline_options) as p:
>>> >     (p | 'Create' >> beam.Create(['Hello'])
>>> >        | 'Write' >> beam.io.WriteToText('/tmp'))
>>> >
>>> >
>>> > if __name__ == '__main__':
>>> >   logging.getLogger().setLevel(logging.INFO)
>>> >   run()
>>> >
>>> > === requirements.txt ===
>>> > alembic
>>> >
>>> > When I run the program:
>>> >
>>> > $ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt
>>> >
>>> > I get some normal output and then:
>>> >
>>> > INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'
>>> > File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>>> > line 261, in unpack_file\n untar_file(filename, location)\n
>>> > File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py",
>>> > line 177, in untar_file\n tar = tarfile.open(filename, mode)\n
>>> > File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n
>>> > return func(name, filemode, fileobj, **kwargs)\n
>>> > File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n
>>> > raise ReadError("not a gzip file")\n
>>> > tarfile.ReadError: not a gzip file\n
>>> > 2020/08/08 01:17:07 Failed to install required packages: failed to
>>> > install requirements: exit status 2\n'
>>> >
>>> > This greatly puzzled me and, after some looking, I found something
>>> > really surprising.
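(Aside: pip's "not a gzip file" can be confirmed without pip - a real
.tar.gz starts with the gzip magic number 1f 8b, which is essentially the
check tarfile performs before raising that error. A hypothetical helper,
not from the thread:)

=== gzcheck.py ===
# Print whether a file starts with the gzip magic bytes 1f 8b.
import sys

with open(sys.argv[1], 'rb') as f:
  magic = f.read(2)
print('gzip' if magic == b'\x1f\x8b' else 'NOT gzip (bare tar?)')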
>>> > Here is the package in the /directory to be staged/:
>>> >
>>> > $ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>> > ...: gzip compressed data, was "dist/alembic-1.4.2.tar", last
>>> > modified: Thu Mar 19 21:48:31 2020, max compression, original size
>>> > modulo 2^32 4730880
>>> > $ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
>>> > -rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...
>>> >
>>> > So far so good. But here is the same file inside the Docker container
>>> > (I ssh'd into the dead container
>>> > <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):
>>> >
>>> > # file /tmp/staged/alembic-1.4.2.tar.gz
>>> > /tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
>>> > # ls -l /tmp/staged/alembic-1.4.2.tar.gz
>>> > -rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz
>>> >
>>> > The file has clearly been unzipped in transit - its size in the
>>> > container, 4730880 bytes, is exactly the "original size modulo 2^32"
>>> > that file reports for the gzip on my machine - and now of course pip
>>> > can't install it! What's going on here? Am I using the direct/portable
>>> > runner combination wrong?
>>> >
>>> > Thanks!
>>> >
>>> > --
>>> > Eugene Kirpichov
>>> > http://www.linkedin.com/in/eugenekirpichov

--
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
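(A closing note on the evidence above: the last four bytes of a gzip stream,
the ISIZE field, record the uncompressed size modulo 2^32 - that is where
file's "4730880" comes from, and it matches the size of the bare tar found
in the container exactly. A hypothetical check, with the path assumed to be
the cached artifact:)

=== isize.py ===
# Read the gzip ISIZE trailer: the uncompressed size modulo 2^32.
import struct

with open('alembic-1.4.2.tar.gz', 'rb') as f:
  f.seek(-4, 2)  # last four bytes of the stream
  print(struct.unpack('<I', f.read(4))[0])  # expected: 4730880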