+Udi Meiri <eh...@google.com> - do you have any suggestions to resolve this?
It looks like there are a couple things going wrong:
- ProtoCoder doesn't have a definition for to_type_hint
- DataflowRunner calls from_runner_api, which I believe is a workaround we can eventually remove (+Robert Bradshaw <rober...@google.com>), which in turn tries to get type hints for every coder in the pipeline

Brian

On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com> wrote:

> Hi Brian,
>
> Thanks so much for your quick response!
>
> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
> exact same error. Here is the full stacktrace:
>
> (metadata-persistor) ➜ metadata-persistor git:(feature/DEV-1249) ✗
> metadata_persistor --project XXXXX --environments XXXXX --window_size 1
> --input_subscription
> projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription
> --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp
> --staging_location gs://XXXXXXXXXXX/item-metadata/staging
> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
> BeamDeprecationWarning: options is deprecated since First stable release.
> References to <pipeline>.options will not be supported
>   experiments = p.options.view_as(DebugOptions).experiments or []
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.7 interpreter.
> Traceback (most recent call last):
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
>     sys.exit(run())
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
>     batch_size=500,
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
>     self._options).run(False)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
>     allow_proto_holders=True)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
>     id in proto.outputs.items()
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
>     id in proto.outputs.items()
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
>     element_type=context.element_type_from_coder_id(proto.coder_id),
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
>     self.coders[coder_id].to_type_hint())
>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>     raise NotImplementedError('BEAM-2717')
> NotImplementedError: BEAM-2717
>
> When I was debugging and commenting out the different steps, I noticed the
> location in my code that supposedly throws the error changes. Here it
> complains about the WriteToBigQuery step (batch_size=500) but if I comment
> out that step it just moves on to the one above. It appears it's
> consistently thrown on the last run step (don't know if that's helpful,
> just thought I'd mention it).
>
> After adding beam.typehints.disable_type_annotations() it still throws
> the same error.
>
> Another thing I forgot to mention in my first email is that I registered a
> ProtoCoder as suggested at the bottom of this page (
> https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>
> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>
> Thanks again, really appreciate your help!
> Lien
>
> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi Lien,
>>
>> > First time writing the email list, so please tell me if I'm doing this
>> all wrong.
>> Not at all! This is exactly the kind of question this list is for
>>
>> I have a couple of questions that may help us debug:
>> - Can you share the full stacktrace?
>> - What version of Beam are you using?
>>
>> There were some changes to the way we use typehints in the most recent
>> Beam release (2.21) that might be causing this [1]. If you're using 2.21
>> could you try reverting to the old behavior (call
>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>> pipeline) to see if that helps?
>>
>> Thanks,
>> Brian
>>
>> [1] https://beam.apache.org/blog/python-typing/
>>
>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>> wrote:
>>
>>>
>>> Hi everyone,
>>>
>>> First time writing the email list, so please tell me if I'm doing this
>>> all wrong.
>>>
>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>
>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>> as I try to deploy to DataFlow it throws the following error:
>>>
>>>   File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>     raise NotImplementedError('BEAM-2717')
>>>
>>> I've tried narrowing down what exactly could be causing the issue and it
>>> appears to be caused by the second step in my pipeline, which transforms
>>> the bytes read from PubSub to my own internal Proto format:
>>>
>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>     action_wrapper = ActionWrapper()
>>>     action_wrapper.ParseFromString(x)
>>>
>>>     return action_wrapper
>>>
>>> which is applied as a Map step.
>>>
>>> I've added typehints to all downstream steps as follows:
>>> def partition_by_environment(
>>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>>> ) -> int:
>>>
>>> I'd really appreciate it if anyone could let me know what I'm doing
>>> wrong, or what exactly is the issue this error is referring to. I read the
>>> JIRA ticket, but did not understand how it is related to my issue here.
>>>
>>> Thanks!
>>> Kind regards,
>>> Lien
>>>
>>
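To make the mechanism Brian describes concrete, here is a minimal stdlib-only Python model of it. The class and function names mimic Beam's but are hypothetical stand-ins, not Beam's API: a base `Coder` whose `to_type_hint()` raises `NotImplementedError('BEAM-2717')` as in the traceback, a `ProtoCoder` that never overrides it, and a helper that, like the `from_runner_api` round trip `DataflowRunner` performs, asks every coder in the pipeline for its type hint.

```python
# A simplified, stdlib-only model of the failure described in this thread.
# Every class here is a HYPOTHETICAL stand-in that mimics the shape of
# Beam's Coder/ProtoCoder; none of this is Beam's actual implementation.


class Coder:
    """Base stand-in: coders that don't override to_type_hint() fail."""

    def to_type_hint(self):
        # Mirrors the base-class behaviour seen in the traceback in this thread.
        raise NotImplementedError('BEAM-2717')


class BytesCoder(Coder):
    """Stand-in for a coder that does know its element type."""

    def to_type_hint(self):
        return bytes


class ProtoCoder(Coder):
    """Stand-in for a proto coder with no to_type_hint() override."""

    def __init__(self, proto_message_type):
        self.proto_message_type = proto_message_type


class ProtoCoderWithHint(ProtoCoder):
    """Hypothetical workaround: report the wrapped message class as the hint."""

    def to_type_hint(self):
        return self.proto_message_type


class ActionWrapper:
    """Placeholder for the user's generated protobuf message class."""


def element_types_from_coders(coders):
    # Like the from_runner_api round trip, ask *every* coder for its type hint,
    # so a single coder without an override aborts the whole pass.
    return {cid: coder.to_type_hint() for cid, coder in coders.items()}


try:
    element_types_from_coders({'c0': BytesCoder(), 'c1': ProtoCoder(ActionWrapper)})
except NotImplementedError as err:
    print('failed:', err)  # prints "failed: BEAM-2717"

hints = element_types_from_coders(
    {'c0': BytesCoder(), 'c1': ProtoCoderWithHint(ActionWrapper)})
print(hints['c1'] is ActionWrapper)  # prints "True"
```

In this model the error comes from the runner-side round trip rather than from annotation inference, which would be consistent with Lien's observation that `disable_type_annotations()` did not help. `ProtoCoderWithHint` only suggests a possible direction (a coder that reports its message class as its type hint); whether overriding `to_type_hint` on Beam's real `ProtoCoder` is enough to get past this on Dataflow is an untested assumption.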