I don't believe you need to register a coder unless you create your own
coder class.

Since the coder has a proto_message_type field, it could return that in
to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.
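
To make that idea concrete, here is a rough sketch using stand-in classes.
These are not Beam's real classes: BaseCoderSketch, ProtoCoderSketch,
element_type_from_coder and the empty ActionWrapper placeholder are all
hypothetical names for illustration. It only assumes what the stack trace
shows (the base to_type_hint raises NotImplementedError('BEAM-2717')) and
that the proto coder keeps its message class in proto_message_type.

```python
class BaseCoderSketch:
    """Stand-in for a base coder; NOT Beam's actual Coder class."""

    def to_type_hint(self):
        # Mirrors the behaviour seen in the stack trace.
        raise NotImplementedError('BEAM-2717')


class ProtoCoderSketch(BaseCoderSketch):
    """Stand-in for a proto coder that remembers its message class."""

    def __init__(self, proto_message_type):
        self.proto_message_type = proto_message_type

    def to_type_hint(self):
        # The suggestion above: report the proto message class as the hint.
        return self.proto_message_type


class ActionWrapper:
    """Placeholder for the generated protobuf message class."""


def element_type_from_coder(coder):
    """Hypothetical helper: asks a coder for its element type."""
    return coder.to_type_hint()
```

With an override like this, a lookup over every coder in the pipeline would
get ActionWrapper back for the proto coder instead of raising.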

As a temporary workaround, please try overriding the output type like so:
  Map(parse_message_blobs).with_output_types(typing.Any)

That should prevent Beam from attempting to use the ProtoCoder.

On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette <bhule...@google.com> wrote:

> +Udi Meiri <eh...@google.com> - do you have any suggestions to resolve
> this?
>
> It looks like there are a couple of things going wrong:
> - ProtoCoder doesn't define to_type_hint
> - DataflowRunner calls from_runner_api (which I believe is a workaround we
> can eventually remove, +Robert Bradshaw <rober...@google.com>), and that
> call tries to get type hints for every coder in the pipeline
>
> Brian
>
> On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com>
> wrote:
>
>> Hi Brian,
>>
>> Thanks so much for your quick response!
>>
>> I've tried both Apache Beam 2.20.0 and 2.21.0; both result in the
>> exact same error. Here is the full stacktrace:
>>
>> (metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗
>> metadata_persistor --project XXXXX --environments XXXXX --window_size 1
>> --input_subscription
>> projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription
>> --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp
>> --staging_location gs://XXXXXXXXXXX/item-metadata/staging
>> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431:
>> BeamDeprecationWarning: options is deprecated since First stable release.
>> References to <pipeline>.options will not be supported
>>   experiments = p.options.view_as(DebugOptions).experiments or []
>> WARNING:root:Make sure that locally built Python SDK docker image has
>> Python 3.7 interpreter.
>> Traceback (most recent call last):
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor",
>> line 8, in <module>
>>     sys.exit(run())
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py",
>> line 246, in run
>>     batch_size=500,
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 524, in __exit__
>>     self.run().wait_until_finish()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 497, in run
>>     self._options).run(False)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 510, in run
>>     return self.runner.run_pipeline(self, self._options)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
>> line 484, in run_pipeline
>>     allow_proto_holders=True)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 858, in from_runner_api
>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1238, in from_runner_api
>>     part = context.transforms.get_by_id(transform_id)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1244, in from_runner_api
>>     id in proto.outputs.items()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py",
>> line 1244, in <dictcomp>
>>     id in proto.outputs.items()
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 103, in get_by_id
>>     self._id_to_proto[id], self._pipeline_context)
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py",
>> line 214, in from_runner_api
>>     element_type=context.element_type_from_coder_id(proto.coder_id),
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 227, in element_type_from_coder_id
>>     self.coders[coder_id].to_type_hint())
>>   File
>> "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py",
>> line 221, in to_type_hint
>>     raise NotImplementedError('BEAM-2717')
>> NotImplementedError: BEAM-2717
>>
>> When I was debugging and commenting out the different steps, I noticed
>> the location in my code that supposedly throws the error changes. Here it
>> complains about the WriteToBigQuery step (batch_size=500) but if I comment
>> out that step it just moves on to the one above. It appears it's
>> consistently thrown on the last run step (don't know if that's helpful,
>> just thought I'd mention it).
>>
>> After adding beam.typehints.disable_type_annotations() it still throws
>> the same error.
>>
>> Another thing I forgot to mention in my first email is that I registered
>> a ProtoCoder as suggested at the bottom of this page (
>> https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>>
>> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>>
>> Thanks again, really appreciate your help!
>> Lien
>>
>> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Hi Lien,
>>>
>>> > First time writing to the email list, so please tell me if I'm doing
>>> this all wrong.
>>> Not at all! This is exactly the kind of question this list is for.
>>>
>>> I have a couple of questions that may help us debug:
>>> - Can you share the full stacktrace?
>>> - What version of Beam are you using?
>>>
>>> There were some changes to the way we use typehints in the most recent
>>> Beam release (2.21) that might be causing this [1]. If you're using 2.21
>>> could you try reverting to the old behavior (call
>>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>>> pipeline) to see if that helps?
>>>
>>> Thanks,
>>> Brian
>>>
>>> [1] https://beam.apache.org/blog/python-typing/
>>>
>>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>>> wrote:
>>>
>>>>
>>>> Hi everyone,
>>>>
>>>> First time writing to the email list, so please tell me if I'm doing this
>>>> all wrong.
>>>>
>>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>>
>>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>>> as I try to deploy to Dataflow it throws the following error:
>>>>
>>>> File
>>>> "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line
>>>> 221, in to_type_hint
>>>>     raise NotImplementedError('BEAM-2717')
>>>>
>>>> I've tried narrowing down what exactly could be causing the issue and
>>>> it appears to be caused by the second step in my pipeline, which transforms
>>>> the bytes read from PubSub to my own internal Proto format:
>>>>
>>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>>     action_wrapper = ActionWrapper()
>>>>     action_wrapper.ParseFromString(x)
>>>>
>>>>     return action_wrapper
>>>>
>>>> which is applied as a Map step.
>>>>
>>>> I've added typehints to all downstream steps as follows:
>>>> def partition_by_environment(
>>>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>>>> ) -> int:
>>>>
>>>> I'd really appreciate it if anyone could let me know what I'm doing
>>>> wrong, or what exactly is the issue this error is referring to. I read the
>>>> JIRA ticket, but did not understand how it is related to my issue here.
>>>>
>>>> Thanks!
>>>> Kind regards,
>>>> Lien
>>>>
>>>
