Hello,

I was wondering whether Flink has a size limit for serializing data. I have an
object that stores a big 2D array, and when I try to hand it over to the next
operator, I get the following error:

```
2024-07-10 10:14:51,983 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'grpc-default-executor-1' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3745) ~[?:?]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[?:?]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[?:?]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[?:?]
    at org.apache.beam.sdk.util.StreamUtils.getBytesWithoutClosing(StreamUtils.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:101) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:819) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:813) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:737) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:68) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:144) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:130) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
2024-07-10 10:14:52,006 ERROR /usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py:659 [] - Failed to read inputs in the data plane. Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>

2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Exception in thread
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - read_grpc_client_inputs
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - :

2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Traceback (most recent call last):

2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner

2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - self.run()
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/lib/python3.10/threading.py", line 953, in run

2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - self._target(*self._args, **self._kwargs)
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 669, in <lambda>

2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,009 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - target=lambda: self._read_inputs(elements_iterator),
2024-07-10 10:14:52,010 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs

2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - for elements in elements_iterator:
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__

2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - return self._next()
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -   File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next

2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] -
2024-07-10 10:14:52,011 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - raise self
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - grpc._channel
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - .
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - _MultiThreadedRendezvous
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - :
2024-07-10 10:14:52,012 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>
```

I think this error comes from serialization: I put a log statement at the start
of the next operator and it is never printed, even though the one at the end of
the upstream operator is. Moreover, I tried slicing the array to check whether
its size could be the issue, and I don't get the error when the array is small.
However, the array must be processed as a whole, so I can't really split it.
So is there a way to make Flink serialize bigger objects?
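One way to put numbers on the slicing experiment is to measure how many bytes the array turns into once serialized, before it ever reaches Flink. The sketch below assumes the object is serialized with Python's `pickle` (an assumption; the actual encoding depends on the output type declared for the stream), and `make_grid` is a hypothetical stand-in for the real 2D array:

```python
import pickle


def serialized_size(obj) -> int:
    """Return the number of bytes pickle produces for obj (assumed encoding)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL))


def make_grid(rows: int, cols: int) -> list[list[float]]:
    # Hypothetical stand-in for the real 2D array: rows x cols floats.
    return [[float(r * cols + c) for c in range(cols)] for r in range(rows)]


if __name__ == "__main__":
    grid = make_grid(1000, 1000)
    # Serialize progressively larger square slices to see how the payload grows.
    for n in (10, 100, 1000):
        sliced = [row[:n] for row in grid[:n]]
        print(f"{n}x{n}: {serialized_size(sliced) / 1e6:.2f} MB")
```

Since the trace shows the JVM decoding each element into a single byte array (the `ByteArrayCoder.decode` and `ByteArrayOutputStream.grow` frames), the measured size is only a rough lower bound on the heap needed for one element; the buffer growth copies the data, so the transient footprint can be larger.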


Thanks in advance and best regards


Ky Alexandre
