Weijie Guo created FLINK-31330:
----------------------------------

             Summary: Batch shuffle may deadlock for operator with priority input
                 Key: FLINK-31330
                 URL: https://issues.apache.org/jira/browse/FLINK-31330
             Project: Flink
          Issue Type: Technical Debt
          Components: Runtime / Network
    Affects Versions: 1.16.1
            Reporter: Weijie Guo
            Assignee: Weijie Guo


For batch jobs, some operators' inputs have priorities. For example, the hash join
operator has two inputs, {{build}} and {{probe}}, and the probe input can only
start consuming after the build input has finished. Unfortunately, input priority
does not affect when the inputs request upstream data (i.e., request partitions).
In the current implementation, once all states are restored, every inputGate
starts requesting partitions, regardless of input priority. This makes the
upstream {{IO scheduler}} register readers for all downstream channels, which
opens the possibility of a deadlock.
Assume that the upstream tasks of the hash join's build and probe inputs are
deployed in the same TM. Then the corresponding readers are registered to a
single {{IO scheduler}} and share the same {{BatchShuffleReadBufferPool}}. If the
IO thread happens to load too many buffers for the probe reader, those buffers
are not released, because the downstream will not consume probe data until the
build input has finished. As a result, the build reader cannot request enough
buffers, and a deadlock occurs.
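To make the buffer cycle above concrete, here is a minimal single-threaded sketch. It assumes a simplified pool that grants "up to N" buffers and a reader that only recycles buffers once the downstream consumes its data; the class and method names ({{SharedReadPoolStarvation}}, {{simulate}}) are hypothetical and are not Flink's {{BatchShuffleReadBufferPool}} API. The pool size of 8 and the request sizes are arbitrary.

```java
import java.util.Arrays;

// Hypothetical, single-threaded model of the scenario described above; the names are
// illustrative and do NOT correspond to Flink's BatchShuffleReadBufferPool API.
public class SharedReadPoolStarvation {

    /** Fixed-size pool of read buffers shared by every reader of one IO scheduler. */
    static final class SharedReadBufferPool {
        private int available;

        SharedReadBufferPool(int totalBuffers) {
            this.available = totalBuffers;
        }

        /** Grants up to {@code wanted} buffers and returns how many were granted. */
        int request(int wanted) {
            int granted = Math.min(wanted, available);
            available -= granted;
            return granted;
        }

        /** Returns buffers to the pool; in this model it only happens after consumption. */
        void recycle(int n) {
            available += n;
        }
    }

    /**
     * Returns {probeHeld, buildHeld} after the IO thread serves the probe reader first.
     * Probe buffers are never recycled because the hash join does not consume probe
     * data before the build input finishes.
     */
    public static int[] simulate(int poolSize, int probeWanted, int buildWanted) {
        SharedReadBufferPool pool = new SharedReadBufferPool(poolSize);
        int probeHeld = pool.request(probeWanted);
        // No recycle(probeHeld) here: the downstream is still waiting on the build input.
        int buildHeld = pool.request(buildWanted);
        return new int[] {probeHeld, buildHeld};
    }

    public static void main(String[] args) {
        // Probe grabs the whole pool; build gets nothing and can never finish.
        System.out.println(Arrays.toString(simulate(8, 8, 4))); // prints [8, 0]
    }
}
```

With a larger pool (e.g. {{simulate(16, 8, 4)}}) both readers get what they ask for, which is why the problem depends on how the shared pool happens to be divided rather than on the total data size alone.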
In fact, we were aware of this problem from the beginning of the
{{SortMergeShuffle}} design, so we introduced a timeout mechanism when requesting
read buffers. If the timeout is hit, the downstream task triggers a failover
instead of blocking permanently. However, under the default configuration, a
TPC-DS test with 10T of data can easily make the job fail for this reason. It
seems that this problem needs a better solution.
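The timeout fallback described above can be sketched with a bounded wait on a shared {{java.util.concurrent.Semaphore}}. This is an illustrative assumption, not the actual {{SortMergeShuffle}} code: the class {{TimedBufferRequest}} and its methods are hypothetical, and the timeout value is arbitrary. It only shows how a timed request turns a permanent hang into an exception that can lead to failover.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch; the names do not correspond to Flink classes.
public class TimedBufferRequest {

    // Permits model the read buffers shared by all readers of one IO scheduler.
    private final Semaphore buffers;

    public TimedBufferRequest(int totalBuffers) {
        this.buffers = new Semaphore(totalBuffers);
    }

    /**
     * Requests {@code n} read buffers, waiting at most {@code timeoutMillis}.
     * Instead of blocking forever, a timeout surfaces as an exception so that the
     * job can fail over.
     */
    public void requestBuffers(int n, long timeoutMillis)
            throws TimeoutException, InterruptedException {
        if (!buffers.tryAcquire(n, timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(
                    "Buffer request timed out after " + timeoutMillis + " ms");
        }
    }

    /** Releases buffers back to the shared pool. */
    public void recycle(int n) {
        buffers.release(n);
    }

    public static void main(String[] args) throws InterruptedException {
        TimedBufferRequest pool = new TimedBufferRequest(8);
        try {
            pool.requestBuffers(8, 100); // probe reader takes everything and never recycles
            pool.requestBuffers(4, 100); // build reader starves until the timeout expires
        } catch (TimeoutException e) {
            // In the scenario described above, this is where failover would be triggered.
            System.out.println("failover: " + e.getMessage());
        }
    }
}
```

Note that the timeout does not remove the cause: after a failover the restarted tasks can hit the same contention again, which is consistent with the TPC-DS failures mentioned above.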




--
This message was sent by Atlassian Jira
(v8.20.10#820010)