Hi Kevin,

yes, I understood that. But then your plain Python class contains a Row field, for which no mapping exists. I assume PyFlink attempts a deep conversion of that field and fails, ending up in some infinite loop.
On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:

> Thanks for the response, Arvid! Point of clarification: *things do NOT OOM
> when I use the Row subclass*. Instead, the code that doesn't use the Row
> subclass (i.e. the simple Python class) is the code that OOMs.
>
> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> I suspect that this is because Row is not supported as a Python field
>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>> Maybe it runs in some infinite loop while trying to serialize, hence the
>> OOM.
>>
>> Subclassing Row might be an undocumented feature.
>>
>> I'm also pulling in Dian, who knows more about PyFlink.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>
>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi all,
>>>
>>> I've encountered an interesting issue: my Flink application runs out of
>>> memory (OOM) when I use a DataStream of plain Python objects, but when I
>>> make that Python object a subclass of pyflink.common.types.Row and
>>> provide TypeInformation, there is no issue.
>>>
>>> In the OOM scenario, no elements get processed: the application runs
>>> without printing any output and eventually crashes with
>>> java.lang.OutOfMemoryError: Java heap space
>>>
>>> Any insights into why this might be happening? I'd appreciate any
>>> help or suggestions. I've included some code below that illustrates the
>>> two situations [0].
>>>
>>> Thanks in advance!
>>>
>>> [0]:
>>>
>>> Code Snippet A: OOM scenario
>>>
>>> class InputWrapper:
>>>     """Helper class, used to make streams of the same type"""
>>>
>>>     def __init__(self, key: str, contents: Row = None):
>>>         self.key = key
>>>         self.contents = contents
>>>
>>> x_ds = x_ds.map(
>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>> y_ds = y_ds.map(
>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>> union = x_ds.union(y_ds)
>>> union.print()
>>>
>>> Code Snippet B: Working scenario
>>>
>>> class InputWrapper(Row):
>>>     """Helper class, used to make streams of the same type"""
>>>
>>>     def __init__(self, key: str, contents: Row = None):
>>>         super().__init__(key, contents)
>>>
>>> x_ds = x_ds.map(
>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>     output_type=InputWrapperTypeInfo())
>>> y_ds = y_ds.map(
>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>     output_type=InputWrapperTypeInfo())
>>> union = x_ds.union(y_ds)
>>> union.print()