Thanks a lot for the detailed answer. This makes sense and cleared up a
lot of my doubts.
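
For anyone finding this thread later, here is a minimal sketch of a
pre-start cleanup based on the directory layout Till describes in the quoted
reply: a "localState" directory under the configured root, with one sub
directory per allocation. It assumes that only one TaskManager process ever
uses the given root directory and that none is running while it executes;
otherwise it could delete state that is still in use. The function name
`clean_local_state` and the `dry_run` flag are made up for illustration and
are not part of Flink.

```python
import shutil
from pathlib import Path
from typing import List


def clean_local_state(root_dir: str, dry_run: bool = True) -> List[str]:
    """Remove every sub directory of <root_dir>/localState.

    Returns the names of the sub directories that were (or, with
    dry_run=True, would be) removed. Assumption: no running TaskManager
    shares this root directory.
    """
    base = Path(root_dir) / "localState"
    if not base.is_dir():
        # Nothing was left behind, or local state was never written here.
        return []

    removed = []
    for child in sorted(base.iterdir()):
        if child.is_dir():
            if not dry_run:
                shutil.rmtree(child)
            removed.append(child.name)
    return removed
```

With the configuration from this thread, a call such as
`clean_local_state("/data/flink/", dry_run=False)` would have to run before
the TaskManager java process starts. The default dry run only lists what
would be deleted.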

On Fri, 9 Apr 2021 at 13:02 Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Dhanesh,
>
> The way local state works in Flink currently is the following: The user
> configures `taskmanager.state.local.root-dirs`; otherwise the tmp
> directory is used. There, Flink creates a "localState" directory. This is
> the base directory for all local state. Within this directory, a
> TaskManager creates a sub directory for every allocation using the
> `AllocationID`. Inside this directory, Flink then stores the local state
> artefacts.
>
> When Flink frees an allocation, the corresponding directory is deleted.
> Flink also registers a shutdown hook which tries to delete all directories
> for the known `AllocationIDs` in case the process is killed via a SIGTERM
> signal. If the shutdown hooks do not run (e.g. the process is killed via
> SIGKILL), then Flink leaves some residual state.
>
> Now the problem is what happens if the TaskManager process is restarted on
> the same machine. In this case, Flink will simply use the same local state
> directory but it ignores existing allocation id sub directories. The reason
> is that Flink does not know whether these allocation id sub directories are
> in use by another Flink process running on the same machine. In order to
> make this decision, Flink would have to know that it is the owner of these
> sub directories. This could work if each TaskManager process is started
> with a unique ID and if this ID is reused across restart attempts. This is
> currently not the case for every deployment.
>
> Long story short, it is currently expected that Flink can leave some
> residual state in case of a hard process stop. Cleaning this state up is at
> the moment unfortunately the responsibility of the user.
>
> Cheers,
> Till
>
> On Tue, Apr 6, 2021 at 4:55 PM dhanesh arole <davcdhane...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> We are running a stateful stream processing job on k8s using per-job
>> standalone deployment entrypoint. Flink version: 1.12.1
>>
>> *Problem*: We have observed that whenever a task manager is either
>> gracefully shut down or killed (due to OOM, k8s worker node drain, etc.)
>> it doesn't clean up the rocksdb state directories from the local disk.
>> When the task manager restarts and receives a new task allocation from
>> the resource manager, it rebuilds its local state for those tasks from the
>> previous completed checkpoint. Over time, after multiple restarts, the
>> task manager's local disk ends up accumulating lots of such orphaned
>> rocksdb directories.
>>
>> *Questions*: This isn't causing any functional issues for us, but it adds
>> a lot of recurring ops overhead, since we have to clean these disks
>> periodically. As a workaround, we are thinking of cleaning the local
>> rocksdb directories, except for the *taskmanager.state.local.root-dirs*,
>> before starting the task manager java process. Since the keyed state
>> backends for allocated tasks are restored on every task manager restart
>> anyway, we feel this is the safest option at the moment and will solve our
>> problem of ever-growing disk usage on task manager pods. Is it safe to do
>> so, or are there any other consequences? Is there any config or restart
>> policy that takes care of cleaning up such stale rocksdb directories
>> during the state backend restore process?
>>
>> A similar cleanup is required when local task recovery is enabled.
>> Whenever the task manager is not shut down gracefully, the old localState
>> doesn't get cleaned up on the next restart. This also wastes a lot of
>> disk space. It's easy to delete the rocksdb working directories from the
>> previous run, but not so straightforward for the localState, as one has to
>> figure out which of the allocation IDs are actually stale and clean only
>> those. Alternatively, one could check the latest completed checkpoint and
>> delete all localState directories for older checkpoints and
>> allocation IDs. Is there any other solution to this problem? I would also
>> like to learn from other users how you are currently handling these
>> operational tasks.
>>
>> configurations:
>>
>> state.backend.local-recovery: true
>> taskmanager.state.local.root-dirs: /data/flink/
>>
>> RocksDB backend DB storage path: /data/flink (set programmatically)
>>
>>
>> -
>> Dhanesh Arole
>>
> --
- Dhanesh ( sent from my mobile device. Pardon me for any typos )
