Hi Udi, Brian,

Thanks so much 🙌  I'm now able to deploy the pipeline.


Lien


On Thu, 11 Jun 2020 at 00:02, Udi Meiri <eh...@google.com> wrote:

> I don't believe you need to register a coder unless you create your own
> coder class.
>
> Since the coder has a proto_message_type field, it could return that from
> to_type_hint, but I'll defer to Robert since I'm unfamiliar with that area.
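>
> For reference, I'd imagine the missing method would look roughly like this
> (an untested sketch, not a committed fix):
>
>   class ProtoCoder(FastCoder):
>     ...
>     def to_type_hint(self):
>       return self.proto_message_type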
>
> As a temporary workaround, please try overriding the output type like so:
>   Map(parse_message_blobs).with_output_types(typing.Any)
>
> That should prevent Beam from attempting to use the ProtoCoder.
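>
> In the context of your pipeline, that would be something along these lines
> (the step label and the messages variable are just placeholders):
>
>   import typing
>   import apache_beam as beam
>
>   parsed = (
>       messages
>       | "Parse" >> beam.Map(parse_message_blobs).with_output_types(typing.Any)
>   )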
>
> On Wed, Jun 10, 2020 at 1:47 PM Brian Hulette <bhule...@google.com> wrote:
>
>> +Udi Meiri <eh...@google.com> - do you have any suggestions to resolve
>> this?
>>
>> It looks like there are a couple things going wrong:
>> - ProtoCoder doesn't have a definition for to_type_hint
>> - DataflowRunner calls from_runner_api (a workaround I believe we can
>> eventually remove, +Robert Bradshaw <rober...@google.com>), which in turn
>> tries to get type hints for every coder in the pipeline
>>
>> Brian
>>
>> On Tue, Jun 9, 2020 at 1:50 AM Lien Michiels <lien.michi...@froomle.com>
>> wrote:
>>
>>> Hi Brian,
>>>
>>> Thanks so much for your quick response!
>>>
>>> I've tried with both Apache Beam 2.20.0 and 2.21.0; both result in the
>>> exact same error. Here is the full stacktrace:
>>>
>>> (metadata-persistor) ➜  metadata-persistor git:(feature/DEV-1249) ✗ metadata_persistor --project XXXXX --environments XXXXX --window_size 1 --input_subscription projects/XXXXXXX/subscriptions/mds-internal-item-metadata-subscription --runner DataflowRunner --temp_location gs://XXXXXXXX/item-metadata/temp --staging_location gs://XXXXXXXXXXX/item-metadata/staging
>>> /Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py:1431: BeamDeprecationWarning: options is deprecated since First stable release. References to <pipeline>.options will not be supported
>>>   experiments = p.options.view_as(DebugOptions).experiments or []
>>> WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
>>> Traceback (most recent call last):
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/bin/metadata_persistor", line 8, in <module>
>>>     sys.exit(run())
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/metadata_persistor/item_metadata_mds_persistor.py", line 246, in run
>>>     batch_size=500,
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 524, in __exit__
>>>     self.run().wait_until_finish()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 497, in run
>>>     self._options).run(False)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 510, in run
>>>     return self.runner.run_pipeline(self, self._options)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 484, in run_pipeline
>>>     allow_proto_holders=True)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 858, in from_runner_api
>>>     p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1238, in from_runner_api
>>>     part = context.transforms.get_by_id(transform_id)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in from_runner_api
>>>     id in proto.outputs.items()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1244, in <dictcomp>
>>>     id in proto.outputs.items()
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 103, in get_by_id
>>>     self._id_to_proto[id], self._pipeline_context)
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/pvalue.py", line 214, in from_runner_api
>>>     element_type=context.element_type_from_coder_id(proto.coder_id),
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py", line 227, in element_type_from_coder_id
>>>     self.coders[coder_id].to_type_hint())
>>>   File "/Users/lienmichiels/.virtualenvs/metadata-persistor/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>     raise NotImplementedError('BEAM-2717')
>>> NotImplementedError: BEAM-2717
>>>
>>> While debugging by commenting out individual steps, I noticed that the
>>> location in my code that supposedly throws the error changes. Here it
>>> complains about the WriteToBigQuery step (batch_size=500), but if I
>>> comment out that step, it just moves on to the step above. The error
>>> appears to be thrown consistently on the last step of the pipeline (I
>>> don't know if that's helpful, just thought I'd mention it).
>>>
>>> After adding beam.typehints.disable_type_annotations() it still throws
>>> the same error.
>>>
>>> Another thing I forgot to mention in my first email is that I registered
>>> a ProtoCoder as suggested at the bottom of this page (
>>> https://beam.apache.org/documentation/sdks/python-type-safety/) as:
>>>
>>> beam.coders.registry.register_coder(ActionWrapper, ProtoCoder)
>>>
>>> Thanks again, really appreciate your help!
>>> Lien
>>>
>>> On Mon, Jun 8, 2020 at 5:26 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Hi Lien,
>>>>
>>>> > First time writing to the email list, so please tell me if I'm doing
>>>> > this all wrong.
>>>> Not at all! This is exactly the kind of question this list is for.
>>>>
>>>> I have a couple of questions that may help us debug:
>>>> - Can you share the full stacktrace?
>>>> - What version of Beam are you using?
>>>>
>>>> There were some changes to the way we use typehints in the most recent
>>>> Beam release (2.21) that might be causing this [1]. If you're using 2.21,
>>>> could you try reverting to the old behavior (call
>>>> `apache_beam.typehints.disable_type_annotations()` before constructing the
>>>> pipeline) to see if that helps?
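>>>>
>>>> For example, at the top of your main module, before the pipeline is
>>>> constructed (pipeline_options here is a placeholder for your options):
>>>>
>>>>   import apache_beam as beam
>>>>
>>>>   beam.typehints.disable_type_annotations()
>>>>
>>>>   with beam.Pipeline(options=pipeline_options) as p:
>>>>       ...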
>>>>
>>>> Thanks,
>>>> Brian
>>>>
>>>> [1] https://beam.apache.org/blog/python-typing/
>>>>
>>>> On Mon, Jun 8, 2020 at 4:15 AM Lien Michiels <lien.michi...@froomle.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hi everyone,
>>>>>
>>>>> First time writing to the email list, so please tell me if I'm doing
>>>>> this all wrong.
>>>>>
>>>>> I'm building a streaming pipeline to be run on the DataflowRunner that
>>>>> reads from PubSub and writes to BQ using the Python 3 SDK.
>>>>>
>>>>> I can get the pipeline started fine with the DirectRunner, but as soon
>>>>> as I try to deploy to Dataflow, it throws the following error:
>>>>>
>>>>> File "/usr/local/lib/python3.7/site-packages/apache_beam/coders/coders.py", line 221, in to_type_hint
>>>>>     raise NotImplementedError('BEAM-2717')
>>>>>
>>>>> I've tried narrowing down what exactly could be causing the issue, and
>>>>> it appears to be caused by the second step in my pipeline, which
>>>>> transforms the bytes read from PubSub into my own internal Proto format:
>>>>>
>>>>> def parse_message_blobs(x: bytes) -> ActionWrapper:
>>>>>     action_wrapper = ActionWrapper()
>>>>>     action_wrapper.ParseFromString(x)
>>>>>
>>>>>     return action_wrapper
>>>>>
>>>>> which is applied as a Map step.
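>>>>>
>>>>> Concretely, it looks roughly like this (labels and variable names
>>>>> simplified):
>>>>>
>>>>>   actions = (
>>>>>       p
>>>>>       | beam.io.ReadFromPubSub(subscription=input_subscription)
>>>>>       | beam.Map(parse_message_blobs)
>>>>>   )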
>>>>>
>>>>> I've added typehints to all downstream steps as follows:
>>>>> def partition_by_environment(
>>>>>     x: ActionWrapper, num_partitions: int, environments: List[str]
>>>>> ) -> int:
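>>>>>
>>>>> which I apply with beam.Partition, roughly (names simplified):
>>>>>
>>>>>   partitions = actions | beam.Partition(
>>>>>       partition_by_environment, len(environments), environments
>>>>>   )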
>>>>>
>>>>> I'd really appreciate it if anyone could let me know what I'm doing
>>>>> wrong, or what exactly this error is referring to. I read the JIRA
>>>>> ticket, but did not understand how it relates to my issue here.
>>>>>
>>>>> Thanks!
>>>>> Kind regards,
>>>>> Lien
>>>>>
>>>> --

Lien Michiels
Data Scientist & Solutions Architect
FROOMLE
m: +32 483 71 87 36
w: froomle.ai  e: lien.michi...@froomle.com
<https://www.linkedin.com/in/lien-michiels/>
