Hi Dian,

I have unit tests that both versions of the code (Row subclass vs. custom Python class) pass. The OOM occurs when reading a large amount of data from a Kafka topic.
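For context, the failing job is shaped roughly like the sketch below. This is a simplified sketch, not the real pipeline: the topic name, schema, field names, and connector properties are placeholders.

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Placeholder schema: each Kafka record is deserialized into a Row before
# the map() in Code Snippet A (quoted below) runs.
row_type_info = Types.ROW_NAMED(['key', 'value'], [Types.STRING(), Types.STRING()])
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(row_type_info) \
    .build()

# Placeholder topic and properties.
kafka_consumer = FlinkKafkaConsumer(
    topics='input-topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group'})

x_ds = env.add_source(kafka_consumer)
# y_ds is built the same way from a second topic; both streams are then
# mapped into InputWrapper as in the snippets quoted below.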
At the moment I don't have a simple example to reproduce the issue; I'll let you know.

On Tue, Mar 23, 2021 at 2:17 AM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Kevin,
>
> Is it possible to provide a simple example to reproduce this issue?
>
> PS: It will use pickle to perform the serialization/deserialization if you
> don't specify the type info.
>
> Regards,
> Dian
>
>
> On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> Yes, I understood that, but then your Python class contains a Row field,
>> for which no mapping exists. I'm assuming PyFlink tries to do a deep
>> conversion and fails, ending up in some infinite loop.
>>
>> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Thanks for the response, Arvid! Point of clarification: *things do NOT
>>> OOM when I use the Row subclass*. Instead, the code that doesn't use
>>> the Row subclass is the code that OOMs (i.e. the simple Python class).
>>>
>>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> I suspect that this is because Row is not supported as a Python field
>>>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>>>> Maybe it runs into some infinite loop while trying to serialize, hence
>>>> the OOM.
>>>>
>>>> Subclassing Row might be an undocumented feature.
>>>>
>>>> I'm also pulling in Dian, who knows more about PyFlink.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>>>
>>>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I've encountered an interesting issue: my Flink application OOMs when
>>>>> I use a DataStream of plain Python objects, but when I make that
>>>>> Python object a subclass of pyflink.common.types.Row and provide
>>>>> TypeInformation, there is no issue.
>>>>>
>>>>> In the OOM scenario, no elements get processed; the application runs
>>>>> without printing any output and then eventually crashes with
>>>>> java.lang.OutOfMemoryError: Java heap space.
>>>>>
>>>>> Any insights into why this might be happening? I'd appreciate any
>>>>> help/suggestions. I've included code that illustrates the two
>>>>> situations below [0].
>>>>>
>>>>> Thanks in advance!
>>>>>
>>>>> [0]:
>>>>>
>>>>> Code Snippet A: OOM scenario
>>>>>
>>>>> from pyflink.common.types import Row
>>>>>
>>>>> class InputWrapper:
>>>>>     """Helper class, used to make streams of the same type"""
>>>>>
>>>>>     def __init__(self, key: str, contents: Row = None):
>>>>>         self.key = key
>>>>>         self.contents = contents
>>>>>
>>>>> # x_ds and y_ds are DataStreams created earlier (e.g. from Kafka sources).
>>>>> x_ds = x_ds.map(
>>>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>>>> y_ds = y_ds.map(
>>>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
>>>>>
>>>>> Code Snippet B: Working scenario
>>>>>
>>>>> from pyflink.common.types import Row
>>>>>
>>>>> class InputWrapper(Row):
>>>>>     """Helper class, used to make streams of the same type"""
>>>>>
>>>>>     def __init__(self, key: str, contents: Row = None):
>>>>>         super().__init__(key, contents)
>>>>>
>>>>> x_ds = x_ds.map(
>>>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>>>     output_type=InputWrapperTypeInfo())
>>>>> y_ds = y_ds.map(
>>>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>>>     output_type=InputWrapperTypeInfo())
>>>>> union = x_ds.union(y_ds)
>>>>> union.print()
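PS: In case it's useful, InputWrapperTypeInfo() in Code Snippet B is a small helper that builds named-Row type information. A simplified sketch is below; the real field names and the schema of `contents` differ, so treat it as illustrative only. With an explicit output_type like this, the Row type info is used for the stream elements rather than the pickle fallback you mentioned.

from pyflink.common.typeinfo import Types

def InputWrapperTypeInfo():
    # Illustrative only: the real `contents` Row has a different schema.
    contents_type = Types.ROW_NAMED(['key', 'value'], [Types.STRING(), Types.STRING()])
    return Types.ROW_NAMED(['key', 'contents'], [Types.STRING(), contents_type])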