[ https://issues.apache.org/jira/browse/FLINK-12245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16821407#comment-16821407 ]

Hwanju Kim commented on FLINK-12245:
------------------------------------

Thanks Till! So, FLINK-9635 indeed extracted the subset relevant to this issue 
from the scheduler refactoring in the group-aware scheduling work. That's 
exactly what I wanted. I cherry-picked the patch onto 1.6.2 and the retest 
passed. 
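
For reference, the fallback behavior is conceptually like this (a standalone 
sketch with hypothetical names, not the actual Flink classes; the real logic 
lives in PreviousAllocationSlotSelectionStrategy on mainline):

{code:java}
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified model of slot selection for illustration only.
class SlotSelectionSketch {

    // A candidate slot: the allocation ID it carries and the TM it lives on.
    record Slot(String allocationId, String tmLocation) {}

    // Locality-based fallback: prefer a slot on a preferred TM, else take any.
    static Optional<Slot> selectByLocality(List<Slot> candidates,
                                           Set<String> preferredLocations) {
        return candidates.stream()
                .filter(s -> preferredLocations.contains(s.tmLocation()))
                .findFirst()
                .or(() -> candidates.stream().findFirst());
    }

    // Fixed behavior: try the exact previous allocation first; if it is gone
    // (its TM crashed), fall back to locality-based matching instead of
    // returning empty, which in 1.6.x forced an extra slot request to the
    // resource manager.
    static Optional<Slot> selectWithPreviousAllocation(
            List<Slot> candidates,
            String previousAllocationId,
            Set<String> preferredLocations) {
        return candidates.stream()
                .filter(s -> s.allocationId().equals(previousAllocationId))
                .findFirst()
                .or(() -> selectByLocality(candidates, preferredLocations));
    }
}
{code}

With this shape, an execution whose previous TM is gone simply reuses one of 
the surviving candidate slots rather than generating a second request that a 
single-slot TM can never fulfill.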

> Transient slot allocation failure on job recovery
> -------------------------------------------------
>
>                 Key: FLINK-12245
>                 URL: https://issues.apache.org/jira/browse/FLINK-12245
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.3
>         Environment: Flink 1.6.2 with Kubernetes
>            Reporter: Hwanju Kim
>            Priority: Major
>
> In 1.6.2, we have experienced transient slot allocation failures on job 
> recovery, especially when a task manager (TM) is unavailable, leading to 
> heartbeat failure. By transient, I mean the job fails once with a slot 
> allocation timeout (by default 5 min) and the next recovery restart succeeds.
>  
> I found that each _Execution_ remembers its previous allocations and, for the 
> sake of local state recovery, prefers the last previous allocation among the 
> resolved slot candidates. If the previous allocation belongs to an 
> unavailable TM, the candidates do not contain it, forcing the slot provider 
> to request a new slot from the resource manager, which then finds a new TM 
> and its available slots. So far this is expected and fine, but any subsequent 
> execution that also belonged to the unavailable TM, and has the first task as 
> a predecessor, fails with the unavailable previous allocation as well. It too 
> requests another new slot, since it never finds the gone previous allocation 
> among the candidates. However, this behavior may generate more slot requests 
> than there are available slots. For example, if two pipelined tasks shared 
> one slot in one TM, and that TM crashed and was replaced with a new one, the 
> two tasks generate two new slot requests. Since two slot requests cannot be 
> fulfilled by a TM with a single slot, the job hits the slot allocation 
> timeout and restarts. 
>  
> {code:java}
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2, slots allocated: 1
> {code}
>  
> At the next round of recovery, since the second execution failed to allocate 
> a new slot, its last previous allocation is _null_, so it falls back to the 
> locality-based allocation strategy, which can find the slot allocated for the 
> first task, and thus succeeds. Although the job eventually recovers, the slot 
> allocation timeout adds to the downtime.
>  
> The reason for this behavior is that 
> _PreviousAllocationSchedulingStrategy.findMatchWithLocality()_ immediately 
> returns _null_ if the previous allocation is non-empty and is not contained 
> in the candidate list. I thought that if the previous allocation is not among 
> the candidates, it could fall back to 
> _LocationPreferenceSchedulingStrategy.findMatchWithLocality()_ rather than 
> returning _null_. By doing so, it avoids requesting more slots than are 
> available. Requesting extra slots can be fine in an environment where the 
> resource manager can reactively spawn more TMs (like YARN/Mesos), although it 
> may spawn more than needed, but a StandaloneResourceManager with statically 
> provisioned resources cannot help but fail to allocate the requested slots.
>  
> Having looked at the mainline branch and 1.8.0, although I have not attempted 
> to reproduce this issue there, the related code has changed to what I 
> expected (falling back to the locality-based strategy if the previous 
> allocation is not among the candidates): 
> _PreviousAllocationSlotSelectionStrategy.selectBestSlotForProfile()_. That 
> led me to read the group-aware scheduling design 
> ([https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk]).
> In addition, I checked that in 1.6.2 the _matchPreviousLocationNotAvailable_ 
> test expects the problematic behavior I described, so I started wondering 
> whether the behavior of the previous-allocation strategy outside mainline is 
> by design. I have a fix similar to the mainline and verified that it resolves 
> the problem, but I am raising this issue to get context around the behavior 
> and to discuss possible side effects of the fix. I understand that under the 
> current vertex-by-vertex scheduling the fix could be inefficient, by letting 
> an execution that belonged to the unavailable slot steal another task's 
> previous slot, but a slot allocation failure seems worse to me.
>  
> I searched existing issues for the slot allocation failure term and could not 
> find the same problem, hence this report. Please feel free to mark it as a 
> duplicate if one exists.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
