Hi Dhanesh,

The way local state works in Flink currently is the following: The user
configures a `taskmanager.state.local.root-dirs` or the tmp directory is
used where Flink creates a "localState" directory. This is the base
directory for all local state. Within this directory a TaskManager creates
for every allocation a sub directory using the `AllocationID`. Inside this
directory, Flink then stores the local state artefacts.

When Flink frees an allocation, then the corresponding directory is
deleted. In case that the process is being killed via a SIGTERM signal,
Flink also registers a shut down hook which tries to delete all directories
for the known `AllocationIDs`. If the shut down hooks do not run (e.g.
killed via SIGKILL), then Flink leaves some residual state.

Now the problem is what happens if the TaskManager process is restarted on
the same machine. In this case, Flink will simply use the same local state
directory but it ignores existing allocation id sub directories. The reason
is that Flink does not know whether these allocation id sub directories are
not used by another Flink process running on the same machine. In order to
make this decision Flink would have to know that it is the owner of these
sub directories. This could work if each TaskManager process is started
with a unique ID and if this ID is reused across restart attempts. This is
currently not for every deployment the case.

Long story short, it is currently expected that Flink can leave some
residual state in case of a hard process stop. Cleaning this state up is at
the moment unfortunately the responsibility of the user.


On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole <davcdhane...@gmail.com> wrote:

> Hey all,
> We are running a stateful stream processing job on k8s using per-job
> standalone deployment entrypoint. Flink version: 1.12.1
> *Problem*: We have observed that whenever a task manager is either
> gracefully shut down or killed ( due to OOM, k8s worker node drain out etc
> ) it doesn't clean up the rocksdb state directories from the local disk.
> But when the task manager restarts and it receives new task allocation from
> the resource manager it rebuilds its local state for those tasks from the
> previous completed checkpoint. Over the period of time after multiple
> restarts, the task manager's local disk ends up accumulating lots of such
> orphan rocksdb directories.
> *Questions*: This isn't causing any functional issues to us, but it adds
> up lots of repeated ops overhead of cleaning these disks periodically. As a
> workaround, we are thinking of cleaning the local rocksdb directories
> except for the *taskmanager.state.local.root-dirs *before starting the
> task manager java process. Since, during every task manager restart keyed
> state backends for allocated tasks are anyway restored we feel it is the
> safest option atm and will solve our problem of ever growing disk on task
> manager pods. Is it safe to do so or are there any other consequences of
> it? Is there any config or restart policy that takes care of cleaning up
> such stale rocksdb directories during the statebackend restore process?.
> A sort of similar clean up is required when local task recovery is
> enabled. Whenever the task manager is not shut down gracefully the old
> localState doesn't get cleaned up on the next restart. This also causes
> lots of disk space wastage. It's easier to delete rocksdb working
> directories from previou run, but not so straightforward for the localState
> as one has to figure out which one of them are actually stale allocation
> IDs and clean only those one. Or check the latest completed checkpoint and
> delete all localStates directories for older checkpoints and
> allocation-ids. Is there any other solution to this problem? Also would
> like to learn from other users how are you handling these operational tasks
> currently?
> configurations:
> state.backend.local-recovery: true
> taskmanager.state.local.root-dirs: /data/flink/
> RocksDb backend DB storage path:  /data/flink ( set programmatically )
> -
> Dhanesh Arole

Reply via email to