Thanks for the response, Arvid! One point of clarification: *things do NOT
OOM when I use the Row subclass*. It is the code that doesn't use the Row
subclass (i.e. the simple Python class) that OOMs.
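
For reference, here is a minimal sketch of the dict-based alternative mentioned in the quoted reply below. It is only an illustration under assumptions: `wrap` is a hypothetical helper (not a PyFlink API), the records are stand-in dicts rather than real stream elements, and PyFlink is not imported at all. As far as I understand, PyFlink falls back to pickle when no output type is declared, so the sketch also checks that the wrapped values survive a pickle round trip.

```python
import pickle


def wrap(record):
    """Hypothetical helper: wrap a record in a plain dict instead of a custom class."""
    return {"key": record["key"], "contents": record}


# Stand-in records; in the job these would be the elements of x_ds and y_ds.
x_records = [{"key": "a", "value": 1}]
y_records = [{"key": "b", "label": "foo"}]

# Both inputs end up with the same shape, which is what the union needs.
wrapped = [wrap(r) for r in x_records + y_records]

# Dicts of built-in types survive a pickle round trip unchanged.
restored = pickle.loads(pickle.dumps(wrapped))
```

In the job this would take the place of the lambda, e.g. `x_ds.map(wrap)`. Whether it avoids the OOM is untested.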



On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> I suspect that this is because Row is not supported as a Python field [1];
> it's supposed to be a dict that is mapped to a Row by Flink.
> Maybe it runs into an infinite loop while trying to serialize, hence the
> OOM.
>
> Subclassing Row might be an undocumented feature.
>
> I'm also pulling in Dian who knows more about PyFlink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>
> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi all,
>>
>> I've encountered an interesting issue where I observe an OOM issue in my
>> Flink Application when I use a DataStream of Python Objects, but when I
>> make that Python Object a Subclass of pyflink.common.types.Row and provide
>> TypeInformation, there is no issue.
>>
>> For the OOM scenario, no elements get processed, the application runs
>> without printing output and then eventually crashes with
>> java.lang.OutOfMemoryError: Java heap space
>>
>> Any insights into why this might be happening? Appreciate any
>> help/suggestions.
>> I've included some code that illustrates the two situations below [0].
>>
>> Thanks in advance!
>>
>> [0]:
>>
>> Code Snippet A: OOM scenario
>>
>> class InputWrapper:
>>     """Helper class, used to make streams of the same type"""
>>
>>     def __init__(self, key: str, contents: Row = None):
>>         self.key = key
>>         self.contents = contents
>>
>> x_ds = x_ds.map(
>>     lambda d: InputWrapper(key=d['key'], contents=d))
>> y_ds = y_ds.map(
>>     lambda o: InputWrapper(key=o['key'], contents=o))
>> union = x_ds.union(y_ds)
>> union.print()
>>
>> Code Snippet B: Working scenario:
>>
>> class InputWrapper(Row):
>>     """Helper class, used to make streams of the same type"""
>>
>>     def __init__(self, key: str, contents: Row = None):
>>         super().__init__(key, contents)
>>
>> x_ds = x_ds.map(
>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>     output_type=InputWrapperTypeInfo())
>> y_ds = y_ds.map(
>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>     output_type=InputWrapperTypeInfo())
>> union = x_ds.union(y_ds)
>> union.print()
>>
>>
>>
