pnowojski commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r414697432
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
##########
@@ -131,7 +133,18 @@
     /**
      * Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
      */
-    public abstract void setup() throws IOException, InterruptedException;
+    public abstract void setup() throws IOException;
+
+    /**
+     * It is only performed for unaligned checkpoint mode together with internal requesting partitions afterwards.
+     * Otherwise only {@link #requestPartitions()} is performed for other checkpoint modes.
+     *
+     * @param executor the dedicated executor for performing the recovery state for all the internal channels.
+     * @param reader the dedicated reader for unspilling the respective channel state from stored snapshots.
+     */
+    public abstract void readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException;
+
+    public abstract void requestPartitions() throws IOException;

Review comment:
`requestPartitions()` needs Javadoc similar to that of `readRecoveredState()`, explaining when it should be called.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
##########
@@ -63,6 +63,8 @@ protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
     }
     protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+        inputGate.requestPartitions();

Review comment:
Why do we have to call `requestPartitions()` once for every record?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
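
For context on the second review comment: calling a request method on every record is only cheap if the call is idempotent. The following is a minimal sketch of that pattern, not the code in this pull request. The class `GuardedGateSketch`, the `partitionsRequested` flag, and the `actualRequests` counter are hypothetical names introduced for illustration, and checked exceptions are omitted to keep the sketch short. Whether Flink's `InputGate` implementations guard the call this way is an assumption that the diff above does not confirm.

```java
/**
 * Hypothetical stand-in for an input gate. This is NOT Flink's InputGate;
 * it only illustrates an idempotent requestPartitions().
 */
class GuardedGateSketch {
    // Hypothetical guard: true once the partitions have been requested.
    private boolean partitionsRequested;

    // Counts how often the (simulated) expensive request really ran.
    private int actualRequests;

    /**
     * Requests the partitions on the first call and does nothing on later
     * calls, so callers may invoke it before every record.
     */
    void requestPartitions() {
        if (partitionsRequested) {
            return; // already requested, nothing to do
        }
        partitionsRequested = true;
        actualRequests++; // stands in for the real partition request
    }

    /** Mimics a reader that calls requestPartitions() before each record. */
    boolean getNextRecord() {
        requestPartitions();
        return true; // pretend a record was read
    }

    int actualRequests() {
        return actualRequests;
    }
}
```

With a guard like this, reading many records triggers the underlying request only once. The per-record cost is then a single boolean check, although the reviewer's question of whether the call belongs in `getNextRecord()` at all still stands.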