zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r420178858
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -464,6 +472,20 @@ protected void beforeInvoke() throws Exception {
 				writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
 			}
 		}
+
+		// It would get possible benefits to recovery input side after output side, which guarantees the
+		// output can request more floating buffers from global firstly.
+		InputGate[] inputGates = getEnvironment().getAllInputGates();
+		if (inputGates != null) {
+			for (InputGate inputGate : inputGates) {
+				inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+			}
+
+			// Note that we must request partition after all the single gate finishes recovery.
+			for (InputGate inputGate : inputGates) {
+				inputGate.requestPartitions(channelIOExecutor);
+			}

Review comment:
Yeah, I also considered requesting the partitions from the mailbox thread once all the futures returned by `inputGate.readRecoveredState` have completed.

But I also thought of an existing case where the partition request is executed by a thread other than the task main thread. In `SingleInputGate#updateInputChannel`, when an unknown channel is transformed into a local or remote channel, the partition is requested directly by the RPC thread.

If that case is valid, then my assumption is that a partition request can be executed by any thread without race conditions. So I took the current approach to save some effort.
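For comparison, the mailbox-thread alternative mentioned above could look roughly like the sketch below. It is only an illustration under assumptions: `FakeGate`, `run`, and the single-threaded `mailbox` executor are hypothetical stand-ins and not Flink's `InputGate` or mailbox API. It shows the ordering guarantee only: partitions are requested on one dedicated thread, and only after every gate's recovery future has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxPartitionRequestSketch {

    /** Hypothetical stand-in for an input gate; this is NOT Flink's InputGate API. */
    static final class FakeGate {
        private final int index;
        private final List<String> events;

        FakeGate(int index, List<String> events) {
            this.index = index;
            this.events = events;
        }

        /** Simulates asynchronous state recovery on the IO executor. */
        CompletableFuture<Void> readRecoveredState(ExecutorService ioExecutor) {
            return CompletableFuture.runAsync(() -> events.add("recovered-" + index), ioExecutor);
        }

        /** Records which thread issued the (simulated) partition request. */
        void requestPartitions() {
            events.add("requested-" + index + "@" + Thread.currentThread().getName());
        }
    }

    /** Recovers gateCount gates, then requests partitions on the mailbox thread. */
    public static List<String> run(int gateCount) throws Exception {
        List<String> events = new CopyOnWriteArrayList<>();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        // Assumption: a single named thread stands in for the task's mailbox thread.
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            List<FakeGate> gates = new ArrayList<>();
            List<CompletableFuture<Void>> recoveries = new ArrayList<>();
            for (int i = 0; i < gateCount; i++) {
                FakeGate gate = new FakeGate(i, events);
                gates.add(gate);
                recoveries.add(gate.readRecoveredState(ioExecutor));
            }
            // Wait for every gate to finish recovery, then hand the requests to the mailbox.
            CompletableFuture.allOf(recoveries.toArray(new CompletableFuture[0]))
                .thenRunAsync(() -> gates.forEach(FakeGate::requestPartitions), mailbox)
                .get();
            return new ArrayList<>(events);
        } finally {
            ioExecutor.shutdown();
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        run(3).forEach(System.out::println);
    }
}
```

The extra cost compared with the current approach is the future bookkeeping and the hand-off to a second executor, which is the effort the current approach avoids by requesting partitions on the thread that performs the recovery.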