Hey all,

We are running a stateful stream processing job on k8s using the per-job standalone deployment entrypoint. Flink version: 1.12.1

*Problem*: We have observed that whenever a task manager is either gracefully shut down or killed (due to OOM, k8s worker node drain, etc.), it doesn't clean up the RocksDB state directories from the local disk. When the task manager restarts and receives a new task allocation from the resource manager, it rebuilds its local state for those tasks from the last completed checkpoint. Over time, after multiple restarts, the task manager's local disk accumulates lots of such orphaned RocksDB directories.

*Questions*: This isn't causing any functional issues for us, but it adds recurring ops overhead because we have to clean these disks periodically. As a workaround, we are thinking of cleaning the local RocksDB directories, except for the *taskmanager.state.local.root-dirs*, before starting the task manager Java process. Since the keyed state backends for allocated tasks are restored on every task manager restart anyway, we feel this is the safest option at the moment and that it will solve our problem of ever-growing disk usage on task manager pods.

Is it safe to do so, or are there any other consequences? Is there any config or restart policy that takes care of cleaning up such stale RocksDB directories during the state backend restore process?

A similar cleanup is required when local task recovery is enabled. Whenever the task manager is not shut down gracefully, the old localState doesn't get cleaned up on the next restart. This also wastes a lot of disk space. It's easy to delete the RocksDB working directories from the previous run, but not so straightforward for the localState: one has to figure out which directories belong to stale allocation IDs and clean only those, or check the latest completed checkpoint and delete all localState directories for older checkpoints and allocation IDs. Is there any other solution to this problem?

We would also like to learn from other users: how are you handling these operational tasks currently?
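
For concreteness, the pre-start cleanup workaround described above could be sketched roughly as follows. This is only an illustration under stated assumptions, not a tested or recommended procedure: the function name `clean_stale_dirs` and its parameters are hypothetical, and the name of the local-recovery subdirectory (`localState`) is an assumption that should be checked against the actual layout on disk, since in this setup the RocksDB storage path and the local-recovery root are the same directory. It runs in dry-run mode by default so that nothing is deleted until the list of candidates has been reviewed.

```python
import shutil
from pathlib import Path


def clean_stale_dirs(root, keep=frozenset({"localState"}), dry_run=True):
    """Remove every entry directly under `root` whose name is not in `keep`.

    Assumptions (hypothetical, verify against the real disk layout):
      * `root` is the RocksDB storage path (e.g. /data/flink),
      * the local-recovery state lives in a subdirectory named "localState",
      * no task manager process is running against `root` while this runs.

    Returns the sorted names of the entries that were (or would be) removed.
    """
    root_path = Path(root)
    removed = []
    if not root_path.is_dir():
        return removed  # nothing to clean on a fresh volume

    for entry in sorted(root_path.iterdir()):
        if entry.name in keep:
            continue  # preserve the local-recovery directory
        removed.append(entry.name)
        if dry_run:
            continue  # only report, do not delete
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)  # orphaned working directory
        else:
            entry.unlink()  # stray file or symlink
    return removed


if __name__ == "__main__":
    # Dry run: list what would be deleted before the task manager starts.
    for name in clean_stale_dirs("/data/flink", dry_run=True):
        print("would remove", name)
```

Such a script would have to run in the container entrypoint before the task manager JVM is launched. It does not address stale allocation IDs inside the localState directory itself, which is the harder part of the question.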

*Configurations*:
state.backend.local-recovery: true
taskmanager.state.local.root-dirs: /data/flink/
RocksDB backend DB storage path: /data/flink (set programmatically)

- Dhanesh Arole