zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r408569272
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -243,7 +249,23 @@ void requestPartitions() throws IOException, InterruptedException {
 			}

 			for (InputChannel inputChannel : inputChannels.values()) {
-				inputChannel.requestSubpartition();
+				executor.submit(() -> {
+					try {
+						inputChannel.initializeState(reader);
+					} catch (Throwable t) {
+						inputChannel.setError(t);
+					}
+				});
+			}
+
+			for (InputChannel inputChannel : inputChannels.values()) {
+				executor.submit(() -> {
+					try {
+						inputChannel.requestSubpartition();

Review comment:
Yes, there is another issue here. To stay compatible with the previous process, we can split the previous `requestSubpartition` into two steps. The first step creates the client, which can be done during `InputGate#setup` as before. The second step actually requests the partition, and is triggered after reading the channel state, as this PR does. With that split this check is still valid, but I think it still breaks the previous assumption, which was meant to guarantee that the partition has really been requested before `getNextBuffer` is called. Now an existing client no longer means that the partition has already been requested. So another option might be to remove this `checkState`.
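
To make the two-step split from the review comment concrete, here is a minimal sketch. It is not Flink code: `SketchInputChannel`, `createClient`, the parameterless `initializeState`, and the boolean flags are hypothetical stand-ins, and the check inside `getNextBuffer` is only assumed to resemble the `checkState` under discussion. It shows why a "client exists" check stops implying "partition requested" once client creation moves ahead of the actual request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for an input channel; not Flink's actual class. */
final class SketchInputChannel {
    private boolean clientCreated;       // set by step 1
    private boolean partitionRequested;  // set by step 2
    private final List<String> history = new ArrayList<>();

    /** Step 1 (assumed to run during gate setup): only create the client. */
    void createClient() {
        clientCreated = true;
        history.add("createClient");
    }

    /** Placeholder for reading the persisted channel state between the two steps. */
    void initializeState() {
        history.add("initializeState");
    }

    /** Step 2: actually request the partition, after the state has been read. */
    void requestSubpartition() {
        if (!clientCreated) {
            throw new IllegalStateException("Client must be created before requesting the subpartition.");
        }
        partitionRequested = true;
        history.add("requestSubpartition");
    }

    /**
     * Assumed shape of the existing check: it only verifies that a client exists.
     * After the split, this passes even though no partition has been requested yet.
     */
    Optional<String> getNextBuffer() {
        if (!clientCreated) {
            throw new IllegalStateException("Queried for a buffer before the client was created.");
        }
        return partitionRequested ? Optional.of("buffer") : Optional.empty();
    }

    boolean isPartitionRequested() {
        return partitionRequested;
    }

    List<String> history() {
        return history;
    }
}
```

Calling `createClient()` and then `getNextBuffer()` passes the check while `isPartitionRequested()` is still false, which is the weakened guarantee the comment points out. Under these assumptions, the options are to check a dedicated "requested" flag instead or to drop the check altogether.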