Thanks for the response, Arvid! Point of clarification: *things do NOT OOM when I use the Row subclass*. Instead, it is the code that doesn't use the Row subclass (i.e., the simple Python class) that OOMs.
On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> I suspect that this is because Row is not supported as a Python field [1];
> it's supposed to be a dict that is mapped to a Row by Flink.
> Maybe it runs into some infinite loop while trying to serialize, hence the
> OOM.
>
> Subclassing Row might be an undocumented feature.
>
> I'm also pulling in Dian, who knows more about PyFlink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>
> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Hi all,
>>
>> I've encountered an interesting issue: I observe an OOM in my
>> Flink application when I use a DataStream of Python objects, but when I
>> make that Python object a subclass of pyflink.common.types.Row and provide
>> TypeInformation, there is no issue.
>>
>> In the OOM scenario, no elements get processed; the application runs
>> without printing output and then eventually crashes with
>> java.lang.OutOfMemoryError: Java heap space
>>
>> Any insights into why this might be happening? I'd appreciate any
>> help/suggestions.
>> I've included some code that illustrates the two situations below [0].
>>
>> Thanks in advance!
>>
>> [0]:
>>
>> Code Snippet A: OOM scenario
>>
>> class InputWrapper:
>>     """Helper class, used to make streams of the same type"""
>>
>>     def __init__(self, key: str, contents: Row = None):
>>         self.key = key
>>         self.contents = contents
>>
>> x_ds = x_ds.map(
>>     lambda d: InputWrapper(key=d['key'], contents=d))
>> y_ds = y_ds.map(
>>     lambda o: InputWrapper(key=o['key'], contents=o))
>> union = x_ds.union(y_ds)
>> union.print()
>>
>> Code Snippet B: Working scenario
>>
>> class InputWrapper(Row):
>>     """Helper class, used to make streams of the same type"""
>>
>>     def __init__(self, key: str, contents: Row = None):
>>         super().__init__(key, contents)
>>
>> x_ds = x_ds.map(
>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>     output_type=InputWrapperTypeInfo())
>> y_ds = y_ds.map(
>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>     output_type=InputWrapperTypeInfo())
>> union = x_ds.union(y_ds)
>> union.print()
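The PyFlink data types page linked as [1] describes pickle as the fallback serializer when no type information is declared for a stream, which is the situation in Snippet A. One way to act on Arvid's suggestion is to have the map emit a plain dict of builtin values instead of a custom wrapper object, and to check locally that the payload survives a pickle round trip before running it in Flink. This is a minimal sketch, not a confirmed fix for the OOM: `wrap_as_dict` and `pickle_round_trip` are hypothetical helpers, and the code assumes `contents` is already a mapping such as a dict (a PyFlink Row would first need converting).

```python
import pickle
from typing import Any, Dict, Mapping


def wrap_as_dict(key: str, contents: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical stand-in for InputWrapper that returns a plain dict.

    Assumes `contents` is a mapping; copying it with dict() makes the
    top-level container a builtin dict rather than a custom type.
    """
    return {"key": key, "contents": dict(contents)}


def pickle_round_trip(obj: Any) -> Any:
    """Serialize and deserialize obj with pickle, outside of Flink."""
    return pickle.loads(pickle.dumps(obj))


record = wrap_as_dict("k1", {"key": "k1", "value": 42})
restored = pickle_round_trip(record)
print(restored == record)  # True
```

In a job, the lambdas from Snippet A would call `wrap_as_dict(d['key'], d)` instead of constructing `InputWrapper`; whether this avoids the OOM would still need to be confirmed against the actual pipeline.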