pnowojski commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499379001



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
        }
 
        @Override
-       public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+       public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+               synchronized (requestLock) {
+                       if (closeFuture.isDone()) {
+                               return FutureUtils.completedVoidFuture();
+                       }
+                       for (InputChannel inputChannel : inputChannels.values()) {
+                               if (inputChannel instanceof RemoteRecoveredInputChannel) {
+                                       ((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+                               }
+                       }
+               }
+

Review comment:
       Why is this change relevant to the fix? Could you add some explanation to the commit message?
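   One possible reading of the new block, offered as a guess that the commit message could confirm or correct: holding `requestLock` while checking `closeFuture` makes the assignment a check-then-act that cannot interleave with closing, so exclusive segments are never handed to a gate whose close has already released everything. Such segments would leak and break the `getNumberOfAvailableMemorySegments()` assertion in the test. The toy sketch below shows only that pattern; `GuardedRecoveryGate`, `numAssigned` and the string "segments" are hypothetical, and it assumes the real close path takes the same `requestLock`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical toy gate (not Flink's SingleInputGate) guarding recovery against close. */
final class GuardedRecoveryGate {
    private final Object requestLock = new Object();
    private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
    private final List<String> assignedSegments = new ArrayList<>();

    /** Assigns segments unless close() already ran; both paths hold requestLock. */
    CompletableFuture<Void> readRecoveredState(int numSegments) {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // Already closed: nothing would release segments assigned now.
                return CompletableFuture.completedFuture(null);
            }
            for (int i = 0; i < numSegments; i++) {
                assignedSegments.add("segment-" + i);
            }
        }
        return CompletableFuture.completedFuture(null);
    }

    /** Releases all assigned segments and marks the gate closed, atomically. */
    void close() {
        synchronized (requestLock) {
            assignedSegments.clear();
            closeFuture.complete(null);
        }
    }

    int numAssigned() {
        synchronized (requestLock) {
            return assignedSegments.size();
        }
    }
}
```

   Without the lock and the `isDone()` check, a close running between "decide to assign" and "assign" could clear the list first and leave the later assignments orphaned.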

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
                                }
                        };
 
-                       submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
-               } finally {
-                       executor.shutdown();
+                       executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
                        // wait until the internal channel state recover task finishes
-                       executor.awaitTermination(60, TimeUnit.SECONDS);
                        assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
                        assertTrue(inputGate.getCloseFuture().isDone());
-
-                       environment.close();

Review comment:
       Why did you remove the `awaitTermination` and `close` calls?
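   The comment kept above the assertions says `awaitTermination` was there to wait for the internal channel state recovery task. The sketch below assumes that `readRecoveredState(executor, ...)` schedules such follow-up work on the same executor. Under that assumption, `invokeAll` may not be a drop-in replacement: it waits only for the callables it was given, not for tasks those callables submit, and it stores task failures in the returned futures instead of throwing them. `InvokeAllVsAwaitTermination` is a hypothetical class, and `environment.close()` is not modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class InvokeAllVsAwaitTermination {

    /**
     * Runs one task that submits a follow-up task to the same executor.
     * Returns {follow-up done after invokeAll, follow-up done after awaitTermination}.
     */
    static boolean[] run() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean followUpDone = new AtomicBoolean(false);
        boolean afterInvokeAll;
        try {
            Callable<Void> outer = () -> {
                // Follow-up work, held back by the latch so the outcome is deterministic.
                executor.submit(() -> {
                    release.await();
                    followUpDone.set(true);
                    return null;
                });
                return null;
            };
            List<Future<Void>> futures = executor.invokeAll(Arrays.asList(outer));
            for (Future<Void> future : futures) {
                future.get(); // rethrows a task failure instead of silently dropping it
            }
            afterInvokeAll = followUpDone.get(); // follow-up is still blocked here
        } finally {
            release.countDown(); // never leave the follow-up blocked, even on failure
            executor.shutdown(); // already-submitted tasks still run to completion
        }
        // Only this waits for work that the tasks submitted themselves.
        boolean terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
        return new boolean[] {afterInvokeAll, terminated && followUpDone.get()};
    }
}
```

   In this model the follow-up is still pending when `invokeAll` returns and is finished only after `awaitTermination`. Assertions on buffer counts placed between the two would therefore race with the recovery work.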

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

Review comment:
       Could you change the first commit message to:
   > [FLINK-19027][test][network] Ensure SingleInputGateTest does not swallow exceptions during cleanup.
   

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
                        TaskEventPublisher taskEventPublisher,
                        int initialBackOff,
                        int maxBackoff,
+                       int networkBuffersPerChannel,
                        InputChannelMetrics metrics) {
-               super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+               super(

Review comment:
       I'm not sure I understand this bug and the fix. Why does allocating exclusive buffers for `LocalRecoveredInputChannel` fix the problem? Isn't it just reducing the window for the live lock to happen? What if downstream tasks are scheduled with a significant delay, so that exclusive buffers are assigned only after upstream tasks have already acquired lots of buffers?
   
   In other words, isn't this a partial fix for this bug: https://issues.apache.org/jira/browse/FLINK-13203
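   The ordering concern can be illustrated with a toy model. It assumes a single fixed-size pool shared by an upstream and a downstream task, an upstream that takes as many segments as it can, and a downstream that needs an all-or-nothing exclusive quota. `SharedSegmentPool` and its methods are hypothetical, not Flink's `NetworkBufferPool`, and recycling of buffers is deliberately left out. In this model the quota is guaranteed only if it is requested before the upstream drains the pool, so requesting it earlier narrows the window rather than closing it.

```java
/** Hypothetical toy model of one fixed-size pool shared by upstream and downstream. */
final class SharedSegmentPool {
    private int available;

    SharedSegmentPool(int totalSegments) {
        this.available = totalSegments;
    }

    /** Greedy request: grants as many segments as are left, up to `wanted`. */
    synchronized int requestUpTo(int wanted) {
        int granted = Math.min(wanted, available);
        available -= granted;
        return granted;
    }

    /** All-or-nothing request, standing in for a per-channel exclusive quota. */
    synchronized boolean requestExactly(int wanted) {
        if (available < wanted) {
            return false;
        }
        available -= wanted;
        return true;
    }

    /** Whether the downstream exclusive request succeeds for the given ordering. */
    static boolean downstreamGetsExclusive(
            boolean downstreamFirst, int totalSegments, int upstreamRequest, int exclusive) {
        SharedSegmentPool pool = new SharedSegmentPool(totalSegments);
        if (downstreamFirst) {
            boolean granted = pool.requestExactly(exclusive);
            pool.requestUpTo(upstreamRequest);
            return granted;
        }
        pool.requestUpTo(upstreamRequest); // delayed downstream: upstream drains the pool
        return pool.requestExactly(exclusive);
    }
}
```

   With 10 segments, an upstream asking for all 10 and an exclusive quota of 2, the downstream request succeeds only when the downstream goes first.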




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

