FWIW, I sent a PR to fix this: https://github.com/apache/beam/pull/12571
However, I'm not up to date on the portable test infrastructure and would appreciate guidance on what tests I can add for this.

On Tue, Aug 11, 2020 at 5:28 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

(FYI Sam +sbrot...@gmail.com)

On Tue, Aug 11, 2020 at 5:00 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

Ok, I found the bug, and now I don't understand how it could have possibly ever worked. And if this was never tested, then I don't understand why it works after fixing this one bug :)

Basically, the Python ArtifactStaging/RetrievalService uses FileSystems.open() to read the artifacts to be staged, and FileSystems.open() by default decompresses compressed files based on their extension. I found two such services - one in Python and one in Java. Is the Python one used with an embedded job endpoint and the Java one otherwise? I haven't inspected the Java one, but fixing the Python one does the trick.

The fix is this patch:

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service.py b/sdks/python/apache_beam/runners/portability/artifact_service.py
index f2bbf534c3..1f3ec1c0b0 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service.py
@@ -41,6 +41,7 @@ import grpc
 from future.moves.urllib.request import urlopen
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -263,7 +264,8 @@ class BeamFilesystemHandler(object):
     self._root = root
 
   def file_reader(self, path):
-    return filesystems.FileSystems.open(path)
+    return filesystems.FileSystems.open(
+        path, compression_type=CompressionTypes.UNCOMPRESSED)
 
   def file_writer(self, name=None):
     full_path = filesystems.FileSystems.join(self._root, name)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 5bf3282250..2684235be0 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -45,6 +45,7 @@ from typing import overload
 import grpc
 
 from apache_beam.io import filesystems
+from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
@@ -464,9 +465,13 @@ class GrpcServer(object):
             self.provision_info.provision_info, worker_manager),
         self.control_server)
 
+    def open_uncompressed(f):
+      return filesystems.FileSystems.open(
+          f, compression_type=CompressionTypes.UNCOMPRESSED)
+
     beam_artifact_api_pb2_grpc.add_ArtifactRetrievalServiceServicer_to_server(
         artifact_service.ArtifactRetrievalService(
-            file_reader=filesystems.FileSystems.open),
+            file_reader=open_uncompressed),
         self.control_server)
 
     self.data_plane_handler = data_plane.BeamFnDataServicer(

On Tue, Aug 11, 2020 at 1:43 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

Hi Maximilian,

Thank you - it works fine with the embedded Flink runner (per below, it seems it's not using Docker for running Python code? What is it using then?).

However, the original bug appears to be wider than I thought - it is also present if I run --runner=FlinkRunner --environment_type=DOCKER.
Seems like something is very broken in local Docker execution in general - I haven't yet verified whether the same error happens when running on a remote Flink cluster.

Trying to build my own SDK containers with some more debugging so I can figure out what's going on...

On Tue, Aug 11, 2020 at 7:53 AM Maximilian Michels <m...@apache.org> wrote:

Looks like you ran into a bug.

You could just run your program without specifying any arguments, since running with Python's FnApiRunner should be enough.

Alternatively, how about trying to run the same pipeline with the FlinkRunner? Use --runner=FlinkRunner and do not specify an endpoint. It will run the Python code embedded (loopback environment) without additional containers.

Cheers,
Max

On 10.08.20 21:59, Eugene Kirpichov wrote:

Thanks Valentyn!

Good to know that this is a bug (I'll file one), and that Dataflow has an experimental way to use custom containers. I'll try that.

On Mon, Aug 10, 2020 at 12:51 PM Valentyn Tymofieiev <valen...@google.com> wrote:

Hi Eugene,

Good to hear from you. The experience you are describing with the portable runner and Docker containers in local execution mode is most certainly a bug; if you have not opened an issue for it, please do so and feel free to cc me.

I can also reproduce the bug and likewise didn't see anything obvious immediately - this needs some debugging.

cc: +Ankur Goenka <goe...@google.com> +Kyle Weaver <kcwea...@google.com>, who recently worked on the portable runner and may be interested.

By the way, you should be able to use custom containers with Dataflow if you set --experiments=use_runner_v2.
On Mon, Aug 10, 2020 at 9:06 AM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

(cc'ing Sam, with whom I'm working on this at the moment)

FWIW I'm still stumped. I've looked through the Python, Go, and Java code in the Beam repo having anything to do with gzipping/unzipping, and none of it appears to be used in the artifact staging/retrieval codepaths. I also can't find any mention of compression/decompression in the container boot code. My next step will be to add a bunch of debugging, rebuild the containers, and see what the artifact services think they're serving.

On Fri, Aug 7, 2020 at 6:47 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

Thanks Austin! Good stuff - though note that I am /not/ using custom containers; I'm just trying to get the basic stuff to work: a Python pipeline with a simple requirements.txt file. Feels like this should work out of the box - I must be doing something wrong.

On Fri, Aug 7, 2020 at 6:38 PM Austin Bennett <whatwouldausti...@gmail.com> wrote:

I only know of @OrielResearch Eila Arich-Landkof <e...@orielresearch.org> potentially doing applied work with custom containers (there must be others)!

As a plug for her and @BeamSummit - I think enough related material will be covered (with Conda specifics) in https://2020.beamsummit.org/sessions/workshop-using-conda-on-beam/

I'm sure others will have more things to say that are actually helpful, on-list, before that occurs (~3 weeks).
On Fri, Aug 7, 2020 at 6:32 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:

Hi old Beam friends,

I left Google to work on climate change <https://www.linkedin.com/posts/eugenekirpichov_i-am-leaving-google-heres-a-snipped-to-activity-6683408492444962816-Mw5U> and am now doing a short engagement with Pachama <https://pachama.com/>. Right now I'm trying to get a Beam Python pipeline to work; the pipeline will use fancy requirements and native dependencies, and we plan to run it on Cloud Dataflow (so custom containers are not yet an option), so I'm going straight for the direct PortableRunner as per https://beam.apache.org/documentation/runtime/environments/.

Basically, I can't get a minimal Beam program with a minimal requirements.txt file to work - the .tar.gz of the dependency mysteriously ends up being ungzipped and non-installable inside the Docker container running the worker. Details below.
=== main.py ===
import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = True

  with beam.Pipeline(options=pipeline_options) as p:
    (p | 'Create' >> beam.Create(['Hello'])
       | 'Write' >> beam.io.WriteToText('/tmp'))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

=== requirements.txt ===
alembic

When I run the program:

$ python3 main.py --runner=PortableRunner --job_endpoint=embed --requirements_file=requirements.txt

I get some normal output and then:

INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:b'  File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 261, in unpack_file\n    untar_file(filename, location)\n  File "/usr/local/lib/python3.7/site-packages/pip/_internal/utils/unpacking.py", line 177, in untar_file\n    tar = tarfile.open(filename, mode)\n  File "/usr/local/lib/python3.7/tarfile.py", line 1591, in open\n    return func(name, filemode, fileobj, **kwargs)\n  File "/usr/local/lib/python3.7/tarfile.py", line 1648, in gzopen\n    raise ReadError("not a gzip file")\ntarfile.ReadError: not a gzip file\n2020/08/08 01:17:07 Failed to install required packages: failed to install requirements: exit status 2\n'
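That "not a gzip file" error is reproducible with the standard library alone: per the traceback, pip's untar_file() chooses tarfile mode "r:gz" from the .tar.gz extension, and tarfile raises ReadError when the bytes are actually a plain tar. A minimal sketch (file names illustrative):

```python
import os
import tarfile
import tempfile

tmpdir = tempfile.mkdtemp()
member = os.path.join(tmpdir, "data.txt")
with open(member, "w") as f:
    f.write("hello")

# Build a *plain* (uncompressed) tar but give it a .tar.gz name --
# the same shape as the artifact pip found inside the container.
bad_artifact = os.path.join(tmpdir, "pkg-1.0.tar.gz")
with tarfile.open(bad_artifact, "w") as tar:
    tar.add(member, arcname="data.txt")

try:
    # Mode "r:gz" is what pip derives from the extension.
    tarfile.open(bad_artifact, "r:gz")
    error = None
except tarfile.ReadError as e:
    error = str(e)

print(error)  # not a gzip file
```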
This greatly puzzled me and, after some looking, I found something really surprising. Here is the package in the /directory to be staged/:

$ file /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
...: gzip compressed data, was "dist/alembic-1.4.2.tar", last modified: Thu Mar 19 21:48:31 2020, max compression, original size modulo 2^32 4730880
$ ls -l /var/folders/07/j09mnhmd2q9_kw40xrbfvcg80000gn/T/dataflow-requirements-cache/alembic-1.4.2.tar.gz
-rw-r--r--  1 jkff  staff  1092045 Aug  7 16:56 ...

So far so good. But here is the same file inside the Docker container (I ssh'd into the dead container <https://thorsten-hans.com/how-to-run-commands-in-stopped-docker-containers>):

# file /tmp/staged/alembic-1.4.2.tar.gz
/tmp/staged/alembic-1.4.2.tar.gz: POSIX tar archive (GNU)
# ls -l /tmp/staged/alembic-1.4.2.tar.gz
-rwxr-xr-x 1 root root 4730880 Aug  8 01:17 /tmp/staged/alembic-1.4.2.tar.gz

The file has clearly been unzipped, and now of course pip can't install it! What's going on here? Am I using the direct/portable runner combination wrong?

Thanks!
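One corroborating detail in the two listings above: `file` reports "original size modulo 2^32 4730880" for the host copy, and the container copy is exactly 4730880 bytes - consistent with the staged file having been transparently gunzipped in transit. The gzip trailer stores the uncompressed length modulo 2^32 in its last four bytes (the ISIZE field), which is what `file` reads. A quick stdlib check (payload size chosen to match the alembic tarball; everything else illustrative):

```python
import gzip
import io
import struct

payload = b"x" * 4730880  # same uncompressed size as alembic-1.4.2.tar

buf = io.BytesIO()
with gzip.GzipFile(fileobj=buf, mode="wb") as gz:
    gz.write(payload)
blob = buf.getvalue()

# The final 4 bytes of a gzip stream are ISIZE: len(payload) mod 2**32,
# stored little-endian.
isize = struct.unpack("<I", blob[-4:])[0]
print(isize)  # 4730880
```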
>>>> > >>>> > -- >>>> > Eugene Kirpichov >>>> > http://www.linkedin.com/in/eugenekirpichov >>>> > >>>> > >>>> > >>>> > -- >>>> > Eugene Kirpichov >>>> > http://www.linkedin.com/in/eugenekirpichov >>>> > >>>> > >>>> > >>>> > -- >>>> > Eugene Kirpichov >>>> > http://www.linkedin.com/in/eugenekirpichov >>>> > >>>> > >>>> > >>>> > -- >>>> > Eugene Kirpichov >>>> > http://www.linkedin.com/in/eugenekirpichov >>>> >>> >>> >>> -- >>> Eugene Kirpichov >>> http://www.linkedin.com/in/eugenekirpichov >>> >> >> >> -- >> Eugene Kirpichov >> http://www.linkedin.com/in/eugenekirpichov >> > > > -- > Eugene Kirpichov > http://www.linkedin.com/in/eugenekirpichov > -- Eugene Kirpichov http://www.linkedin.com/in/eugenekirpichov