zhijiangW commented on a change in pull request #11687: 
[FLINK-16536][network][checkpointing] Implement InputChannel state recovery for 
unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r408574292
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##########
 @@ -149,6 +149,47 @@ void assignExclusiveSegments() throws IOException {
                }
        }
 
+       @Override
+       public void initializeState(ChannelStateReader reader) throws IOException, InterruptedException {
+               numRequiredBuffers = initialCredit + inputGate.getBufferPool().getMaxNumberOfMemorySegments();
+               unannouncedCredit.set(initialCredit);
+
+               while (true) {
+                       Buffer buffer;
+                       synchronized (bufferQueue) {
+                               buffer = bufferQueue.takeBuffer();
+                               if (buffer == null) {
+                                       if (isReleased()) {
+                                               return;
+                                       }
+
+                                       if (!isWaitingForFloatingBuffers) {
+                                               buffer = inputGate.getBufferPool().requestBuffer();
+                                               if (buffer == null) {
+                                                       inputGate.getBufferProvider().addBufferListener(this);
+                                                       isWaitingForFloatingBuffers = true;
+                                               }
+                                       }
+                               }
+                       }
+
+                       if (buffer == null) {
+                               wait(10);
 
 Review comment:
   TBH I am not happy with the current `wait` approach either. I listed it 
among the unsatisfying points, because choosing how long to wait might be an issue.
   
   I also considered other ways, but the key problem is that we cannot exit 
this runnable to let the thread execute other runnables in the meantime, so it 
has to be somewhat blocking here. In contrast, the previous floating buffer 
request from the netty thread was non-blocking, so we did not need a `notifyAll` 
mechanism inside `RemoteInputChannel#recycle` and 
`RemoteInputChannel#notifyBufferAvailable`.
   
   Maybe we could add `notifyAll` to the above two methods to make them 
compatible with the new process. WDYT?
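
   To make the `notifyAll` idea concrete, here is a minimal sketch of the pattern under discussion: the blocked thread waits on the queue's monitor without a timeout and is woken by whichever method returns a buffer. This is not Flink code. `FloatingBufferWaiter`, `takeBlocking`, and `release` are hypothetical names, `String` stands in for `Buffer`, and `recycle` here only mirrors the role of `RemoteInputChannel#recycle`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a channel's buffer queue; not the Flink implementation. */
class FloatingBufferWaiter {
    private final Queue<String> bufferQueue = new ArrayDeque<>();
    private boolean released;

    /** Blocks until a buffer is available; returns null if released with no buffer left. */
    String takeBlocking() throws InterruptedException {
        synchronized (bufferQueue) {
            // Re-check the condition in a loop to tolerate spurious wakeups.
            while (bufferQueue.isEmpty() && !released) {
                bufferQueue.wait(); // no timeout: woken by notifyAll() below
            }
            return bufferQueue.poll();
        }
    }

    /** Plays the role of recycle/notifyBufferAvailable: hand over a buffer and wake waiters. */
    void recycle(String buffer) {
        synchronized (bufferQueue) {
            bufferQueue.add(buffer);
            bufferQueue.notifyAll();
        }
    }

    /** Wakes any waiter so it can observe the released flag and exit. */
    void release() {
        synchronized (bufferQueue) {
            released = true;
            bufferQueue.notifyAll();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FloatingBufferWaiter waiter = new FloatingBufferWaiter();
        Thread recycler = new Thread(() -> waiter.recycle("buffer-1"));
        recycler.start();
        System.out.println(waiter.takeBlocking());
        recycler.join();
    }
}
```

   Compared with `wait(10)`, the waiter wakes as soon as a buffer arrives, so no polling interval has to be chosen. The cost is that every path that can make a buffer available, or release the channel, must notify under the same lock.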

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
