Hi Dian,

I have unit tests that both versions of the code (Row subclass vs. custom
Python class) pass. The OOM occurs when reading a large amount of data
from a Kafka topic.

At the moment I don't have a simple example to reproduce the issue; I'll
let you know once I have one.
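
For context on the pickle fallback Dian mentions in the quoted message
below, here is a minimal plain-Python sketch of a pickle round trip for a
wrapper object shaped like the one in Snippet A. It does not use PyFlink
and does not reproduce the OOM. The dict standing in for the Row payload
and the roundtrip helper are assumptions made purely for illustration.

```python
import pickle


class InputWrapper:
    """Plain Python wrapper, shaped like Snippet A (no Row base class)."""

    def __init__(self, key, contents=None):
        self.key = key
        self.contents = contents


def roundtrip(obj):
    """Hypothetical helper: pickle and unpickle obj, return (copy, byte size)."""
    payload = pickle.dumps(obj)
    return pickle.loads(payload), len(payload)


# Assumption: a dict stands in for the Row payload to keep this PyFlink-free.
original = InputWrapper(key="a", contents={"key": "a", "value": 1})
restored, size = roundtrip(original)
print(restored.key, restored.contents, size)
```

Whether PyFlink's pickle path behaves the same way for a wrapper that
holds a real Row is not confirmed anywhere in this thread.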

On Tue, Mar 23, 2021 at 2:17 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Kevin,
>
> Is it possible to provide a simple example to reproduce this issue?
>
> PS: PyFlink will use pickle to perform the serialization/deserialization
> if you don't specify the type info.
>
> Regards,
> Dian
>
>
> On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> Yes, I understood that, but then your Python class contains a Row field,
>> for which no mapping exists. I'm assuming PyFlink tries to do a deep
>> conversion and fails by ending up in an infinite loop.
>>
>> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Thanks for the response, Arvid! Point of clarification: *things do NOT
>>> OOM when I use the Row subclass*. Instead, the code that doesn't use
>>> the Row subclass is the code that OOMs (i.e., the simple Python class).
>>>
>>>
>>>
>>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> I suspect that this is because Row is not supported as a Python field
>>>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>>>> Maybe it runs into an infinite loop while trying to serialize, hence
>>>> the OOM.
>>>>
>>>> Subclassing Row might be an undocumented feature.
>>>>
>>>> I'm also pulling in Dian who knows more about PyFlink.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>>>
>>>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've encountered an interesting issue: I observe an OOM in my Flink
>>>>> application when I use a DataStream of Python objects, but when I make
>>>>> that Python object a subclass of pyflink.common.types.Row and provide
>>>>> TypeInformation, there is no issue.
>>>>>
>>>>> For the OOM scenario, no elements get processed; the application runs
>>>>> without printing output and then eventually crashes with
>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>
>>>>> Any insights into why this might be happening? Appreciate any
>>>>> help/suggestions.
>>>>> I've included some code that illustrates the two situations below [0].
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> [0]:
>>>>>
>>>>> Code Snippet A: OOM scenario
>>>>>
>>>>> class InputWrapper:
>>>>>     """Helper class, used to make streams of the same type"""
>>>>>
>>>>>     def __init__(self, key: str, contents: Row = None):
>>>>>         self.key = key
>>>>>         self.contents = contents
>>>>>
>>>>> x_ds = x_ds.map(
>>>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>>>> y_ds = y_ds.map(
>>>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
>>>>>
>>>>> Code Snippet B: Working scenario:
>>>>>
>>>>> class InputWrapper(Row):
>>>>>     """Helper class, used to make streams of the same type"""
>>>>>
>>>>>     def __init__(self, key: str, contents: Row = None):
>>>>>         super().__init__(key, contents)
>>>>>
>>>>> x_ds = x_ds.map(
>>>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>>>     output_type=InputWrapperTypeInfo())
>>>>> y_ds = y_ds.map(
>>>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>>>     output_type=InputWrapperTypeInfo())
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
>>>>>
>>>>>
>>>>>
