+Udi Meiri <eh...@google.com> - do you have any suggestions to resolve this?

It looks like a couple of things are going wrong:
- ProtoCoder doesn't implement to_type_hint
- DataflowRunner calls from_runner_api (a workaround I believe we can
eventually remove, +Robert Bradshaw <rober...@google.com>), which in turn
tries to get type hints for every coder in the pipeline

Brian

On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com>
wrote:

> Hi Brian,
>
> Thanks so much for your quick response!
>
> I've tried with both Apache Beam 2.20.0 and 2.21.0, both result in the
> exact same error. Here is the full stacktrace:
>
> (metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
> metadata_persistor --project XXXXX --environments XXXXX --window_size 1
> --input_subscription
> projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription
> --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp
> --staging_location gs://XXXXXXXXXXX/item-metadata/staging
> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
> BeamDeprecationWarning: options is deprecated since First stable release.
> References to <pipeline>.options will not be supported
>   experiments = p.options.view_as(DebugOptions).experiments or []
> WARNING:root:Make sure that locally built Python SDK docker image has
> Python 3.7 interpreter.
> Traceback (most recent call last):
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor",
> line 8, in <module>
>     sys.exit(run())
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py",
> line 246, in run
>     batch_size=500,
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 524, in __exit__
>     self.run().wait_until_finish()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 497, in run
>     self._options).run(False)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 510, in run
>     return self.runner.run_pipeline(self, self._options)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
> line 484, in run_pipeline
>     allow_proto_holders=True)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 858, in from_runner_api
>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1238, in from_runner_api
>     part = context.transforms.get_by_id(transform_id)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1244, in from_runner_api
>     id in proto.outputs.items()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 1244, in <dictcomp>
>     id in proto.outputs.items()
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
>     self._id_to_proto[id], self._pipeline_context)
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py",
> line 214, in from_runner_api
>     element_type=context.element_type_from_coder_id(proto.coder_id),
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
> line 227, in element_type_from_coder_id
>     self.coders[coder_id].to_type_hint())
>   File
> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py",
> line 221, in to_type_hint
>     raise NotImplementedError('BEAM-2717')
> NotImplementedError: BEAM-2717
>
> When I was debugging and commenting out the different steps, I noticed the
> location in my code that supposedly throws the error changes. Here it
> complains about the WriteToBigQuery step (batch_size=500) but if I comment
> out that step it just moves on to the one above. It appears it's
> consistently thrown on the last run step (don't know if that's helpful,
> just thought I'd mention it).
>
> After adding beam.typehints.disable_type_annotations() it still throws
> the same error.
>
> Another thing I forgot to mention in my first email is that I registered a
> ProtoCoder as suggested at the bottom of this page (
> https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>
> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>
> Thanks again, really appreciate your help!
> Lien
>
> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Hi Lien,
>>
>> > First time writing the email list, so please tell me if I'm doing this
>> all wrong.
>> Not at all! This is exactly the kind of question this list is for.
>>
>> I have a couple of questions that may help us debug:
>> - Can you share the full stacktrace?
>> - What version of Beam are you using?
>>
>> There were some changes to the way we use typehints in the most recent
>> Beam release (2.21) that might be causing this [1]. If you're using 2.21
>> could you try reverting to the old behavior (call
>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>> pipeline) to see if that helps?
>>
>> Thanks,
>> Brian
>>
>> [1] https://beam.apache.org/blog/python-typing/
>>
>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>> wrote:
>>
>>>
>>> Hi everyone,
>>>
>>> First time writing the email list, so please tell me if I'm doing this
>>> all wrong.
>>>
>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>
>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>> as I try to deploy to Dataflow it throws the following error:
>>>
>>> File
>>> "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line
>>> 221, in to_type_hint
>>>     raise NotImplementedError('BEAM-2717')
>>>
>>> I've tried to narrow down the cause, and it appears to be the second
>>> step in my pipeline, which transforms the bytes read from PubSub into
>>> my own internal Proto format:
>>>
>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>     action_wrapper = ActionWrapper()
>>>     action_wrapper.ParseFromString(x)
>>>     return action_wrapper
>>>
>>> which is applied as a Map step.
>>>
>>> I've added typehints to all downstream steps as follows:
>>> def partition_by_environment(
>>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>>> ) -> int:
>>>
>>> I'd really appreciate it if anyone could let me know what I'm doing
>>> wrong, or what exactly this error refers to. I read the JIRA ticket,
>>> but did not understand how it relates to my issue here.
>>>
>>> Thanks!
>>> Kind regards,
>>> Lien
>>>
>>
