I don't believe you need to register a coder unless you create your own coder class.
Since the coder has a proto_message_type field, it could return that in
to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.

As a temporary workaround, please try overriding the output type like so:
  Map(parse_message_blobs).with_output_types(typing.Any)

That should prevent Beam from attempting to use the ProtoCoder.

On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette <bhule...@google.com> wrote:

> +Udi Meiri <eh...@google.com> - do you have any suggestions to resolve
> this?
>
> It looks like there are a couple things going wrong:
> - ProtoCoder doesn't have a definition for to_type_hint
> - DataflowRunner calls from_runner_api, which I believe is a workaround we
> can eventually remove (+Robert Bradshaw <rober...@google.com>), which in
> turn tries to get type hints for every coder in the pipeline
>
> Brian
>
> On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com>
> wrote:
>
>> Hi Brian,
>>
>> Thanks so much for your quick response!
>>
>> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
>> exact same error. Here is the full stacktrace:
>>
>> (metadata-persistor) ➜ metadata-persistor git:(feature/DEV-1249) ✗
>> metadata_persistor --project XXXXX --environments XXXXX --window_size 1
>> --input_subscription
>> projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription
>> --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp
>> --staging_location gs://XXXXXXXXXXX/item-metadata/staging
>> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
>> BeamDeprecationWarning: options is deprecated since First stable release.
>> References to <pipeline>.options will not be supported
>>   experiments = p.options.view_as(DebugOptions).experiments or []
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> Traceback (most recent call last):
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
>>     sys.exit(run())
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
>>     batch_size=500,
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
>>     self.run().wait_until_finish()
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
>>     self._options).run(False)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
>>     allow_proto_holders=True)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
>>     id in proto.outputs.items()
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
>>     id in proto.outputs.items()
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
>>     element_type=context.element_type_from_coder_id(proto.coder_id),
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
>>     self.coders[coder_id].to_type_hint())
>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>     raise NotImplementedError('BEAM-2717')
>> NotImplementedError: BEAM-2717
>>
>> When I was debugging and commenting out the different steps, I noticed
>> the location in my code that supposedly throws the error changes. Here it
>> complains about the WriteToBigQuery step (batch_size=500) but if I comment
>> out that step it just moves on to the one above. It appears it's
>> consistently thrown on the last run step (don't know if that's helpful,
>> just thought I'd mention it).
>>
>> After adding beam.typehints.disable_type_annotations() it still throws
>> the same error.
>>
>> Another thing I forgot to mention in my first email is that I registered
>> a ProtoCoder as suggested at the bottom of this page
>> (https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>>
>> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>>
>> Thanks again, really appreciate your help!
>> Lien
>>
>> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hi Lien,
>>>
>>> > First time writing the email list, so please tell me if I'm doing
>>> > this all wrong.
>>> Not at all! This is exactly the kind of question this list is for
>>>
>>> I have a couple of questions that may help us debug:
>>> - Can you share the full stacktrace?
>>> - What version of Beam are you using?
>>>
>>> There were some changes to the way we use typehints in the most recent
>>> Beam release (2.21) that might be causing this [1]. If you're using 2.21
>>> could you try reverting to the old behavior (call
>>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>>> pipeline) to see if that helps?
>>>
>>> Thanks,
>>> Brian
>>>
>>> [1] https://beam.apache.org/blog/python-typing/
>>>
>>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>>> wrote:
>>>
>>>>
>>>> Hi everyone,
>>>>
>>>> First time writing the email list, so please tell me if I'm doing this
>>>> all wrong.
>>>>
>>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>>
>>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>>> as I try to deploy to DataFlow it throws the following error:
>>>>
>>>>   File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>>     raise NotImplementedError('BEAM-2717')
>>>>
>>>> I've tried narrowing down what exactly could be causing the issue and
>>>> it appears to be caused by the second step in my pipeline, which transforms
>>>> the bytes read from PubSub to my own internal Proto format:
>>>>
>>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>>     action_wrapper = ActionWrapper()
>>>>     action_wrapper.ParseFromString(x)
>>>>
>>>>     return action_wrapper
>>>>
>>>> which is applied as a Map step.
>>>>
>>>> I've added typehints to all downstream steps as follows:
>>>> def partition_by_environment(
>>>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>>>> ) -> int:
>>>>
>>>> I'd really appreciate it if anyone could let me know what I'm doing
>>>> wrong, or what exactly is the issue this error is referring to. I read the
>>>> JIRA ticket, but did not understand how it is related to my issue here.
>>>>
>>>> Thanks!
>>>> Kind regards,
>>>> Lien
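
For anyone reading this thread later, here is a rough, Beam-free sketch of the failure discussed above and of the fix suggested at the top (returning proto_message_type from to_type_hint). Everything in it is a simplified stand-in and an assumption for illustration: Coder, ProtoCoderSketch, PatchedProtoCoderSketch, ActionWrapperStub and element_type_from_coder are hypothetical names, not Beam's actual classes or code.

```python
class Coder:
    """Toy stand-in for a coder base class (NOT Beam's real Coder)."""

    def to_type_hint(self):
        # Mirrors the default seen in the traceback: no type hint defined.
        raise NotImplementedError('BEAM-2717')


class ProtoCoderSketch(Coder):
    """Knows its message class but inherits the failing to_type_hint."""

    def __init__(self, proto_message_type):
        self.proto_message_type = proto_message_type


class PatchedProtoCoderSketch(ProtoCoderSketch):
    """Hypothetical fix: report the message class as the type hint."""

    def to_type_hint(self):
        return self.proto_message_type


class ActionWrapperStub:
    """Placeholder for the generated ActionWrapper proto class."""


def element_type_from_coder(coder):
    # Simplified version of the per-coder lookup shown in the traceback.
    return coder.to_type_hint()


if __name__ == "__main__":
    try:
        element_type_from_coder(ProtoCoderSketch(ActionWrapperStub))
    except NotImplementedError as err:
        print("unpatched coder fails:", err)
    patched = PatchedProtoCoderSketch(ActionWrapperStub)
    print("patched coder returns:", element_type_from_coder(patched).__name__)
```

The with_output_types(typing.Any) workaround avoids this path from the other side: per the reply above, it should keep Beam from picking the ProtoCoder for that step at all.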