zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r409283520
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##########
 @@ -149,6 +149,47 @@ void assignExclusiveSegments() throws IOException {
                }
        }
 
+       @Override
+       public void initializeState(ChannelStateReader reader) throws IOException, InterruptedException {
+               numRequiredBuffers = initialCredit + inputGate.getBufferPool().getMaxNumberOfMemorySegments();
+               unannouncedCredit.set(initialCredit);
+
+               while (true) {
+                       Buffer buffer;
+                       synchronized (bufferQueue) {
+                               buffer = bufferQueue.takeBuffer();
+                               if (buffer == null) {
+                                       if (isReleased()) {
+                                               return;
+                                       }
+
+                                       if (!isWaitingForFloatingBuffers) {
+                                               buffer = inputGate.getBufferPool().requestBuffer();
+                                               if (buffer == null) {
+                                                       inputGate.getBufferProvider().addBufferListener(this);
+                                                       isWaitingForFloatingBuffers = true;
+                                               }
+                                       }
+                               }
+                       }
+
+                       if (buffer == null) {
+                               wait(10);
 
 Review comment:
   Yes, we have the same issue in the alternative design based on `SpilledInputChannel`/`SpilledInputGate`.
   
   To summarize, there are several possible solutions:
   
   - `wait()` when no buffer is available: always block the unspilling thread, which fits our current requirements. We should not leave the current channel to temporarily switch to another one, since that might cause random IO. This option requires a wakeup mechanism that notifies the thread once a buffer becomes available again.
   
   - `wait(timeout)` when no buffer is available: more or less the same as `wait()` above, but a wakeup mechanism is not mandatory; adding one can be regarded as an improvement that wakes the thread up early.
   
   - Non-blocking: stop unspilling the current channel and unspill another channel that has available buffers. As mentioned above, this would cause random IO, so it is not the current suggestion.
   
   Given the current situation, where a dedicated thread does the unspilling and only that one thread unspills the channels one by one to avoid random IO, I choose the blocking option 1 or 2.
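   A rough sketch of such a single unspilling thread, assuming hypothetical `SpilledChannel` and `SequentialUnspiller` classes and two `java.util.concurrent.BlockingQueue`s standing in for the free and filled buffers (none of this is Flink code). It finishes one channel before starting the next and never switches channels while waiting for a buffer. `BlockingQueue.poll(timeout, unit)` returns as soon as a buffer is put back, or `null` after the timeout, so it behaves like option 2 with the early-wakeup improvement; replacing it with `take()` would correspond to option 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of one channel's spilled state: how many chunks remain on disk. */
final class SpilledChannel {
    final String name;
    final int chunks;

    SpilledChannel(String name, int chunks) {
        this.name = name;
        this.chunks = chunks;
    }
}

/** Hypothetical single unspilling thread that restores channels strictly one by one. */
final class SequentialUnspiller {
    private final BlockingQueue<byte[]> freeBuffers;   // recycled by the downstream consumer
    private final BlockingQueue<byte[]> filledBuffers; // handed over to the downstream consumer
    private final List<String> readLog = new ArrayList<>();

    SequentialUnspiller(BlockingQueue<byte[]> freeBuffers, BlockingQueue<byte[]> filledBuffers) {
        this.freeBuffers = freeBuffers;
        this.filledBuffers = filledBuffers;
    }

    void unspillAll(List<SpilledChannel> channels) throws InterruptedException {
        for (SpilledChannel channel : channels) {
            for (int chunk = 0; chunk < channel.chunks; chunk++) {
                byte[] buffer;
                // Stay on this channel until a buffer is free: poll() returns early when
                // a buffer is put back, or null after 10 ms, and then we re-check.
                while ((buffer = freeBuffers.poll(10, TimeUnit.MILLISECONDS)) == null) {
                    // Deliberately not switching channels here, to keep reads sequential.
                }
                // Stand-in for reading the next spilled chunk of this channel into the buffer.
                readLog.add(channel.name + ":" + chunk);
                filledBuffers.put(buffer);
            }
        }
    }

    List<String> readLog() {
        return readLog;
    }
}
```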
   
   With option 2, a wakeup is not necessary, so we do not need to touch the existing `RemoteInputChannel#recycle` and `RemoteInputChannel#notifyBufferAvailable` code paths. If we do want to add the wakeup mechanism, option 1 also makes sense.
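   A small sketch of why option 2 can leave the recycle path untouched, assuming a hypothetical `UntouchedPool` whose `recycle()` returns the buffer under the pool's monitor but never calls `notify()`. It only stands in for the existing `RemoteInputChannel#recycle`/`notifyBufferAvailable` paths and is not their actual code. The requester loops on `wait(timeoutMillis)`, which releases the monitor while waiting, so a recycled buffer is picked up at the latest on the next re-check after the timeout.

```java
import java.util.ArrayDeque;

/**
 * Hypothetical stand-in for an existing pool whose recycle path is left
 * untouched: it returns the buffer under the pool's monitor but never notifies.
 */
final class UntouchedPool {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    synchronized void recycle(byte[] buffer) {
        available.add(buffer); // no notify()/notifyAll(): existing code stays as-is
    }

    synchronized byte[] poll() {
        return available.poll();
    }
}

final class TimedWaitRequester {
    /**
     * Option 2: poll under the pool's monitor and wait(timeoutMillis) between
     * attempts. wait() releases the monitor, so recycle() can still add a buffer,
     * and the timeout guarantees the loop re-checks even without a notify.
     */
    static byte[] requestBuffer(UntouchedPool pool, long timeoutMillis) throws InterruptedException {
        synchronized (pool) {
            byte[] buffer;
            while ((buffer = pool.poll()) == null) {
                pool.wait(timeoutMillis);
            }
            return buffer;
        }
    }
}
```

   The cost of this choice is latency: without a notify, a buffer may sit unused for up to one timeout period before the unspilling thread sees it.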

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
