[ 
https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang closed FLINK-10966.
----------------------------
      Resolution: Duplicate
      Release Note: The mechanism of FLINK-8871 solves the problem described 
in this issue.

> Optimize the release blocking logic in BarrierBuffer
> ----------------------------------------------------
>
>                 Key: FLINK-10966
>                 URL: https://issues.apache.org/jira/browse/FLINK-10966
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> Issue:
> Currently, the CancelCheckpointMarker control events that release the
> blocking logic in BarrierBuffer are mixed into the data flow. As a result,
> the blocking may not be released in time, which can lead to a large amount
> of data being spilled to disk.
> The relevant source code is as follows:
> {code:java}
> BufferOrEvent bufferOrEvent = next.get();
> if (isBlocked(bufferOrEvent.getChannelIndex())) {          // issue line
>    // if the channel is blocked, we just store the BufferOrEvent
>    bufferBlocker.add(bufferOrEvent);
>    checkSizeLimit();
> }
> else if (bufferOrEvent.isBuffer()) {
>    return bufferOrEvent;
> }
> else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
>    if (!endOfStream) {
>       // process barriers only if there is a chance of the checkpoint completing
>       processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
>    }
> }
> else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
>    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
> }
> else {
>    if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
>       processEndOfPartition();
>    }
>    return bufferOrEvent;
> }
> {code}
> Scenarios:
> Consider a simple DAG: source -> map (network shuffle), with a parallelism
> of 10. The checkpoint semantics is exactly-once.
> First checkpoint: the barriers of 9 source subtasks are received by all
> map subtasks. One source subtask is blocked and fails to send its barrier,
> so the checkpoint eventually fails due to timeout. At this point, the 9
> corresponding input channels are blocked because they have received a
> barrier.
> Second checkpoint: at this point, the blocked source subtask still cannot
> send any events downstream, and the nine input channels remain blocked.
> Under the current implementation, the data and events received on those
> channels are not processed but stored directly, so the blocking in the
> downstream task is never released. The only way out is for the cached data
> to reach the maximum size limit.
> I think the main problem here is that we should not directly store data
> that comes from blocked input channels. In particular, when one input
> channel is blocked upstream and the other nine input channels are marked
> as blocked, we may never be able to release the blocking; one possible
> remedy is sketched below.
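> A minimal sketch of that idea (hypothetical; it reuses the method names
> from the snippet above and is not the fix that was actually adopted):
> check for CancelCheckpointMarker before the isBlocked check, so that a
> cancellation arriving on a blocked channel still aborts the alignment:
> {code:java}
> BufferOrEvent bufferOrEvent = next.get();
> // Sketch: let cancellation markers bypass the blocked-channel check, so a
> // failed checkpoint can release the alignment without waiting for the
> // buffered data to hit the size limit.
> if (!bufferOrEvent.isBuffer()
>       && bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
>    processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
> }
> else if (isBlocked(bufferOrEvent.getChannelIndex())) {
>    // if the channel is blocked, we just store the BufferOrEvent
>    bufferBlocker.add(bufferOrEvent);
>    checkSizeLimit();
> }
> // ... remaining branches unchanged ...
> {code}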
> A better mechanism might be to send a notifyCheckpointFailed callback via
> the CheckpointCoordinator, allowing each task to unblock itself. This
> would make the release of the old checkpoint alignment independent of the
> trigger of the new checkpoint. If the checkpoint interval is long but the
> timeout is short, the effect of this optimization would be even more
> pronounced; a rough sketch follows.
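> A rough sketch of how such a callback could look on the input side (the
> method name, fields, and wiring are hypothetical; FLINK-8871 defines the
> actual mechanism):
> {code:java}
> // Hypothetical callback on the barrier handler, invoked when the
> // CheckpointCoordinator reports that a checkpoint has failed.
> public void notifyCheckpointFailed(long checkpointId) throws IOException {
>    // only react if we are still aligning for the failed checkpoint
>    if (checkpointId == currentCheckpointId && numBarriersReceived > 0) {
>       // unblock the channels and replay the buffered data, instead of
>       // waiting for the next checkpoint's barriers or the size limit
>       releaseBlocksAndResetBarriers();
>    }
> }
> {code}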
> Ultimately, we want to reduce unnecessary blocking and the spilling of
> data to disk.


