Hi Kevin,

Yes, I understood that, but your plain Python class contains a Row field,
for which no mapping exists. I'm assuming PyFlink tries to do a deep
conversion and fails, ending up in some infinite loop.
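
As a rough, untested sketch of what I mean by sticking to types that have a
mapping: instead of a custom class, the wrapper could be a plain dict. The
snippet below is plain Python only (no PyFlink), the helper name wrap and the
sample records are made up for illustration, and lists stand in for x_ds and
y_ds. I have not verified that this avoids the OOM in your job.

```python
from typing import Any, Dict


def wrap(record: Dict[str, Any]) -> Dict[str, Any]:
    """Wrap a record as a plain dict instead of a custom class instance.

    Hypothetical helper: 'key' and 'contents' mirror the two fields of
    InputWrapper from the original snippet.
    """
    # Copy the contents so the wrapper does not alias the input record.
    return {"key": record["key"], "contents": dict(record)}


# Made-up sample data standing in for the elements of x_ds and y_ds.
x_records = [{"key": "a", "value": 1}]
y_records = [{"key": "b", "value": 2}]

# Plain-list equivalent of x_ds.map(wrap).union(y_ds.map(wrap)).
union = [wrap(r) for r in x_records] + [wrap(r) for r in y_records]
print(union)
```

In the real job the elements are Rows rather than dicts, so the copy step
would need adapting, and you would probably still want to pass an explicit
output_type to map().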

On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:

> Thanks for the response, Arvid! Point of clarification: *things do NOT OOM
> when I use the Row subclass*. Instead, the code that doesn't use the Row
> subclass is the code that OOMs (i.e. the simple Python class).
>
>
>
> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> I suspect that this is because Row is not supported as a Python field
>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>> Maybe it runs into some infinite loop while trying to serialize, hence
>> the OOM.
>>
>> Subclassing Row might be an undocumented feature.
>>
>> I'm also pulling in Dian who knows more about PyFlink.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>
>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi all,
>>>
>>> I've encountered an interesting issue where I observe an OOM issue in my
>>> Flink Application when I use a DataStream of Python Objects, but when I
>>> make that Python Object a Subclass of pyflink.common.types.Row and provide
>>> TypeInformation, there is no issue.
>>>
>>> For the OOM scenario, no elements get processed, the application runs
>>> without printing output and then eventually crashes with
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> Any insights into why this might be happening? Appreciate any
>>> help/suggestions.
>>> I've included some code that illustrates the two situations below [0].
>>>
>>> Thanks in advance!
>>>
>>> [0]:
>>>
>>> Code Snippet A: OOM scenario
>>>
>>> class InputWrapper:
>>>     """Helper class, used to make streams of the same type"""
>>>
>>>     def __init__(self, key: str, contents: Row = None):
>>>         self.key = key
>>>         self.contents = contents
>>>
>>> x_ds = x_ds.map(
>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>> y_ds = y_ds.map(
>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>> union = x_ds.union(y_ds)
>>> union.print()
>>>
>>> Code Snippet B: Working scenario:
>>>
>>> class InputWrapper(Row):
>>>     """Helper class, used to make streams of the same type"""
>>>
>>>     def __init__(self, key: str, contents: Row = None):
>>>         super().__init__(key, contents)
>>>
>>> x_ds = x_ds.map(
>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>     output_type=InputWrapperTypeInfo())
>>> y_ds = y_ds.map(
>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>     output_type=InputWrapperTypeInfo())
>>> union = x_ds.union(y_ds)
>>> union.print()
>>>
>>>
>>>
