pnowojski commented on a change in pull request #11687: URL: https://github.com/apache/flink/pull/11687#discussion_r411316537
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java ########## @@ -131,7 +135,12 @@ /** * Setup gate, potentially heavy-weight, blocking operation comparing to just creation. */ - public abstract void setup() throws IOException, InterruptedException; + public abstract void setup() throws IOException; + + public abstract void initializeStateAndRequestPartitions( Review comment: `readRecoveredStateAndRequestPartitions`? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java ########## @@ -914,7 +914,9 @@ public void testInitializeResultPartitionState() throws Exception { MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build(); mockEnvironment.addOutputs(Arrays.asList(partitions)); - StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build(); + StreamConfig config = new StreamConfig(new Configuration()); + config.setUnalignedCheckpointsEnabled(true); Review comment: Do we want this change here? I would like to avoid introducing many different places that are manually disabling/enabling unaligned checkpoints. Maybe this should be handled generically as part of this ticket https://issues.apache.org/jira/browse/FLINK-17258 ? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ########## @@ -162,6 +163,95 @@ void assignExclusiveSegments() throws IOException { // Consume // ------------------------------------------------------------------------ + void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException { + beforeReadRecoveredState(); + + while (true) { + Buffer buffer; + synchronized (bufferQueue) { + buffer = bufferQueue.takeBuffer(); + if (buffer == null) { + if (isReleased()) { + return; + } + if (!isWaitingForFloatingBuffers) { + buffer = inputGate.getBufferPool().requestBuffer(); + if (buffer == null) { + inputGate.getBufferProvider().addBufferListener(this); + isWaitingForFloatingBuffers = true; + } + } + } + if (buffer == null) { + bufferQueue.wait(); + continue; + } + } + + ChannelStateReader.ReadResult result = internalReaderRecoveredState(reader, buffer); + if (result == ChannelStateReader.ReadResult.NO_MORE_DATA) { + return; + } + } + } + + private void beforeReadRecoveredState() { Review comment: `initializeCreditsForRecoveringState`? 
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ########## @@ -222,19 +225,65 @@ public SingleInputGate( } @Override - public void setup() throws IOException, InterruptedException { + public void setup() throws IOException { checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool."); // assign exclusive buffers to input channels directly and use the rest for floating buffers assignExclusiveSegments(); BufferPool bufferPool = bufferPoolFactory.get(); setBufferPool(bufferPool); + } - requestPartitions(); + @Override + public void initializeStateAndRequestPartitions( + boolean hasStates, + @Nullable ExecutorService executor, + ChannelStateReader reader) throws Exception { + + if (hasStates) { + checkNotNull(executor); + readRecoveredStateBeforeRequestPartition(executor, reader); + } else { + requestPartitions(); + } Review comment: This doesn't look right: taking both a boolean `hasStates` and a `@Nullable executor` suggests that this check belongs on a different layer. As this method appears to be called in only one place, shouldn't it just be inlined? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ########## @@ -204,6 +205,9 @@ protected final MailboxProcessor mailboxProcessor; + @Nullable + private final ExecutorService channelStateUnspillingExecutor; Review comment: Add a `TODO` noting that it should be replaced by a global TaskManager ioExecutor? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org