pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r411316537



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
##########
@@ -131,7 +135,12 @@
        /**
         * Setup gate, potentially heavy-weight, blocking operation comparing to just creation.
         */
-       public abstract void setup() throws IOException, InterruptedException;
+       public abstract void setup() throws IOException;
+
+       public abstract void initializeStateAndRequestPartitions(

Review comment:
       `readRecoveredStateAndRequestPartitions`?

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -914,7 +914,9 @@ public void testInitializeResultPartitionState() throws Exception {
 
                MockEnvironment mockEnvironment = new MockEnvironmentBuilder().build();
                mockEnvironment.addOutputs(Arrays.asList(partitions));
-               StreamTask task = new MockStreamTaskBuilder(mockEnvironment).build();
+               StreamConfig config = new StreamConfig(new Configuration());
+               config.setUnalignedCheckpointsEnabled(true);

Review comment:
       Do we want this change here? I would like to avoid scattering manual 
enabling/disabling of unaligned checkpoints across many different places. 
Maybe this should be handled generically as part of 
https://issues.apache.org/jira/browse/FLINK-17258?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
##########
@@ -162,6 +163,95 @@ void assignExclusiveSegments() throws IOException {
        // Consume
        // ------------------------------------------------------------------------
 
+       void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException {
+               beforeReadRecoveredState();
+
+               while (true) {
+                       Buffer buffer;
+                       synchronized (bufferQueue) {
+                               buffer = bufferQueue.takeBuffer();
+                               if (buffer == null) {
+                                       if (isReleased()) {
+                                               return;
+                                       }
+                                       if (!isWaitingForFloatingBuffers) {
+                                               buffer = inputGate.getBufferPool().requestBuffer();
+                                               if (buffer == null) {
+                                                       inputGate.getBufferProvider().addBufferListener(this);
+                                                       isWaitingForFloatingBuffers = true;
+                                               }
+                                       }
+                               }
+                               if (buffer == null) {
+                                       bufferQueue.wait();
+                                       continue;
+                               }
+                       }
+
+                       ChannelStateReader.ReadResult result = internalReaderRecoveredState(reader, buffer);
+                       if (result == ChannelStateReader.ReadResult.NO_MORE_DATA) {
+                               return;
+                       }
+               }
+       }
+
+       private void beforeReadRecoveredState() {

Review comment:
       `initializeCreditsForRecoveringState`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -222,19 +225,65 @@ public SingleInputGate(
        }
 
        @Override
-       public void setup() throws IOException, InterruptedException {
+       public void setup() throws IOException {
                checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
                // assign exclusive buffers to input channels directly and use the rest for floating buffers
                assignExclusiveSegments();
 
                BufferPool bufferPool = bufferPoolFactory.get();
                setBufferPool(bufferPool);
+       }
 
-               requestPartitions();
+       @Override
+       public void initializeStateAndRequestPartitions(
+                       boolean hasStates,
+                       @Nullable ExecutorService executor,
+                       ChannelStateReader reader) throws Exception {
+
+               if (hasStates) {
+                       checkNotNull(executor);
+                       readRecoveredStateBeforeRequestPartition(executor, reader);
+               } else {
+                       requestPartitions();
+               }

Review comment:
       This doesn't look right: a boolean `hasStates` combined with a 
`@Nullable executor` suggests that this check belongs on a different layer. 
As this method appears to be called from only one place, shouldn't it just be 
inlined?
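
To illustrate the suggestion, here is a rough sketch of the branch moved to the single call site, so that the gate exposes two explicit entry points and takes neither a flag nor a nullable argument. All names (`InliningSketch`, `Gate`, `initialize`, `readRecoveredStateThenRequestPartitions`) are hypothetical stand-ins and not Flink API, and the ordering (read the recovered state, then request partitions) is an assumption about the intended behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: every name here is hypothetical, none is Flink API. */
final class InliningSketch {

    /** Stand-in for the gate: two explicit entry points, no flag, no nullable argument. */
    static final class Gate {
        final List<String> calls = new ArrayList<>();

        void requestPartitions() {
            calls.add("requestPartitions");
        }

        void readRecoveredStateThenRequestPartitions(ExecutorService executor) throws Exception {
            // Assumption: recovered state is read on the executor, awaited, then partitions are requested.
            executor.submit(() -> { calls.add("readRecoveredState"); }).get();
            requestPartitions();
        }
    }

    /** Stand-in for the single call site: the branch lives here, not inside the gate. */
    static List<String> initialize(Gate gate, boolean restored) throws Exception {
        if (!restored) {
            gate.requestPartitions();
            return gate.calls;
        }
        // The executor is only created on the path that needs it, so it is never null.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            gate.readRecoveredStateThenRequestPartitions(executor);
        } finally {
            executor.shutdown();
        }
        return gate.calls;
    }
}
```

With this shape the caller that already knows whether state was restored makes the decision, and the `checkNotNull(executor)` guard is no longer needed.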

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -204,6 +205,9 @@
 
        protected final MailboxProcessor mailboxProcessor;
 
+       @Nullable
+       private final ExecutorService channelStateUnspillingExecutor;

Review comment:
       Add a `TODO` noting that this should be replaced by a global 
TaskManager ioExecutor?




