[ https://issues.apache.org/jira/browse/FLINK-10966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
vinoyang closed FLINK-10966.
----------------------------
    Resolution: Duplicate
  Release Note: The mechanism of FLINK-8871 can solve the problem of this issue.

> Optimize the release blocking logic in BarrierBuffer
> ----------------------------------------------------
>
>                 Key: FLINK-10966
>                 URL: https://issues.apache.org/jira/browse/FLINK-10966
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>            Reporter: vinoyang
>            Assignee: vinoyang
>            Priority: Major
>
> Issue:
> Currently, CancelCheckpointMarker control events are mixed into the data
> flow to make a task release the blocking in BarrierBuffer. As a result, the
> blocking may not be released in time, which can lead to a large amount of
> data being spilled to disk.
> The source code for this problem is as follows:
> {code:java}
> BufferOrEvent bufferOrEvent = next.get();
> if (isBlocked(bufferOrEvent.getChannelIndex())) { // issue line
>     // if the channel is blocked, we just store the BufferOrEvent
>     bufferBlocker.add(bufferOrEvent);
>     checkSizeLimit();
> }
> else if (bufferOrEvent.isBuffer()) {
>     return bufferOrEvent;
> }
> else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
>     if (!endOfStream) {
>         // process barriers only if there is a chance of the checkpoint completing
>         processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
>     }
> }
> else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
>     processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
> }
> else {
>     if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
>         processEndOfPartition();
>     }
>     return bufferOrEvent;
> }
> {code}
> Scenarios:
> Consider a simple DAG: source -> map (network shuffle), with a parallelism
> of 10 and exactly-once checkpoint semantics.
> First checkpoint: the barriers of 9 source subtasks are received by all map
> subtasks.
> One of the source subtasks is blocked and therefore fails to send its
> barrier. Eventually, the checkpoint fails due to a timeout. At this point,
> the 9 corresponding input channels are blocked because they have received a
> barrier.
> Second checkpoint: the blocked source subtask is still stuck and cannot send
> any events downstream, while the nine input channels are still blocked. With
> the current implementation, the data or events received on those channels
> (including later barriers and cancellation markers) are not processed but
> stored directly. Therefore, the blocking in the downstream task will not be
> released. The only way out is for the buffered data to reach the maximum
> size limit.
> I think the main problem here is that we should not directly store data that
> comes from blocked input channels. Especially when one input channel is
> stalled by its upstream and nine input channels are marked as blocked, we
> may never be able to release the blocking.
> A better mechanism might be to send a notifyCheckpointFailed callback via
> the CheckpointCoordinator, allowing each task to unblock itself. This would
> make releasing the alignment of the old checkpoint independent of the
> triggering of a new checkpoint. If the checkpoint interval is very long but
> the timeout is very short, the effect of the optimization will be more
> obvious.
> Ultimately, we want to reduce unnecessary blocking and data spilled to disk.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
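
The notifyCheckpointFailed idea from the quoted description can be illustrated with a minimal, self-contained Java sketch. It is not Flink code: the class AlignmentSketch and all of its methods are hypothetical names made up for this illustration, records are plain strings, and the normal path where all barriers arrive and the checkpoint completes is left out. It only shows how a failure notification could release blocked channels without waiting for a cancellation marker in the data flow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified stand-in for a barrier aligner; not Flink's BarrierBuffer. */
public class AlignmentSketch {
    private final boolean[] blocked;                      // one flag per input channel
    private final Deque<String> buffered = new ArrayDeque<>();
    private long currentCheckpointId = -1L;               // checkpoint currently being aligned

    public AlignmentSketch(int numChannels) {
        this.blocked = new boolean[numChannels];
    }

    /** A barrier arrived on a channel: remember the checkpoint and block that channel. */
    public void onBarrier(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        blocked[channel] = true;
    }

    /** Returns true if the record is forwarded right away, false if it had to be buffered. */
    public boolean onRecord(int channel, String record) {
        if (blocked[channel]) {
            buffered.add(record);                         // mirrors bufferBlocker.add(...) in the excerpt
            return false;
        }
        return true;
    }

    /**
     * Assumed out-of-band callback, as proposed in the issue: the coordinator reports
     * that a checkpoint failed. If it is the one being aligned, all channels are
     * unblocked and the buffered records are handed back for replay, in arrival order.
     */
    public List<String> notifyCheckpointFailed(long checkpointId) {
        if (checkpointId != currentCheckpointId) {
            return Collections.emptyList();               // stale notification, nothing to release
        }
        Arrays.fill(blocked, false);
        List<String> replay = new ArrayList<>(buffered);
        buffered.clear();
        currentCheckpointId = -1L;
        return replay;
    }

    public boolean isBlocked(int channel) {
        return blocked[channel];
    }
}
```

In the scenario above, nine channels would be blocked by onBarrier(1, ch) and the tenth would stay silent. A call to notifyCheckpointFailed(1) after the timeout would then unblock all nine channels at once, regardless of when the next checkpoint is triggered.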