[ 
https://issues.apache.org/jira/browse/FLINK-38698?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ari updated FLINK-38698:
------------------------
    Priority: Critical  (was: Major)

> TaskInformation blobs accumulate without cleanup causing storage exhaustion
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-38698
>                 URL: https://issues.apache.org/jira/browse/FLINK-38698
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.14.3, 1.15.4, 1.16.3, 1.17.2, 1.18.1, 1.19.3, 1.20.3
>            Reporter: ari
>            Priority: Critical
>
> h1. Summary
> When the adaptive scheduler is enabled, TaskInformation blob files accumulate 
> on persistent storage without proper cleanup, eventually leading to storage 
> exhaustion and job stalling.
> h1. Problem Details
> On each job deployment and restart (including JobManager failures), 
> {{ExecutionJobVertex.getTaskInformationOrBlobKey()}} reconstructs 
> TaskInformation objects from the JobGraph. These TaskInformation objects can 
> be extremely large (>200 MB) because they can contain serialized UDFs within 
> the task configuration.
> Specifically, the Configuration field in TaskInformation includes a 
> {{SERIALIZED_UDF}} entry.
> When these objects exceed the {{blob.offload-minsize}} configuration, they 
> are offloaded as permanent blobs to avoid hitting RPC framesize limits (see 
> the sketch after this list). However:
>  # Blob keys are not reused across failures - each restart creates a new blob 
> with a different key, even though the content hash is the same
>  # No cleanup mechanism until global termination - permanent blobs are only 
> cleaned up when the job reaches a globally terminated state, a state that is 
> never reached during internal restarts
>  # Job JAR blobs ARE reused - in contrast, job JAR blobs stored in the 
> JobGraph have their keys persisted and are correctly reused
>  
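> A minimal, self-contained sketch of the offloading behaviour described above 
> (hypothetical class and method names for illustration, not Flink's actual 
> {{BlobWriter}} code): once a payload meets the offload threshold it is written 
> under a freshly generated key, so identical content offloaded again after a 
> restart produces a second file.
> {code:java}
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.UUID;
> 
> public class OffloadSketch {
> 
>     // Stand-in for the serialized TaskInformation payload (which in Flink
>     // would include the SERIALIZED_UDF entry of the task configuration).
>     static byte[] serializedTaskInformation(byte[] udfBytes) {
>         return udfBytes;
>     }
> 
>     // If the payload is at least offloadMinSize bytes, persist it to the
>     // blob directory under a newly generated key and return that key;
>     // otherwise return null to signal "send inline over RPC".
>     static String tryOffload(byte[] payload, long offloadMinSize, Path blobDir)
>             throws IOException {
>         if (payload.length < offloadMinSize) {
>             return null;
>         }
>         // A fresh random key per call means a restart writes a new file
>         // even for identical content, which is the accumulation reported here.
>         String blobKey = UUID.randomUUID().toString();
>         Files.write(blobDir.resolve(blobKey), payload);
>         return blobKey;
>     }
> 
>     public static void main(String[] args) throws IOException {
>         Path blobDir = Files.createTempDirectory("blobstore-sketch");
>         byte[] payload = serializedTaskInformation(new byte[1024 * 1024]);
> 
>         // Two "restarts" with identical content still produce two blob files.
>         String key1 = tryOffload(payload, 0, blobDir);
>         String key2 = tryOffload(payload, 0, blobDir);
>         System.out.println("restart 1 key: " + key1);
>         System.out.println("restart 2 key: " + key2);
>         System.out.println("files in blob dir: " + Files.list(blobDir).count());
>     }
> }
> {code}
> 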
> h1. Impact
>  * Storage exhaustion in the blob storage directory (particularly a problem 
> for the high-availability storage directory, which may have hard storage 
> limits)
>  * Job stalling once the storage limit is reached: on the next restart the 
> blob can no longer be offloaded, so the TaskInformation is sent over RPC, 
> hits the framesize limit, and checkpoints never trigger
>  * Particularly severe with:
>  ** Complex streaming jobs with large/many serialized UDFs in the task config
>  ** Frequent TM failures requiring restarts
>  ** High parallelism (each parallelized vertex creates its own TaskInformation 
> blob)
> h1. Reproduction Steps
>  # Enable adaptive scheduler
>  # Set {{blob.offload-minsize: 0}} (forces all TaskInformation objects to be 
> offloaded)
>  # Run {{kubectl delete pod \{task-manager-pod-name}}} to trigger job restart
>  # Wait for job to restart and process records
>  # {{kubectl exec -it \{job-manager-pod-name} -- bash}}
>  # {{cd}} into the blob storage directory and run {{ls && stat *}}
>  # Observe that every file except the job JAR blob is duplicated after each 
> restart
> h1. Expected vs Actual Behavior
> Expected: On a restart, if the content hash is the same, reuse the previously 
> offloaded TaskInformation blob; otherwise, delete the blobs that are no longer 
> needed and then generate the new one. (FLINK-7140 appears to have introduced a 
> random component into the blob key to prevent hash collisions, which rules out 
> reuse by content hash alone; a conceptual sketch of the intended reuse follows 
> below.)
> Actual: New TaskInformation blobs are created on every restart, there is no 
> cleanup until the job reaches a globally terminated state, and blob 
> accumulation is unbounded over time.
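> For illustration only, a conceptual sketch of the expected behaviour (made-up 
> class, not a proposed patch): remember the blob key per content hash and hand 
> back the same key on restart instead of uploading a new blob. Since the 
> permanent blob key also carries a random component (per FLINK-7140), actual 
> reuse would have to persist the key itself, as is already done for job JAR 
> blobs, rather than recompute it from the content.
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.security.MessageDigest;
> import java.util.HashMap;
> import java.util.Map;
> 
> public class BlobKeyReuseSketch {
> 
>     // Blob key already "uploaded" for a given content hash.
>     private final Map<String, String> keyByContentHash = new HashMap<>();
>     private int uploads = 0;
> 
>     // Return the existing key for identical content, else upload once
>     // and remember the key for later restarts.
>     String getOrUpload(byte[] payload) throws Exception {
>         String hash = sha256(payload);
>         return keyByContentHash.computeIfAbsent(hash, h -> {
>             uploads++;
>             return "blob-" + h.substring(0, 12); // pretend upload, keyed by content
>         });
>     }
> 
>     static String sha256(byte[] data) throws Exception {
>         byte[] digest = MessageDigest.getInstance("SHA-256").digest(data);
>         StringBuilder sb = new StringBuilder();
>         for (byte b : digest) {
>             sb.append(String.format("%02x", b));
>         }
>         return sb.toString();
>     }
> 
>     public static void main(String[] args) throws Exception {
>         BlobKeyReuseSketch store = new BlobKeyReuseSketch();
>         byte[] taskInfo = "same TaskInformation bytes".getBytes(StandardCharsets.UTF_8);
>         String k1 = store.getOrUpload(taskInfo); // first deployment: uploads
>         String k2 = store.getOrUpload(taskInfo); // restart: reuses the same key
>         System.out.println("same key reused: " + k1.equals(k2) + ", uploads: " + store.uploads);
>     }
> }
> {code}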



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
