Hello, I was wondering whether Flink has a size limit for serializing data. I have an object that stores a large 2D array, and when I try to hand it over to the next operator, I get the following error:
```
2024-07-10 10:14:51,983 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'grpc-default-executor-1' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3745) ~[?:?]
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[?:?]
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[?:?]
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[?:?]
	at org.apache.beam.sdk.util.StreamUtils.getBytesWithoutClosing(StreamUtils.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:101) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:819) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:813) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:737) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:68) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:144) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:130) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
2024-07-10 10:14:52,006 ERROR /usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py:659 [] - Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = ""
	debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Exception in thread
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - read_grpc_client_inputs
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - :
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Traceback (most recent call last):
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - self.run()
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/lib/python3.10/threading.py", line 953, in run
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - self._target(*self._args, **self._kwargs)
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 669, in <lambda>
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - target=lambda: self._read_inputs(elements_iterator),
2024-07-10 10:14:52,010 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - for elements in elements_iterator:
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - return self._next()
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - raise self
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - grpc._channel
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - .
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - _MultiThreadedRendezvous
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - :
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = ""
	debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>
```

I think this error comes from the serialization: I added a log statement at the start of the downstream operator, and it is never printed, even though the one at the end of the upstream operator is. I also tried slicing the array to check whether its size could be the issue, and the error does not occur when the array is small. However, the array must be processed as a whole, so I can't really split it.

So, is there a way to make Flink serialize larger objects?

Thanks in advance, and best regards,
Ky Alexandre
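For reference, here is a rough way to see how the serialized payload grows with the array's dimensions, which matches what the slicing experiment suggests. It is a minimal sketch that assumes the element is pickled before being shipped to the next operator (my understanding is that PyFlink does this when no output type is declared); it uses plain `pickle`, which may not match PyFlink's exact encoding, and the `serialized_size` helper and the square shapes are hypothetical.

```python
import pickle


def serialized_size(rows: int, cols: int) -> int:
    """Hypothetical helper: pickled size in bytes of a rows x cols nested list of floats."""
    matrix = [[0.0] * cols for _ in range(rows)]
    # With the binary pickle protocols, each float is written as a 1-byte opcode
    # plus an 8-byte double, so the payload grows roughly as 9 * rows * cols bytes.
    return len(pickle.dumps(matrix, protocol=pickle.HIGHEST_PROTOCOL))


if __name__ == "__main__":
    # Square shapes chosen only for illustration.
    for n in (100, 500, 1_000):
        size = serialized_size(n, n)
        print(f"{n} x {n}: {size:,} bytes ({size / 2**20:.1f} MiB)")
```

The size scales with rows × cols, so doubling both dimensions roughly quadruples the payload. The Java frames in the trace (`ByteArrayCoder.decode` → `StreamUtils.getBytesWithoutClosing` → `ByteArrayOutputStream.grow` → `Arrays.copyOf`) suggest the whole element is buffered in a single byte array that grows by copying into a larger array, so the JVM heap presumably needs room for more than one copy of the serialized element at the same time.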