Hi Frederic,

I’ve once (upon a time 😊) had a similar situation when we changed from Flink 1.8 to Flink 1.13 … It took me a long time to figure out. Some hints on where to start looking:
* The _metadata file is used for
  * Job manager state
  * Smallish keyed state (in order to avoid too many small state files)
  * Operator state (non-keyed)
* Does the operator that is getting blocked in initialization use operator state?
  * Look for some condition that might cause it to grow
  * In my case back then, a minor condition caused the operator state to be duplicated per operator parallelism when loading from a savepoint, which caused exponential growth per savepoint cycle
* You can obtain a local copy of this savepoint and try to load it by means of the state-processor-api
  * If you break into the debugger, at some point the _metadata file gets loaded, which lets you determine which state actually ran away and what might have caused the duplication

I hope this helps

Thias

From: Frederic Leger <frederic.le...@asklocala.com>
Sent: Monday, August 28, 2023 12:30 PM
To: user@flink.apache.org
Subject: Checkpoint/savepoint _metadata

Hi team,

We use Flink 1.16.0 with openjdk-11-jre, mainly to run streaming jobs. We take checkpoints at a 2 min interval and a savepoint when deploying a new job version. We also use the RocksDB state backend for most of them.

We had a streaming job that ran for a long time without any issue, but during a new deployment we could no longer launch it: it got stuck in CREATING on one task, then failed, restarted, and so on.

In this Flink job, we handle a large data stream using key-based grouping. Inside a processFunction, we use MapState[Long, String] as our state storage, which keeps data with a time-to-live (TTL) of 30 days.
The most relevant error we got from the logs was:

2023-08-02 12:40:52,186 ERROR akka.remote.EndpointWriter [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.1.1.5:31336/user/rpc/taskmanager_0#1435767402]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 42582878 bytes.

The solution to this issue was to increase akka.framesize from the default (10MB) to 50MB:

akka.framesize: 52428800b

After 16h of uptime, we wanted to move the job back to its initial cluster, as it had been running fine since then. But once the savepoint was taken, we could not launch the job again and got this error:

2023-08-03 08:49:06,474 ERROR akka.remote.EndpointWriter [] - Transient association error (association remains live)
akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.1.1.5:31358/user/rpc/taskmanager_0#1492669447]: max allowed size 52428800 bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 679594586 bytes.

After some research, this seems to be related to the _metadata file written when checkpointing/savepointing. Comparing the first ERROR with the last one, this file grew dramatically in those 16h, from 50MB to more than 600MB.

Since then we have been unable to launch the job again. Increasing akka.framesize from 50MB to 1GB made it possible to avoid the above errors, but one task remained in CREATING state until failure.

We then started to get java.lang.OutOfMemoryError: Java heap space on the jobmanager, followed by timeouts between the taskmanagers and the jobmanager. To avoid the OOM on the jobmanager, we had to raise its heap size from 2GB to 20GB.
Increasing the timeouts led to other errors, like java.lang.OutOfMemoryError: Java heap space on the taskmanagers, and so on, until the job finally timed out and failed.

2023-08-03 10:28:32,191 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - The heartbeat of ResourceManager with id be6b26eb0a0a54e636c9fbfc5f9815f3 timed out.
2023-08-03 10:28:32,191 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close ResourceManager connection be6b26eb0a0a54e636c9fbfc5f9815f3.
2023-08-03 10:28:32,191 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager akka.tcp://flink@10.1.1.3:46899/user/rpc/resourcemanager_0(a6fef33bff489d7e860c1017d2a34f50).
2023-08-03 10:28:38,411 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - The heartbeat of JobManager with id 39d49002792d881da6a5e7266c8ee58b timed out.
2023-08-03 10:28:38,412 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job f89626c718a36aa0240f8746b8b5a690.

As far as we understand, the metadata should never become that huge unless we are doing something wrong(?).

Does someone have an idea of how we can diagnose this?
We have kept a copy of the culprit metadata files and are in the process of looking at their content, but we don't have a precise idea of what to look for.

Any suggestions are welcome.

Thx
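As a rough aid for the diagnosis discussed in this thread, the two OversizedPayloadException lines can be compared mechanically to see how fast the payload grew between restore attempts. The following is only a sketch in Python: the function name parse_oversized_payload and the regular expression are illustrative assumptions, written against the message format as it appears in the logs quoted above, and are not part of Flink or Akka.

```python
import re

# Assumption: the pattern mirrors the wording of the two log lines quoted in
# this thread; other Akka/Flink versions may format the message differently.
PAYLOAD_RE = re.compile(
    r"max allowed size (\d+) bytes, "
    r"actual size of encoded class (\S+) was (\d+) bytes"
)


def parse_oversized_payload(line):
    """Return (max_allowed, class_name, actual_size), or None if no match."""
    match = PAYLOAD_RE.search(line)
    if match is None:
        return None
    return int(match.group(1)), match.group(2), int(match.group(3))


# Tails of the two log lines quoted in the thread (2023-08-02 and 2023-08-03).
FIRST = (
    "max allowed size 10485760 bytes, actual size of encoded class "
    "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 42582878 bytes."
)
SECOND = (
    "max allowed size 52428800 bytes, actual size of encoded class "
    "org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 679594586 bytes."
)

_, _, first_size = parse_oversized_payload(FIRST)
_, _, second_size = parse_oversized_payload(SECOND)

# Ratio between the payload sizes reported by the two failed restores.
growth = second_size / first_size

print(f"first payload : {first_size / 2**20:.1f} MiB")
print(f"second payload: {second_size / 2**20:.1f} MiB")
print(f"growth factor : {growth:.2f}x")
```

For the two lines above, the growth factor comes out at roughly 16 between the two restore attempts. That is the kind of multiplicative jump one would expect if some state were being duplicated on each restore, rather than growing steadily with the data, though the numbers alone do not prove that this is the cause.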