Hi team,

We use Flink 1.16.0 with openjdk-11-jre, mainly to run streaming jobs.
We take checkpoints at a 2 min interval and a savepoint when deploying a new
job version.
We also use the RocksDB state backend for most of them.

We had a streaming job that had been running for a long time without any
issue, but during a new deployment we could not launch it anymore: one task
got stuck in the CREATING state, then the job failed, restarted, and so on.
In this Flink job, we handle a large data stream using key-based grouping.
Inside a processFunction, we use MapState[Long, String] as our state
storage, which keeps data with a time-to-live (TTL) of 30 days.

The most relevant error we got from the logs was:

2023-08-02 12:40:52,186 ERROR akka.remote.EndpointWriter [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.1.1.5:31336/user/rpc/taskmanager_0#1435767402]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 42582878 bytes.

We worked around this issue by increasing akka.framesize from its default
(10MB) to 50MB:
akka.framesize: 52428800b

After 16h of uptime, we wanted to move the job back to its initial cluster,
as it had been running fine since then, but once the savepoint was taken we
could not launch it again and got this error:

2023-08-03 08:49:06,474 ERROR akka.remote.EndpointWriter [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.1.1.5:31358/user/rpc/taskmanager_0#1492669447]: max allowed size 52428800 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 679594586 bytes.

After some research, this seems to be related to the _metadata file written
when checkpointing/savepointing. Judging by the payload sizes in the first
and the last ERROR, this file grew dramatically during those 16h, from about
40MB to about 650MB.
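
To put the numbers from the two errors side by side, here is a small sketch
of the arithmetic. The byte counts are copied from the log lines above; the
helper names are only illustrative, and "MB" is taken as 1024 * 1024 bytes,
matching how the 10485760 byte limit corresponds to "10MB".

```python
# Values copied from the two OversizedPayloadException messages above.
FIRST_PAYLOAD_BYTES = 42_582_878     # error of 2023-08-02
SECOND_PAYLOAD_BYTES = 679_594_586   # error of 2023-08-03
FIRST_LIMIT_BYTES = 10_485_760       # default akka.framesize
SECOND_LIMIT_BYTES = 52_428_800      # akka.framesize after our first change

MIB = 1024 * 1024


def to_mib(num_bytes: int) -> float:
    """Convert a byte count to mebibytes (hypothetical helper)."""
    return num_bytes / MIB


def ratio(numerator: int, denominator: int) -> float:
    """Plain ratio, used for both growth and limit overshoot."""
    return numerator / denominator


if __name__ == "__main__":
    print(f"first payload:  {to_mib(FIRST_PAYLOAD_BYTES):.1f} MiB")
    print(f"second payload: {to_mib(SECOND_PAYLOAD_BYTES):.1f} MiB")
    print(f"growth factor:  {ratio(SECOND_PAYLOAD_BYTES, FIRST_PAYLOAD_BYTES):.1f}x")
    print(f"over 10MB limit by {ratio(FIRST_PAYLOAD_BYTES, FIRST_LIMIT_BYTES):.1f}x")
    print(f"over 50MB limit by {ratio(SECOND_PAYLOAD_BYTES, SECOND_LIMIT_BYTES):.1f}x")
```

In other words, the payload grew by roughly a factor of 16 between the two
errors, which is why raising the frame size once was not enough.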

Since then we have been unable to relaunch the job.

Increasing akka.framesize from 50MB to 1GB avoided the above errors, but
one task still remained in the CREATING state until failure.
We then started to get java.lang.OutOfMemoryError: Java heap space on the
jobmanager, followed by timeouts between the taskmanagers and the jobmanager.
To avoid the OOM on the jobmanager, we raised its heap size from 2GB to 20GB.
Increasing the timeouts led to other errors, such as
java.lang.OutOfMemoryError: Java heap space on the taskmanagers, and the job
eventually timed out and failed anyway.

2023-08-03 10:28:32,191 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - The heartbeat of ResourceManager with id be6b26eb0a0a54e636c9fbfc5f9815f3 timed out.
2023-08-03 10:28:32,191 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection be6b26eb0a0a54e636c9fbfc5f9815f3.
2023-08-03 10:28:32,191 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@10.1.1.3:46899/user/rpc/resourcemanager_0(a6fef33bff489d7e860c1017d2a34f50).
2023-08-03 10:28:38,411 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - The heartbeat of JobManager with id 39d49002792d881da6a5e7266c8ee58b timed out.
2023-08-03 10:28:38,412 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job f89626c718a36aa0240f8746b8b5a690.

As far as we understand, the metadata should never become that large unless
we are doing something wrong(?).
Does anyone have an idea of how we can diagnose this?
We have kept a copy of the culprit metadata files and are in the process of
taking a look at their content, but don't have a precise idea of what we
should look for.
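
One format-agnostic starting point, in the spirit of the Unix `strings`
tool, would be to list the most frequent readable strings in a metadata
file and see whether something obviously repeats. The sketch below is only
that: it assumes the file contains plain ASCII strings such as paths or
names (a guess on our side, not something we have verified), it does not
parse Flink's actual metadata format, and all function names are made up
for illustration.

```python
import re
import sys
from collections import Counter
from pathlib import Path

# Runs of at least 8 printable ASCII bytes, similar to what `strings` reports.
PRINTABLE_RUN = re.compile(rb"[\x20-\x7e]{8,}")


def top_strings(data: bytes, limit: int = 20) -> list[tuple[str, int]]:
    """Return the most frequent printable runs with their occurrence counts."""
    counts = Counter(
        match.group().decode("ascii") for match in PRINTABLE_RUN.finditer(data)
    )
    return counts.most_common(limit)


def bytes_covered(data: bytes) -> int:
    """Total number of bytes that belong to printable runs."""
    return sum(len(match.group()) for match in PRINTABLE_RUN.finditer(data))


if __name__ == "__main__":
    # Usage: python scan_metadata.py /path/to/_metadata
    # Note: the whole file is read into memory.
    data = Path(sys.argv[1]).read_bytes()
    print(f"file size: {len(data)} bytes, readable: {bytes_covered(data)} bytes")
    for text, count in top_strings(data):
        print(f"{count:8d}  {text}")
```

If readable strings only cover a small share of the file, most of the size
is opaque binary content and this approach will not tell us much.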

Any suggestions are welcome.
Thx
