rkhachatryan commented on a change in pull request #13351:
URL: https://github.com/apache/flink/pull/13351#discussion_r494498226



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -502,33 +503,24 @@ protected void beforeInvoke() throws Exception {
        }
 
        private void readRecoveredChannelState() throws IOException, 
InterruptedException {
-               ChannelStateReader reader = 
getEnvironment().getTaskStateManager().getChannelStateReader();
-               if (!reader.hasChannelStates()) {
-                       requestPartitions();
-                       return;
-               }
-
-               ResultPartitionWriter[] writers = 
getEnvironment().getAllWriters();
-               if (writers != null) {
-                       for (ResultPartitionWriter writer : writers) {
-                               if (writer instanceof 
CheckpointedResultPartition) {
-                                       ((CheckpointedResultPartition) 
writer).readRecoveredState(reader);
-                               } else {
-                                       throw new IllegalStateException(
-                                                       "Cannot restore state 
to a non-checkpointable partition type: " + writer);
+               SequentialChannelStateReader reader = 
getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
+               if (reader.hasChannelStates()) {
+                       reader.readOutputData(getEnvironment().getAllWriters());
+                       channelIOExecutor.execute(() -> {
+                               try {
+                                       
reader.readInputData(getEnvironment().getAllInputGates());
+                               } catch (Exception e) {
+                                       
asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
                                }
-                       }
-               }
+                       });
 
-               // It would get possible benefits to recovery input side after 
output side, which guarantees the
-               // output can request more floating buffers from global firstly.
-               InputGate[] inputGates = getEnvironment().getAllInputGates();
-               if (inputGates != null && inputGates.length > 0) {
-                       for (InputGate inputGate : inputGates) {
+                       for (InputGate inputGate : 
getEnvironment().getAllInputGates()) {
                                inputGate
-                                       .readRecoveredState(channelIOExecutor, 
reader)
+                                       .getStateConsumedFuture()
                                        .thenRun(() -> 
mainMailboxExecutor.execute(inputGate::requestPartitions, "Input gate request 
partitions"));
                        }
+               } else {
+                       requestPartitions();

Review comment:
       Yes, this is what I'm doing in `[FLINK-18989][task] Read channel state 
unconditionally` (but I'm trying to move this commit to #13467).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to