zhijiangW commented on a change in pull request #11687: [FLINK-16536][network][checkpointing] Implement InputChannel state recovery for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11687#discussion_r409283520
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -149,6 +149,47 @@ void assignExclusiveSegments() throws IOException {
         }
     }
 
+    @Override
+    public void initializeState(ChannelStateReader reader) throws IOException, InterruptedException {
+        numRequiredBuffers = initialCredit + inputGate.getBufferPool().getMaxNumberOfMemorySegments();
+        unannouncedCredit.set(initialCredit);
+
+        while (true) {
+            Buffer buffer;
+            synchronized (bufferQueue) {
+                buffer = bufferQueue.takeBuffer();
+                if (buffer == null) {
+                    if (isReleased()) {
+                        return;
+                    }
+
+                    if (!isWaitingForFloatingBuffers) {
+                        buffer = inputGate.getBufferPool().requestBuffer();
+                        if (buffer == null) {
+                            inputGate.getBufferProvider().addBufferListener(this);
+                            isWaitingForFloatingBuffers = true;
+                        }
+                    }
+                }
+            }
+
+            if (buffer == null) {
+                wait(10);

Review comment:
Yes, we have the same issue in the alternative approach with `SpilledInputChannel`/`SpilledInputGate`. In conclusion, there are several possible solutions:

- `wait()` if no buffers are available: always block the unspilling thread, which fits our current requirements. We should not leave the current channel and temporarily switch to another one, since that might cause random IO. This option requires a wakeup mechanism for when a buffer becomes available again.
- `wait(timeout)` if no buffers are available: more or less the same as `wait()` above, but the wakeup mechanism is not mandatory; it can be regarded as an improvement that wakes the thread up early.
- Non-blocking way: stop reading the current channel and unspill another channel that has available buffers. As mentioned above, this would cause random IO, so it is not what I suggest now.

Given the current design, with a dedicated thread for unspilling and only one thread allowed to unspill the channels one by one to avoid random IO, I would choose one of the blocking options, 1 or 2.
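To make the difference between options 1 and 2 concrete, here is a minimal sketch. It is not Flink code: `BufferWaitSketch` and all of its methods are hypothetical names, and a plain `ArrayDeque` of byte arrays stands in for the channel's buffer queue. It only illustrates that `wait()` depends on a `notifyAll()` from the recycling side, while `wait(timeout)` re-checks the queue on its own.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for a channel's buffer queue; not the Flink implementation. */
class BufferWaitSketch {
    private final ArrayDeque<byte[]> buffers = new ArrayDeque<>();

    /** Option 1: block until a buffer arrives. Only returns if recycle() notifies. */
    byte[] takeBlocking() throws InterruptedException {
        synchronized (buffers) {
            // wait() must be called while holding the monitor it waits on;
            // the loop guards against spurious wakeups.
            while (buffers.isEmpty()) {
                buffers.wait();
            }
            return buffers.poll();
        }
    }

    /** Option 2: re-check every pollMillis. A notification only shortens the wait. */
    byte[] takePolling(long pollMillis) throws InterruptedException {
        if (pollMillis <= 0) {
            // wait(0) would block indefinitely, which is option 1 again.
            throw new IllegalArgumentException("pollMillis must be positive");
        }
        synchronized (buffers) {
            while (buffers.isEmpty()) {
                buffers.wait(pollMillis);
            }
            return buffers.poll();
        }
    }

    /** Recycling path with a wakeup, as option 1 requires. */
    void recycle(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            buffers.notifyAll();
        }
    }

    /** Recycling path left untouched (no wakeup); sufficient for option 2. */
    void recycleWithoutWakeup(byte[] buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BufferWaitSketch channel = new BufferWaitSketch();

        // Option 2: the recycling thread never notifies, polling still succeeds.
        new Thread(() -> {
            sleepQuietly(50);
            channel.recycleWithoutWakeup(new byte[32]);
        }).start();
        System.out.println("option 2 got " + channel.takePolling(10).length + " bytes");

        // Option 1: the recycling thread must notify, or takeBlocking() never returns.
        new Thread(() -> {
            sleepQuietly(50);
            channel.recycle(new byte[64]);
        }).start();
        System.out.println("option 1 got " + channel.takeBlocking().length + " bytes");
    }
}
```

In this sketch, pairing `takeBlocking()` with `recycleWithoutWakeup()` would hang, which is why option 1 needs changes on the recycling side and option 2 does not.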

Regarding option 2, a wakeup is not necessary, so we do not need to touch the existing `RemoteInputChannel#recycle` and `RemoteInputChannel#notifyBufferAvailable` code paths. If we want to add the wakeup mechanism, option 1 also makes sense.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services