pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r420100908



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -464,6 +472,20 @@ protected void beforeInvoke() throws Exception {
                                        writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
                                }
                        }
+
+                       // It would get possible benefits to recovery input side after output side, which guarantees the
+                       // output can request more floating buffers from global firstly.
+                       InputGate[] inputGates = getEnvironment().getAllInputGates();
+                       if (inputGates != null) {
+                               for (InputGate inputGate : inputGates) {
+                                       inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+                               }
+
+                               // Note that we must request partition after all the single gate finishes recovery.
+                               for (InputGate inputGate : inputGates) {
+                                       inputGate.requestPartitions(channelIOExecutor);
+                               }

Review comment:
       It would simplify the threading model if this was executed from the main
thread, via the mailbox. In your PR there is already a race condition between
processing data on the main thread (receiving
`EndOfPartitionEvent`/`InputChannel#releaseAllResources`) and the
`channelIOExecutor` requesting partitions. Example failure:
https://dev.azure.com/pnowojski/Flink/_build/results?buildId=4&view=logs&j=a1590513-d0ea-59c3-3c7b-aad756c48f25&t=d62215ae-261e-5cec-c84f-5abb77c78ded
 (this happened on a modified version of this code, but I think it's a valid
failure on your version as well).
   
   We could enqueue a simple job in the `channelIOExecutor` that would in turn
enqueue a mail into the mailbox, OR `inputGate.readRecoveredState` could return
a `Future`, and we could enqueue a mail into the mailbox once all futures are
completed.
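
   A rough sketch of the second option, to make the idea concrete. Everything here is a hypothetical stand-in rather than Flink code: `FakeGate` plays the role of `InputGate`, its `readRecoveredState` is assumed to return a `CompletableFuture`, and the mailbox is modelled as a single-threaded executor named `main-mailbox`. The point is only that recovery runs on the IO thread, while `requestPartitions` is enqueued onto the main thread once all recovery futures have completed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MailboxRecoverySketch {

    /** Hypothetical stand-in for an InputGate; not the Flink class. */
    static final class FakeGate {
        final String name;

        FakeGate(String name) {
            this.name = name;
        }

        // Assumption: recovery runs on the IO executor and signals completion via a future.
        CompletableFuture<Void> readRecoveredState(ExecutorService ioExecutor, List<String> log) {
            return CompletableFuture.runAsync(() -> record(log, "recovered " + name), ioExecutor);
        }

        // Runs on whichever thread calls it; the sketch only ever calls it from the mailbox thread.
        void requestPartitions(List<String> log) {
            record(log, "requested " + name);
        }
    }

    /** Appends the event together with the name of the thread that produced it. */
    static void record(List<String> log, String event) {
        synchronized (log) {
            log.add(event + " on " + Thread.currentThread().getName());
        }
    }

    static List<String> run() throws Exception {
        List<String> log = new ArrayList<>();
        ExecutorService channelIOExecutor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "channel-io"));
        // Stand-in for the task mailbox: one thread that processes enqueued actions in order.
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-mailbox"));
        try {
            List<FakeGate> gates = Arrays.asList(new FakeGate("gate-0"), new FakeGate("gate-1"));

            // Kick off recovery for every gate on the IO thread and collect the futures.
            CompletableFuture<?>[] recoveries = gates.stream()
                    .map(gate -> gate.readRecoveredState(channelIOExecutor, log))
                    .toArray(CompletableFuture[]::new);

            // Once all gates have recovered, enqueue a single "mail" that requests partitions.
            CompletableFuture<Void> requested = CompletableFuture.allOf(recoveries)
                    .thenRunAsync(() -> gates.forEach(gate -> gate.requestPartitions(log)), mailbox);

            requested.get(10, TimeUnit.SECONDS);
            synchronized (log) {
                return new ArrayList<>(log);
            }
        } finally {
            channelIOExecutor.shutdown();
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

   With this shape, partition requests never run concurrently with the main thread's own processing, since both go through the same single-threaded mailbox.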



