pnowojski commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r420100908
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -464,6 +472,20 @@ protected void beforeInvoke() throws Exception {
 				writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
 			}
 		}
+
+		// It would get possible benefits to recovery input side after output side, which guarantees the
+		// output can request more floating buffers from global firstly.
+		InputGate[] inputGates = getEnvironment().getAllInputGates();
+		if (inputGates != null) {
+			for (InputGate inputGate : inputGates) {
+				inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+			}
+
+			// Note that we must request partition after all the single gate finishes recovery.
+			for (InputGate inputGate : inputGates) {
+				inputGate.requestPartitions(channelIOExecutor);
+			}

Review comment:
   It would simplify the threading model if this were executed from the main thread via the mailbox. In your PR there is already a race condition between processing data on the main thread (receiving `EndOfPartitionEvent`, releasing resources) and `channelIOExecutor` requesting partitions. We could either enqueue a simple job in `channelIOExecutor` that in turn enqueues a mail into the mailbox, OR `inputGate.readRecoveredState` could return a `Future`, and we could enqueue a mail into the mailbox once all futures have completed.
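The second option (having `readRecoveredState` return a future and enqueueing one mail once all futures complete) can be sketched as below. This is a minimal, self-contained illustration under stated assumptions, not Flink code: `FakeGate`, `runRecovery`, and the `BlockingQueue<Runnable>` standing in for the task mailbox are all hypothetical, and they do not mirror the real `InputGate` or mailbox APIs. The point it shows is that the IO executor only signals completion, while `requestPartitions` runs on the same thread that would process records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class MailboxRecoverySketch {

    /** Hypothetical stand-in for an input gate; NOT Flink's InputGate API. */
    public static final class FakeGate {
        private volatile boolean recovered;
        private volatile String requestThread; // null until partitions are requested

        /** Hypothetical: recovery runs on the IO executor and reports completion via a future. */
        CompletableFuture<Void> readRecoveredState(ExecutorService ioExecutor) {
            return CompletableFuture.runAsync(() -> recovered = true, ioExecutor);
        }

        /** Must only be called from the task's main (mailbox) thread. */
        void requestPartitions() {
            if (!recovered) {
                throw new IllegalStateException("requested partitions before recovery finished");
            }
            requestThread = Thread.currentThread().getName();
        }

        public boolean isRecovered() { return recovered; }
        public String requestThread() { return requestThread; }
    }

    /** Recovers two gates; the calling thread plays the role of the task's main thread. */
    public static List<FakeGate> runRecovery() throws InterruptedException {
        // Simplified mailbox (assumption): a queue of "mails" executed only by the main thread.
        BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
        ExecutorService channelIOExecutor = Executors.newSingleThreadExecutor();
        List<FakeGate> gates = List.of(new FakeGate(), new FakeGate());
        try {
            // 1. Start recovery of every gate on the IO executor and keep the futures.
            List<CompletableFuture<Void>> recoveries = new ArrayList<>();
            for (FakeGate gate : gates) {
                recoveries.add(gate.readRecoveredState(channelIOExecutor));
            }
            // 2. When ALL gates are done, enqueue ONE mail. The IO thread never calls
            //    requestPartitions itself; it only hands the work over to the mailbox.
            CompletableFuture.allOf(recoveries.toArray(new CompletableFuture<?>[0]))
                    .whenComplete((ignored, failure) -> mailbox.add(() -> {
                        if (failure != null) {
                            // Surface recovery failures on the main thread.
                            throw new IllegalStateException("channel state recovery failed", failure);
                        }
                        gates.forEach(FakeGate::requestPartitions);
                    }));
            // 3. Main thread: run the mail. Records would be processed on this same thread,
            //    so requesting partitions cannot race with e.g. EndOfPartitionEvent handling.
            mailbox.take().run();
        } finally {
            channelIOExecutor.shutdown();
        }
        return gates;
    }

    public static void main(String[] args) throws InterruptedException {
        for (FakeGate gate : runRecovery()) {
            System.out.println("recovered=" + gate.isRecovered()
                    + ", partitions requested on thread " + gate.requestThread());
        }
    }
}
```

Compared with the first option (a follow-up job in `channelIOExecutor` that enqueues the mail), the future-based variant presumably does not depend on the executor running jobs in submission order, and it gives a natural place to forward recovery failures to the main thread.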