Hi Kevin,

I suspect that this is because Row is not supported as a Python field [1];
it's supposed to be a dict that is mapped to a Row by Flink.
Maybe it runs into some infinite loop while trying to serialize the object,
hence the OOM.

Subclassing Row might be an undocumented feature.
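
In the meantime, here is a minimal sketch of what I would try instead of
putting a custom class into the stream: emit a plain tuple and declare its
type. As far as I know, PyFlink falls back to pickle when no output type is
declared. The names to_key_and_contents, wrap_stream and contents_type are
hypothetical names of mine, contents_type stands for whatever TypeInformation
describes your records, and I have not run this against your job.

```python
from typing import Any, Mapping, Tuple


def to_key_and_contents(record: Mapping[str, Any]) -> Tuple[str, Any]:
    """Pair a record with its key as a plain tuple instead of a custom class."""
    return record['key'], record


def wrap_stream(ds, contents_type):
    """Map a DataStream to (key, contents) tuples with a declared output type.

    Assumption: `contents_type` is the TypeInformation of the incoming records.
    """
    # Imported lazily so the helper above can be tried without PyFlink installed.
    from pyflink.common.typeinfo import Types

    return ds.map(
        to_key_and_contents,
        output_type=Types.TUPLE([Types.STRING(), contents_type]))
```

With that, both inputs would go through wrap_stream(x_ds, ...) and
wrap_stream(y_ds, ...) before the union, so the two streams share one
declared type.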

I'm also pulling in Dian who knows more about PyFlink.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html

On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hi all,
>
> I've encountered an interesting issue where I observe an OOM issue in my
> Flink Application when I use a DataStream of Python Objects, but when I
> make that Python Object a Subclass of pyflink.common.types.Row and provide
> TypeInformation, there is no issue.
>
> For the OOM scenario, no elements get processed, the application runs
> without printing output and then eventually crashes with
> java.lang.OutOfMemoryError: Java heap space
>
> Any insights into why this might be happening? Appreciate any
> help/suggestions.
> I've included some code that illustrates the two situations below [0].
>
> Thanks in advance!
>
> [0]:
>
> Code Snippet A: OOM scenario
>
> class InputWrapper:
>     """Helper class, used to make streams of the same type"""
>
>     def __init__(self, key: str, contents: Row = None):
>         self.key = key
>         self.contents = contents
>
> x_ds = x_ds.map(
>     lambda d: InputWrapper(key=d['key'], contents=d))
> y_ds = y_ds.map(
>     lambda o: InputWrapper(key=o['key'], contents=o))
> union = x_ds.union(y_ds)
> union.print()
>
> Code Snippet B: Working scenario:
>
> class InputWrapper(Row):
>     """Helper class, used to make streams of the same type"""
>
>     def __init__(self, key: str, contents: Row = None):
>         super().__init__(key, contents)
>
> x_ds = x_ds.map(
>     lambda d: InputWrapper(key=d['key'], contents=d),
>     output_type=InputWrapperTypeInfo())
> y_ds = y_ds.map(
>     lambda o: InputWrapper(key=o['key'], contents=o),
>     output_type=InputWrapperTypeInfo())
> union = x_ds.union(y_ds)
> union.print()
>
>
>
