Hi Udi, Brian,

Thanks so much 🙌 I'm now able to deploy the pipeline.
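
For anyone who finds this thread later, the workaround applied to the parsing
step looks roughly like this (a simplified sketch, not my exact code;
`pipeline`, `input_subscription` and the step names are placeholders):

    import typing

    import apache_beam as beam

    parsed = (
        pipeline
        | "Read" >> beam.io.ReadFromPubSub(subscription=input_subscription)
        # Overriding the output type to Any keeps Beam from attempting to
        # use the ProtoCoder, whose to_type_hint raises
        # NotImplementedError: BEAM-2717.
        | "Parse" >> beam.Map(parse_message_blobs).with_output_types(typing.Any)
    )
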
Lien

On Thu, 11 Jun 2020 at 00:02, Udi Meiri <eh...@google.com> wrote:

> I don't believe you need to register a coder unless you create your own
> coder class.
>
> Since the coder has a proto_message_type field, it could return that in
> to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.
>
> As a temporary workaround, please try overriding the output type like so:
>
>     Map(parse_message_blobs).with_output_types(typing.Any)
>
> That should prevent Beam from attempting to use the ProtoCoder.
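>
> A rough, untested sketch of the to_type_hint idea above (the subclass and
> its name are hypothetical; ProtoCoder is apache_beam.coders.ProtoCoder):
>
>     from apache_beam.coders import ProtoCoder
>
>     class HintedProtoCoder(ProtoCoder):
>         def to_type_hint(self):
>             # Surface the wrapped proto message class as the element type
>             # hint instead of raising NotImplementedError (BEAM-2717).
>             return self.proto_message_type
>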
> On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette <bhule...@google.com> wrote:
>
>> +Udi Meiri <eh...@google.com> - do you have any suggestions to resolve
>> this?
>>
>> It looks like there are a couple of things going wrong:
>> - ProtoCoder doesn't have a definition for to_type_hint
>> - DataflowRunner calls from_runner_api, which I believe is a workaround
>>   we can eventually remove (+Robert Bradshaw <rober...@google.com>), and
>>   which in turn tries to get type hints for every coder in the pipeline
>>
>> Brian
>>
>> On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com>
>> wrote:
>>
>>> Hi Brian,
>>>
>>> Thanks so much for your quick response!
>>>
>>> I've tried with both Apache Beam 2.20.0 and 2.21.0; both result in the
>>> exact same error. Here is the full stacktrace:
>>>
>>> (metadata-persistor) ➜ metadata-persistor git:(feature/DEV-1249) ✗ metadata_persistor \
>>>     --project XXXXX --environments XXXXX --window_size 1 \
>>>     --input_subscription projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription \
>>>     --runner DataflowRunner \
>>>     --temp_location gs://XXXXXXXX/item-metadata/temp \
>>>     --staging_location gs://XXXXXXXXXXX/item-metadata/staging
>>> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
>>>   experiments = p.options.view_as(DebugOptions).experiments or []
>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
>>> Traceback (most recent call last):
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
>>>     sys.exit(run())
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
>>>     batch_size=500,
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
>>>     self.run().wait_until_finish()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
>>>     self._options).run(False)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
>>>     return self.runner.run_pipeline(self, self._options)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
>>>     allow_proto_holders=True)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
>>>     id in proto.outputs.items()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
>>>     id in proto.outputs.items()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
>>>     element_type=context.element_type_from_coder_id(proto.coder_id),
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
>>>     self.coders[coder_id].to_type_hint())
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>     raise NotImplementedError('BEAM-2717')
>>> NotImplementedError: BEAM-2717
>>>
>>> When I was debugging and commenting out the different steps, I noticed
>>> that the location in my code that supposedly throws the error changes.
>>> Here it complains about the WriteToBigQuery step (batch_size=500), but if
>>> I comment out that step it just moves on to the one above. It appears to
>>> be consistently thrown on the last step of the run (don't know if that's
>>> helpful, just thought I'd mention it).
>>>
>>> After adding beam.typehints.disable_type_annotations() it still throws
>>> the same error.
>>>
>>> Another thing I forgot to mention in my first email is that I registered
>>> a ProtoCoder, as suggested at the bottom of this page
>>> (https://beam.apache.org/documentation/sdks/python-type-safety/):
>>>
>>>     beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>>>
>>> Thanks again, really appreciate your help!
>>> Lien
>>>
>>> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:
>>>
>>>> Hi Lien,
>>>>
>>>> > First time writing to the email list, so please tell me if I'm doing
>>>> > this all wrong.
>>>>
>>>> Not at all! This is exactly the kind of question this list is for.
>>>>
>>>> I have a couple of questions that may help us debug:
>>>> - Can you share the full stacktrace?
>>>> - What version of Beam are you using?
>>>>
>>>> There were some changes to the way we use typehints in the most recent
>>>> Beam release (2.21) that might be causing this [1]. If you're using 2.21,
>>>> could you try reverting to the old behavior (call
>>>> `apache_beam.typehints.disable_type_annotations()` before constructing
>>>> the pipeline) to see if that helps?
>>>>
>>>> Thanks,
>>>> Brian
>>>>
>>>> [1] https://beam.apache.org/blog/python-typing/
>>>>
>>>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> First time writing to the email list, so please tell me if I'm doing
>>>>> this all wrong.
>>>>>
>>>>> I'm building a streaming pipeline, to be run on the DataflowRunner,
>>>>> that reads from PubSub and writes to BQ using the Python 3 SDK.
>>>>>
>>>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>>>> as I try to deploy to Dataflow it throws the following error:
>>>>>
>>>>>     File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>>>       raise NotImplementedError('BEAM-2717')
>>>>>
>>>>> I've tried narrowing down what exactly could be causing the issue, and
>>>>> it appears to be the second step in my pipeline, which transforms the
>>>>> bytes read from PubSub into my own internal proto format:
>>>>>
>>>>>     def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>>>         action_wrapper = ActionWrapper()
>>>>>         action_wrapper.ParseFromString(x)
>>>>>
>>>>>         return action_wrapper
>>>>>
>>>>> which is applied as a Map step.
>>>>>
>>>>> I've added typehints to all downstream steps as follows:
>>>>>
>>>>>     def partition_by_environment(
>>>>>         x: ActionWrapper, num_partitions: int, environments: List[str]
>>>>>     ) -> int:
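>>>>>
>>>>> For reference, these two are wired into the pipeline roughly like this
>>>>> (a simplified sketch; the variable and step names are illustrative):
>>>>>
>>>>>     parsed = messages | "Parse" >> beam.Map(parse_message_blobs)
>>>>>     by_environment = parsed | "Partition" >> beam.Partition(
>>>>>         partition_by_environment, len(environments), environments
>>>>>     )
>>>>>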
>>>>> I'd really appreciate it if anyone could let me know what I'm doing
>>>>> wrong, or what exactly the issue is that this error refers to. I read
>>>>> the JIRA ticket, but did not understand how it relates to my issue here.
>>>>>
>>>>> Thanks!
>>>>> Kind regards,
>>>>> Lien

--
Lien Michiels
Data Scientist & Solutions Architect
FROOMLE
m: +32 483 71 87 36
w: froomle.ai
e: lien.michi...@froomle.com
https://www.linkedin.com/in/lien-michiels/