Hi Till,

Thanks for getting back to me. Apologies for my delayed response.

Thanks for confirming that the slot ID (Allocation ID) is indeed necessary
today for task local recovery to kick in, and for your insights on how to make
this work.

We are interested in exploring this disaggregation between local state storage 
and slots to allow potential reuse of local state even when TMs go down.

I'm planning to spend some time exploring the Flink code around local recovery 
and state persistence. I'm still new to Flink, so any guidance will be helpful. 
I think both of your ideas on how to make this happen are interesting and worth 
exploring. What's the procedure to collaborate or get guidance on this feature? 
Will a FLIP be required, or will opening a ticket do?

Thanks,
Sonam
________________________________
From: Till Rohrmann <trohrm...@apache.org>
Sent: Monday, April 26, 2021 10:24 AM
To: dev <d...@flink.apache.org>
Cc: user@flink.apache.org <user@flink.apache.org>; Sonam Mandal 
<soman...@linkedin.com>
Subject: Re: Task Local Recovery with mountable disks in the cloud

Hi Sonam,

Sorry for the late reply. We were a bit caught up in the feature freeze for the
next major Flink release.

In general, I think it is a very good idea to disaggregate the local state 
storage to make it reusable across TaskManager failures. However, it is also 
not trivial to do.

Maybe let me first describe how the current task local recovery works and then 
see how we could improve it:

Flink creates an AllocationID for every slot allocation. The AllocationID
associates a slot on a TaskExecutor with a job and is also used to scope the
lifetime of a slot with respect to that job (theoretically, one and the same
slot could be used to fulfill multiple slot requests of the same job if the
slot allocation is freed in between). Note that the AllocationID is a random ID
and, thus, changes whenever the ResourceManager allocates a new slot on a
TaskExecutor for a job.
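
To make this concrete, here is a rough sketch in plain Java of how such a
randomly generated allocation ID scopes a slot to a job. The class and field
names are made up for illustration and are not the actual Flink classes:

import java.util.UUID;

// Hypothetical stand-in for Flink's AllocationID: simply a random, unique ID.
final class AllocationId {
    private final UUID id = UUID.randomUUID();

    @Override
    public String toString() {
        return id.toString();
    }
}

// Hypothetical slot bookkeeping: a slot is bound to a job via an AllocationId.
final class SlotAllocation {
    final AllocationId allocationId = new AllocationId(); // fresh ID per allocation
    final String jobId;

    SlotAllocation(String jobId) {
        this.jobId = jobId;
    }
}

Because the ID is generated freshly for every allocation, a restarted
TaskExecutor ends up with new IDs unless they are preserved somewhere.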

Task local recovery is effectively a state cache that is associated with an
AllocationID. For every checkpoint and every task, the TaskExecutor copies the
state data and stores it in the task local recovery cache. The cache is
maintained as long as the slot allocation is valid (i.e. the slot has not been
freed by the JobMaster and has not timed out). This keeps the lifecycle
management of the state data simple and ensures that a process does not clutter
the local disks. On the JobMaster side, Flink remembers for every Execution
where it was deployed (it remembers the AllocationID). If a failover happens,
Flink tries to re-deploy the Executions into the slots they were running in
before by matching the AllocationIDs.
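
As a minimal sketch of that cache idea (hypothetical names, not Flink's actual
implementation), local state directories are registered per allocation and
dropped as soon as the allocation is released:

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local-recovery cache: local state directories grouped by the
// allocation they belong to. Not thread-safe; a sketch only.
final class LocalStateCache {
    private final Map<String, List<Path>> stateByAllocation = new HashMap<>();

    // Called after each checkpoint: remember where the local copy lives.
    void register(String allocationId, Path localStateDir) {
        stateByAllocation
            .computeIfAbsent(allocationId, k -> new ArrayList<>())
            .add(localStateDir);
    }

    // Called when the slot allocation is freed or times out: forget everything;
    // the caller then deletes the returned directories from disk.
    List<Path> release(String allocationId) {
        List<Path> dirs = stateByAllocation.remove(allocationId);
        return dirs == null ? List.of() : dirs;
    }
}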

We scoped the state cache to an AllocationID for simplicity and because we
couldn't guarantee that a failed TaskExecutor X would be restarted on the same
machine again and would thereby have access to the same local disk as before.
That's also why Flink deletes the cache directory when a slot is freed or when
the TaskExecutor is shut down gracefully.

With persistent volumes this changes, and we can make the TaskExecutors
"stateful" in the sense that we can reuse an already populated cache. One
rather simple idea could be to also persist the slot allocations of a
TaskExecutor (which slots are allocated and what their assigned AllocationIDs
are). This information could be used to re-initialize the TaskExecutor upon
restart. That way, it would not have to register at the ResourceManager and
wait for new slot allocations but could directly start offering its slots to
the jobs it remembered. If the TaskExecutor cannot find the JobMasters for the
respective jobs, it would then free the slots and clear the cache accordingly.
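
As a rough illustration of this idea (purely hypothetical, not an existing
Flink feature), the TaskExecutor could write a small manifest of its slot
allocations to the persistent volume and read it back on restart:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical manifest of slot allocations, stored on the persistent volume.
final class SlotAllocationManifest {
    private final Path file;

    SlotAllocationManifest(Path persistentVolumeDir) {
        this.file = persistentVolumeDir.resolve("slot-allocations.properties");
    }

    // Persist "slotIndex=jobId,allocationId" entries whenever allocations change.
    void store(Map<Integer, String> slotToJobAndAllocation) throws IOException {
        List<String> lines = new ArrayList<>();
        slotToJobAndAllocation.forEach((slot, value) -> lines.add(slot + "=" + value));
        Files.write(file, lines);
    }

    // On restart: re-offer these slots to the remembered JobMasters, or free
    // them (and clear the cache) if the JobMasters cannot be found.
    Map<Integer, String> load() throws IOException {
        Map<Integer, String> result = new HashMap<>();
        if (Files.exists(file)) {
            for (String line : Files.readAllLines(file)) {
                String[] parts = line.split("=", 2);
                result.put(Integer.parseInt(parts[0]), parts[1]);
            }
        }
        return result;
    }
}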

This could work as long as the ResourceManager does not start new
TaskExecutors whose slots could be used to recover the job. If this is a
problem, then one needs to answer the question of how long to wait for the old
TaskExecutors to come back and reuse their local state vs. quickly starting a
fresh instance but having to restore state remotely.

An alternative proposal, which is probably more powerful albeit also more
complex, would be to make the cache information explicit when registering the
TaskExecutor at the ResourceManager and later when offering slots to the
JobMaster. For example, the TaskExecutor could tell the ResourceManager which
state it has cached locally (this information probably needs to include the key
group ranges of every stored state), and the ResourceManager could use it to
decide from which TaskExecutor to allocate slots for a job. Similarly, on the
JobMaster side, we could use this information to calculate the best mapping
between Executions and slots. I think that mechanism could deal better with
rescaling events where there is no perfect match between Executions and slots
because of the changed key group ranges.
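
A sketch of what this could look like (again with hypothetical types, not an
existing Flink API): the TaskExecutor reports its locally cached key group
ranges at registration time, and the scheduler prefers slots whose cached
ranges overlap the ranges an Execution needs:

// Hypothetical description of locally cached state, reported when the
// TaskExecutor registers at the ResourceManager.
record CachedStateInfo(
        String jobId, String operatorId, int keyGroupStart, int keyGroupEnd) {}

// Hypothetical scoring: how many of the key groups needed by an Execution are
// already cached on a candidate slot? A higher score means a better placement.
final class LocalStateMatcher {
    static int overlap(CachedStateInfo cached, int neededStart, int neededEnd) {
        int lo = Math.max(cached.keyGroupStart(), neededStart);
        int hi = Math.min(cached.keyGroupEnd(), neededEnd);
        return Math.max(0, hi - lo + 1); // number of overlapping key groups
    }
}

The scheduler could then sum these overlaps per candidate assignment and pick
the one with the largest total, which also covers rescaling where the cached
and required key group ranges only partially match.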

So to answer your question: There is currently no way to preserve AllocationIDs 
across restarts. However, we could use the persistent volume to store this 
information so that we can restore it on restart of a TaskExecutor. This could 
enable task local state recovery for cases where we lose a TaskExecutor and 
restart it with the same persistent volume.

Cheers,
Till

On Wed, Apr 21, 2021 at 7:26 PM Stephan Ewen <se...@apache.org> wrote:
/cc dev@flink


On Tue, Apr 20, 2021 at 1:29 AM Sonam Mandal <soman...@linkedin.com> wrote:

> Hello,
>
> We've been experimenting with Task-local recovery using Kubernetes. We
> have a way to specify mounting the same disk across Task Manager
> restarts/deletions for when the pods get recreated. In this scenario, we
> noticed that task local recovery does not kick in (as expected based on the
> documentation).
>
> We did try to comment out the code on the shutdown path which cleaned up
> the task local directories before the pod went down / was restarted. We
> noticed that remote recovery kicked in even though the task local state was
> present. I noticed that the slot IDs changed, and was wondering if this is
> the main reason that the task local state didn't get used in this scenario?
>
> Since we're using this shared disk to store the local state across pod
> failures, would it make sense to allow keeping the task local state so that
> we can get faster recovery even for situations where the Task Manager
> itself dies? In some sense, the storage here is disaggregated from the pods
> and can potentially benefit from task local recovery. Any reason why this
> is a bad idea in general?
>
> Is there a way to preserve the slot IDs across restarts? We set up the Task
> Manager to pin the resource-id, but that didn't seem to help. My
> understanding is that the slot ID needs to be reused for task local
> recovery to kick in.
>
> Thanks,
> Sonam
>
>
