Weijie Guo created FLINK-31330:
----------------------------------

             Summary: Batch shuffle may deadlock for operator with priority input
                 Key: FLINK-31330
                 URL: https://issues.apache.org/jira/browse/FLINK-31330
             Project: Flink
          Issue Type: Technical Debt
          Components: Runtime / Network
    Affects Versions: 1.16.1
            Reporter: Weijie Guo
            Assignee: Weijie Guo

In a batch job, the inputs of some operators have priorities. For example, the hash join operator has two inputs, {{build}} and {{probe}}, and the probe input can only be consumed after the build input has finished.

Unfortunately, input priority has no effect on when the inputs request upstream data (i.e. request partitions). In the current implementation, the inputGate starts requesting partitions as soon as all states are restored. This causes the upstream {{IO scheduler}} to register readers for all downstream channels, so a deadlock is possible.

Assume that the upstream tasks of the build and probe inputs of a hash join are deployed in the same TM. The corresponding readers are then registered to a single {{IO scheduler}} and share the same {{BatchShuffleReadBufferPool}}. If the IO thread happens to load too many buffers for the probe reader while the downstream is not yet consuming the probe data, the build reader cannot request enough buffers, and a deadlock occurs.

In fact, we were aware of this problem when we first designed {{SortMergeShuffle}}, so we introduced a timeout mechanism for requesting read buffers. If a request times out, the downstream task triggers a failover to avoid blocking permanently. However, under the default configuration, a TPC-DS test with 10T of data can easily cause the job to fail for this reason. This problem needs a better solution.
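
The buffer starvation described above can be illustrated with a toy model. The following Python sketch is not Flink code: `SharedReadBufferPool`, `simulate`, and all of their parameters are hypothetical names chosen for illustration, and the model assumes that the probe reader is served first and that its buffers are never recycled while the build input is unfinished.

```python
class SharedReadBufferPool:
    """Toy stand-in for a fixed-size read buffer pool shared by all readers."""

    def __init__(self, total_buffers):
        self.available = total_buffers

    def try_request(self, count):
        """Hand out `count` buffers if the pool has them, else refuse."""
        if self.available >= count:
            self.available -= count
            return True
        return False


def simulate(total_buffers, probe_prefetch, build_needs, timeout_ticks):
    """Return what happens to the build reader once the probe reader has
    been given `probe_prefetch` buffers from the shared pool."""
    pool = SharedReadBufferPool(total_buffers)

    # Assumption: the IO thread happens to load data for the probe reader first.
    if not pool.try_request(probe_prefetch):
        raise ValueError("probe_prefetch exceeds the pool size")

    # The downstream operator does not consume probe data until the build
    # input has finished, so the probe buffers are never returned here.
    for _ in range(timeout_ticks):
        if pool.try_request(build_needs):
            return "build reader served"

    # Without a timeout this loop would spin forever (the deadlock);
    # with one, the request fails and the task falls back to failover.
    return "timeout, failover triggered"


if __name__ == "__main__":
    print(simulate(total_buffers=8, probe_prefetch=8, build_needs=2, timeout_ticks=5))
    print(simulate(total_buffers=8, probe_prefetch=4, build_needs=2, timeout_ticks=5))
```

In the first call the probe reader holds every buffer, so the build reader can only time out. In the second call enough buffers remain for the build reader to make progress. This suggests that limiting how many buffers a lower-priority reader may hold is one possible direction, though the issue itself does not prescribe a fix.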