pnowojski commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r422021656



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -477,14 +477,18 @@ protected void beforeInvoke() throws Exception {
                        // output can request more floating buffers from global firstly.
                        InputGate[] inputGates = getEnvironment().getAllInputGates();
                        if (inputGates != null) {
-                               for (InputGate inputGate : inputGates) {
-                                       inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+                               CompletableFuture[] futures = new CompletableFuture[inputGates.length];
+                               for (int i = 0; i < inputGates.length; i++) {
+                                       futures[i] = inputGates[i].readRecoveredState(
+                                               channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
                                }
 
-                               // Note that we must request partition after all the single gate finishes recovery.
-                               for (InputGate inputGate : inputGates) {
-                                       inputGate.requestPartitions(channelIOExecutor);
-                               }
+                               // Note that we must request partition after all the single gates finished recovery.
+                               CompletableFuture.allOf(futures).thenRun(ThrowingRunnable.unchecked(() -> {
+                                       for (InputGate inputGate : inputGates) {
+                                               inputGate.requestPartitions();
+                                       }
+                               }));

Review comment:
       Hmmm, this is a bit fragile, as it implicitly assumes that the futures
are completed from the task thread? Maybe add a `checkState(...)` asserting the
current thread to document this assumption?
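
A minimal, self-contained sketch of the concern (not the PR's code): a
non-async `thenRun` callback runs on whichever thread completes the last
future, so a thread check inside it fails when completion comes from another
thread. `checkState` below is a hypothetical local helper standing in for a
precondition utility, and the class and method names are made up for
illustration.

```java
import java.util.concurrent.CompletableFuture;

class TaskThreadCheckSketch {

    // Hypothetical stand-in for a checkState(...) precondition helper.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Returns true if the thenRun callback passed the thread check,
    // false if the check failed (the calling thread plays the "task thread").
    static boolean callbackRanOnTaskThread(boolean completeFromTaskThread) {
        Thread taskThread = Thread.currentThread();

        CompletableFuture<Void> recovery = new CompletableFuture<>();
        CompletableFuture<Void> done = CompletableFuture.allOf(recovery).thenRun(() ->
            checkState(Thread.currentThread() == taskThread, "must run in the task thread"));

        if (completeFromTaskThread) {
            recovery.complete(null);
        } else {
            // Complete the future from a different thread and wait for it to finish.
            Thread other = new Thread(() -> recovery.complete(null));
            other.start();
            try {
                other.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // A failed check surfaces as an exceptionally completed future.
        return !done.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(callbackRanOnTaskThread(true));
        System.out.println(callbackRanOnTaskThread(false));
    }
}
```

In this sketch the failed check does not propagate to the caller; it only
completes the returned future exceptionally, so the result of `thenRun` would
still need to be observed somewhere.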

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -177,17 +178,18 @@ void sendTaskEvent(TaskEvent event) {
 
        @Override
        boolean isReleased() {
-               return isReleased;
+               return isReleased.get();
        }
 
        void releaseAllResources() throws IOException {
-               ArrayDeque<Buffer> releasedBuffers = new ArrayDeque<>();
-               synchronized (receivedBuffers) {
-                       releasedBuffers.addAll(receivedBuffers);
-                       receivedBuffers.clear();
-                       isReleased = true;
+               if (isReleased.compareAndSet(false, true)) {

Review comment:
       Why atomic? It could easily be checked and set under the `synchronized
(receivedBuffers)` lock, which would simplify the threading model a bit and
also avoid an extra `AtomicBoolean` check on the hot path.
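
A rough sketch of what guarding the flag with the existing lock could look
like. This is an illustration under assumptions, not the actual
`RecoveredInputChannel`: the class name is hypothetical, `String` stands in for
the real `Buffer` type, and the `add` method is invented to show a second
access path that shares the lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

class ReleaseUnderLockSketch {

    // Stand-in for the buffer queue; String replaces the real Buffer type.
    private final ArrayDeque<String> receivedBuffers = new ArrayDeque<>();

    // Guarded by the receivedBuffers lock, so a plain boolean is sufficient.
    private boolean isReleased;

    // Hypothetical producer path: rejects buffers once the channel is released.
    boolean add(String buffer) {
        synchronized (receivedBuffers) {
            if (isReleased) {
                return false;
            }
            receivedBuffers.add(buffer);
            return true;
        }
    }

    boolean isReleased() {
        synchronized (receivedBuffers) {
            return isReleased;
        }
    }

    // Drains the queue exactly once; later calls return an empty list.
    List<String> releaseAllResources() {
        List<String> released = new ArrayList<>();
        synchronized (receivedBuffers) {
            if (isReleased) {
                return released;
            }
            isReleased = true;
            released.addAll(receivedBuffers);
            receivedBuffers.clear();
        }
        // The drained buffers would be recycled here, outside the lock.
        return released;
    }
}
```

The check-and-set happens in the same critical section that drains the queue,
so the release is idempotent without a separate atomic.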

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java
##########
@@ -46,15 +49,19 @@
                        int maxBackoff,
                        InputChannelMetrics metrics) {
                super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter(), metrics.getNumBuffersInRemoteCounter());
+
+               bufferManager = new BufferManager(inputGate.getMemorySegmentProvider(), this, 0);
        }
 
+       public abstract InputChannel toInputChannel() throws IOException;
+
        protected void readRecoveredState(ChannelStateReader reader) throws IOException, InterruptedException {
                ReadResult result = ReadResult.HAS_MORE_DATA;
                while (result == ReadResult.HAS_MORE_DATA) {
-                       Buffer buffer = getBufferManager().requestBufferBlocking();
+                       Buffer buffer = bufferManager.requestBufferBlocking();
                        result = internalReaderRecoveredState(reader, buffer);
                }
-               getBufferManager().releaseFloatingBuffers();
+               bufferManager.releaseFloatingBuffers();

Review comment:
       Maybe add
   ```
   LOG.debug("{}/{} Finished recovering input.", inputGate.getOwningTaskName(), channelInfo);
   ```
   ?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
##########
@@ -63,6 +63,8 @@ protected AbstractRecordReader(InputGate inputGate, String[] tmpDirectories) {
        }
 
        protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+               inputGate.requestPartitions();

Review comment:
       Could you explain why you think:
   > also not sensitive for batch code path.
   
   ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

