Thanks a lot for answering this in detail. It makes sense and cleared up a lot of doubts.

On Fri, 9 Apr 2021 at 13:02 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Dhanesh,
>
> The way local state currently works in Flink is the following: the user configures `taskmanager.state.local.root-dirs` (otherwise the tmp directory is used), and Flink creates a "localState" directory there. This is the base directory for all local state. Within this directory a TaskManager creates a sub directory for every allocation, named after the `AllocationID`. Inside this directory, Flink then stores the local state artefacts.
>
> When Flink frees an allocation, the corresponding directory is deleted. If the process is killed via a SIGTERM signal, Flink also runs a shutdown hook which tries to delete the directories of all known `AllocationID`s. If the shutdown hooks do not run (e.g. the process is killed via SIGKILL), then Flink leaves some residual state behind.
>
> Now the problem is what happens if the TaskManager process is restarted on the same machine. In this case, Flink will simply reuse the same local state directory, but it ignores existing allocation id sub directories. The reason is that Flink cannot tell whether these allocation id sub directories are in use by another Flink process running on the same machine. To make this decision, Flink would have to know that it is the owner of these sub directories. This could work if each TaskManager process were started with a unique ID that is reused across restart attempts, but that is currently not the case for every deployment.
>
> Long story short, it is currently expected that Flink can leave some residual state behind after a hard process stop. Cleaning this state up is, at the moment, unfortunately the responsibility of the user.
>
> Cheers,
> Till
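
For reference, this is roughly the kind of user-side cleanup of stale localState allocation directories we had in mind, run before the task manager java process starts. It is only a minimal sketch: the base path, the "localState" sub directory name taken from Till's description, the last-modified age threshold, and the assumption that only one task manager ever uses this directory are all things to adapt (or reject) per deployment. The age heuristic is there because, as Till points out, a freshly started process cannot tell which allocation ids are still owned by someone else.

import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.stream.Stream;

/**
 * Heuristic cleanup of leftover local-recovery state, intended to run
 * before the TaskManager starts. Deletes allocation sub directories under
 * the local state base dir that have not been touched for a while.
 *
 * Assumptions (not guaranteed by Flink): only one TaskManager uses this
 * base directory, and anything older than MAX_AGE belongs to a previous,
 * hard-killed process rather than a live one.
 */
public class StaleLocalStateCleaner {

    private static final Duration MAX_AGE = Duration.ofHours(6);

    public static void main(String[] args) throws IOException {
        // taskmanager.state.local.root-dirs from this thread; per the
        // description above, Flink keeps local state in a "localState"
        // directory below it, one sub directory per AllocationID.
        Path localStateBase = Paths.get("/data/flink", "localState");
        if (!Files.isDirectory(localStateBase)) {
            return; // nothing to clean
        }

        Instant cutoff = Instant.now().minus(MAX_AGE);
        try (Stream<Path> allocationDirs = Files.list(localStateBase)) {
            for (Path dir : (Iterable<Path>) allocationDirs::iterator) {
                FileTime lastModified = Files.getLastModifiedTime(dir);
                if (lastModified.toInstant().isBefore(cutoff)) {
                    deleteRecursively(dir);
                }
            }
        }
    }

    private static void deleteRecursively(Path root) throws IOException {
        try (Stream<Path> tree = Files.walk(root)) {
            // delete children before their parent directories
            for (Path p : (Iterable<Path>) tree.sorted(Comparator.reverseOrder())::iterator) {
                Files.deleteIfExists(p);
            }
        }
    }
}

Running this before the task manager process starts keeps it independent of Flink's own lifecycle; the threshold just needs to be comfortably larger than a normal restart cycle.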

> On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole <davcdhane...@gmail.com> wrote:
>
>> Hey all,
>>
>> We are running a stateful stream processing job on k8s using the per-job standalone deployment entrypoint. Flink version: 1.12.1
>>
>> *Problem*: We have observed that whenever a task manager is either gracefully shut down or killed (due to OOM, k8s worker node drain-out, etc.), it doesn't clean up the rocksdb state directories from the local disk. But when the task manager restarts and receives new task allocations from the resource manager, it rebuilds its local state for those tasks from the previously completed checkpoint. Over time, after multiple restarts, the task manager's local disk ends up accumulating lots of such orphan rocksdb directories.
>>
>> *Questions*: This isn't causing any functional issues for us, but it adds a lot of repeated ops overhead of cleaning these disks periodically. As a workaround, we are thinking of cleaning the local rocksdb directories, except for the *taskmanager.state.local.root-dirs*, before starting the task manager java process. Since the keyed state backends for allocated tasks are restored anyway on every task manager restart, we feel it is the safest option atm and will solve our problem of ever-growing disks on task manager pods. Is it safe to do so, or are there any other consequences? Is there any config or restart policy that takes care of cleaning up such stale rocksdb directories during the state backend restore process?
>>
>> A sort of similar clean-up is required when local task recovery is enabled. Whenever the task manager is not shut down gracefully, the old localState doesn't get cleaned up on the next restart. This also causes a lot of disk space wastage. It's easier to delete the rocksdb working directories from the previous run, but not so straightforward for the localState, as one has to figure out which of them are actually stale allocation IDs and clean only those, or check the latest completed checkpoint and delete all localState directories for older checkpoints and allocation-ids. Is there any other solution to this problem? We would also like to learn from other users how you are handling these operational tasks currently.
>>
>> Configuration:
>>
>> state.backend.local-recovery: true
>> taskmanager.state.local.root-dirs: /data/flink/
>>
>> RocksDB backend DB storage path: /data/flink (set programmatically)
>>
>> -
>> Dhanesh Arole

--
- Dhanesh ( sent from my mobile device. Pardon me for any typos )
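
For completeness, the RocksDB storage path mentioned in the configuration above is set programmatically roughly like this on the Flink 1.12 API; the checkpoint URI below is just a placeholder, only the storage path comes from this thread.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental RocksDB backend; the checkpoint URI is a placeholder.
        RocksDBStateBackend backend = new RocksDBStateBackend("s3://my-bucket/checkpoints", true);

        // Local working directory for RocksDB instances, matching the setup above.
        backend.setDbStoragePath("/data/flink");

        env.setStateBackend(backend);

        // ... build and execute the job here
    }
}

The equivalent flink-conf.yaml option, if you prefer configuration over code, is state.backend.rocksdb.localdir.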