Hi Kevin,

Is it possible to provide a simple example that reproduces this issue?
PS: If you don't specify the type info (output_type), PyFlink will use pickle to perform the serialization/deserialization.

Regards,
Dian

On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> Yes, I understood that, but then your Python class contains a Row field,
> for which no mapping exists. I'm assuming PyFlink tries to do a deep
> conversion and fails by ending up in some infinite loop.
>
> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Thanks for the response, Arvid! A point of clarification: *things do NOT
>> OOM when I use the Row subclass*. Instead, the code that doesn't use the
>> Row subclass (i.e. the simple Python class) is the code that OOMs.
>>
>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> I suspect that this is because Row is not supported as a Python field
>>> type [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>>> Maybe it runs in some infinite loop while trying to serialize, hence the
>>> OOM.
>>>
>>> Subclassing Row might be an undocumented feature.
>>>
>>> I'm also pulling in Dian, who knows more about PyFlink.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>>
>>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've encountered an interesting issue: I observe an OOM in my Flink
>>>> application when I use a DataStream of Python objects, but when I make
>>>> that Python object a subclass of pyflink.common.types.Row and provide
>>>> TypeInformation, there is no issue.
>>>>
>>>> In the OOM scenario, no elements get processed; the application runs
>>>> without printing output and then eventually crashes with
>>>> java.lang.OutOfMemoryError: Java heap space.
>>>>
>>>> Any insights into why this might be happening? I'd appreciate any
>>>> help/suggestions.
>>>>
>>>> I've included some code below that illustrates the two situations [0].
>>>>
>>>> Thanks in advance!
>>>>
>>>> [0]:
>>>>
>>>> Code Snippet A: OOM scenario
>>>>
>>>> from pyflink.common.types import Row
>>>>
>>>> class InputWrapper:
>>>>     """Helper class, used to make streams of the same type"""
>>>>
>>>>     def __init__(self, key: str, contents: Row = None):
>>>>         self.key = key
>>>>         self.contents = contents
>>>>
>>>> x_ds = x_ds.map(
>>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>>> y_ds = y_ds.map(
>>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>>> union = x_ds.union(y_ds)
>>>> union.print()
>>>>
>>>> Code Snippet B: Working scenario
>>>>
>>>> from pyflink.common.types import Row
>>>>
>>>> class InputWrapper(Row):
>>>>     """Helper class, used to make streams of the same type"""
>>>>
>>>>     def __init__(self, key: str, contents: Row = None):
>>>>         super().__init__(key, contents)
>>>>
>>>> x_ds = x_ds.map(
>>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>>     output_type=InputWrapperTypeInfo())
>>>> y_ds = y_ds.map(
>>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>>     output_type=InputWrapperTypeInfo())
>>>> union = x_ds.union(y_ds)
>>>> union.print()
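
A minimal, stdlib-only sketch of the pickle fallback mentioned in the PS above. It does not use PyFlink and is not PyFlink's actual serializer; it only shows what pickle does with the plain class from Code Snippet A. The pickle_round_trip helper and the sample record are hypothetical, and a plain dict stands in for the Row contents.

```python
import pickle
from typing import Any, Optional, Tuple


class InputWrapper:
    """Plain Python class, mirroring Code Snippet A (no Row base class)."""

    def __init__(self, key: str, contents: Optional[Any] = None):
        self.key = key
        self.contents = contents

    def __eq__(self, other: object) -> bool:
        # Value equality, added only so the round trip can be checked.
        return (
            isinstance(other, InputWrapper)
            and self.key == other.key
            and self.contents == other.contents
        )


def pickle_round_trip(obj: Any) -> Tuple[bytes, Any]:
    """Hypothetical helper: serialize and deserialize obj with pickle."""
    payload = pickle.dumps(obj)
    return payload, pickle.loads(payload)


# Hypothetical input record; a plain dict stands in for the incoming element.
record = {"key": "k1", "value": 42}
payload, restored = pickle_round_trip(InputWrapper(key=record["key"], contents=record))

# The payload refers to the class by module and name, and the nested
# contents are serialized together with the wrapper.
print(len(payload), b"InputWrapper" in payload, restored == InputWrapper("k1", record))
```

Because pickle records the class by reference, whatever unpickles a record must be able to import InputWrapper, and the payload is a Python-specific byte string rather than a schema Flink can inspect. Supplying output_type, as in Code Snippet B, is presumably what gives Flink a field-by-field schema instead.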