Hi all, I've run into an interesting issue: my Flink application fails with an OOM error when I use a DataStream of plain Python objects. However, when I make that Python object a subclass of pyflink.common.types.Row and provide TypeInformation, the problem goes away.
In the OOM scenario, no elements get processed. The application runs without printing any output and eventually crashes with java.lang.OutOfMemoryError: Java heap space.

Any insights into why this might be happening? I'd appreciate any help or suggestions. I've included some code that illustrates the two situations below [0]. Thanks in advance!

[0]:

Code Snippet A: OOM scenario

    from pyflink.common.types import Row

    class InputWrapper:
        """Helper class, used to make streams of the same type"""

        def __init__(self, key: str, contents: Row = None):
            self.key = key
            self.contents = contents

    x_ds = x_ds.map(
        lambda d: InputWrapper(key=d['key'], contents=d))
    y_ds = y_ds.map(
        lambda o: InputWrapper(key=o['key'], contents=o))
    union = x_ds.union(y_ds)
    union.print()

Code Snippet B: Working scenario

    from pyflink.common.types import Row

    class InputWrapper(Row):
        """Helper class, used to make streams of the same type"""

        def __init__(self, key: str, contents: Row = None):
            super().__init__(key, contents)

    x_ds = x_ds.map(
        lambda d: InputWrapper(key=d['key'], contents=d),
        output_type=InputWrapperTypeInfo())
    y_ds = y_ds.map(
        lambda o: InputWrapper(key=o['key'], contents=o),
        output_type=InputWrapperTypeInfo())
    union = x_ds.union(y_ds)
    union.print()
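One difference between the two snippets is that, without output_type, PyFlink's map falls back to Types.PICKLED_BYTE_ARRAY(), so each InputWrapper in Snippet A crosses the Python/Java boundary as an opaque pickled blob rather than a typed Row. A quick Flink-independent sanity check is to pickle a wrapper and confirm it round-trips and has a reasonable size. This is only a sketch under assumptions: it uses the standard-library pickle module (PyFlink itself uses cloudpickle, which may serialize classes defined in __main__ differently), a plain dict stands in for a Row, and the record contents and the pickled_size helper are hypothetical.

```python
import pickle


class InputWrapper:
    """Same shape as Snippet A: a plain Python object with no Flink type info."""

    def __init__(self, key: str, contents=None):
        self.key = key
        self.contents = contents


def pickled_size(obj) -> int:
    """Hypothetical helper: number of bytes obj occupies once pickled."""
    return len(pickle.dumps(obj))


# Hypothetical record; a plain dict stands in for a pyflink Row here.
record = {"key": "abc", "value": 42}
wrapped = InputWrapper(key=record["key"], contents=record)

# Round-trip check: the wrapper should survive serialization unchanged.
restored = pickle.loads(pickle.dumps(wrapped))
print(restored.key, restored.contents)

# Compare the serialized size of the bare record with the wrapped one.
print("record :", pickled_size(record), "bytes")
print("wrapped:", pickled_size(wrapped), "bytes")
```

This won't reproduce the OOM by itself; it only helps rule out the wrapper being unpicklable or unexpectedly large before looking at the Flink side.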