pnowojski commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499379001
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
     }
 
     @Override
-    public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+    public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+        synchronized (requestLock) {
+            if (closeFuture.isDone()) {
+                return FutureUtils.completedVoidFuture();
+            }
+            for (InputChannel inputChannel : inputChannels.values()) {
+                if (inputChannel instanceof RemoteRecoveredInputChannel) {
+                    ((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+                }
+            }
+        }
+

Review comment:
Why is this change relevant to the fix? Could you add some explanation to the commit message?

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
             }
         };
 
-        submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
-        } finally {
-            executor.shutdown();
+        executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
         // wait until the internal channel state recover task finishes
-        executor.awaitTermination(60, TimeUnit.SECONDS);
         assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
         assertTrue(inputGate.getCloseFuture().isDone());
-
-        environment.close();

Review comment:
Why did you remove the `awaitTermination` and `close` calls?
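On the `awaitTermination` question: `ExecutorService.invokeAll` only waits for the tasks passed to it. If `readRecoveredState(executor, reader)` hands its internal recovery work to the same `executor`, as the `// wait until the internal channel state recover task finishes` comment suggests (an inference from this diff, not verified against the PR), then returning from `invokeAll` does not mean that work has finished, whereas `shutdown()` followed by `awaitTermination(...)` does wait for it. A minimal, self-contained sketch of that difference; `NestedTaskDemo` and the outer task are hypothetical stand-ins, not the test's actual code:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class NestedTaskDemo {

    /** Returns {nested task done right after invokeAll, nested task done after awaitTermination}. */
    static boolean[] run() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicBoolean nestedDone = new AtomicBoolean(false);
        CountDownLatch release = new CountDownLatch(1);
        boolean doneAfterInvokeAll;
        try {
            // Hypothetical stand-in for readRecoveredStateTask: it submits follow-up work
            // (the "internal channel state recover task") to the same executor and returns.
            Callable<Void> outer = () -> {
                executor.submit(() -> {
                    release.await(); // keep the nested task running until the check below
                    nestedDone.set(true);
                    return null;
                });
                return null;
            };
            List<Future<Void>> futures = executor.invokeAll(Collections.singletonList(outer));
            for (Future<Void> future : futures) {
                future.get(); // invokeAll does not rethrow task failures; get() does
            }
            // invokeAll has returned, yet the nested task is still blocked on the latch.
            doneAfterInvokeAll = nestedDone.get();
        } finally {
            release.countDown();
            // shutdown() rejects new submissions but lets already-submitted tasks finish.
            executor.shutdown();
        }
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            throw new IllegalStateException("executor did not terminate within 60 seconds");
        }
        return new boolean[] {doneAfterInvokeAll, nestedDone.get()};
    }
}
```

The same sketch also shows the other half of "not swallowing exceptions": a task's failure only becomes visible through `Future.get()` on the futures that `invokeAll` returns, so discarding that list hides it.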
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ##########
@@ -59,26 +59,27 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

Review comment:
Could you change the first commit message to:

> [FLINK-19027][test][network] Ensure SingleInputGateTest does not swallow exceptions during cleanup.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java ##########
@@ -42,8 +42,17 @@
             TaskEventPublisher taskEventPublisher,
             int initialBackOff,
             int maxBackoff,
+            int networkBuffersPerChannel,
             InputChannelMetrics metrics) {
-        super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+        super(

Review comment:
I'm not sure I understand this bug and the fix. Why does allocating exclusive buffers for `LocalRecoveredInputChannel` fix the problem? Isn't it just reducing the window for the live lock to happen? What if downstream tasks are scheduled with a significant delay, so that the exclusive buffers are assigned only after upstream tasks have already acquired lots of buffers? In other words, isn't this only a partial fix for this bug? https://issues.apache.org/jira/browse/FLINK-13203
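The concern above is about ordering, not about whether exclusive buffers are allocated at all. A deliberately simplified model, assuming a single shared pool from which both upstream buffers and downstream exclusive buffers are drawn, illustrates why assigning exclusive buffers earlier only helps when the downstream request wins the race. `SharedSegmentPool`, `tryRequest` and `exclusiveAssignmentSucceeds` are hypothetical names; this is not Flink's `NetworkBufferPool` API, and real requests block or time out rather than returning `false`.

```java
/**
 * Hypothetical, simplified model of one shared buffer pool serving both upstream
 * requests and downstream exclusive-buffer requests. Not Flink's NetworkBufferPool API.
 */
final class SharedSegmentPool {
    private int available;

    SharedSegmentPool(int totalSegments) {
        this.available = totalSegments;
    }

    /** All-or-nothing request that returns false instead of blocking or timing out. */
    synchronized boolean tryRequest(int numSegments) {
        if (numSegments > available) {
            return false;
        }
        available -= numSegments;
        return true;
    }

    synchronized int availableSegments() {
        return available;
    }

    /**
     * Simulates one scheduling order. Upstream greedily takes up to upstreamDemand segments,
     * either before or after the downstream gate requests channels * buffersPerChannel
     * exclusive segments. Returns whether the exclusive assignment succeeded.
     */
    static boolean exclusiveAssignmentSucceeds(
            int totalSegments, int upstreamDemand, int channels, int buffersPerChannel, boolean downstreamFirst) {
        SharedSegmentPool pool = new SharedSegmentPool(totalSegments);
        int exclusiveDemand = channels * buffersPerChannel;
        if (downstreamFirst) {
            boolean assigned = pool.tryRequest(exclusiveDemand);
            pool.tryRequest(Math.min(upstreamDemand, pool.availableSegments()));
            return assigned;
        }
        // Downstream scheduled late: upstream has already drained most of the pool.
        pool.tryRequest(Math.min(upstreamDemand, pool.availableSegments()));
        return pool.tryRequest(exclusiveDemand);
    }
}
```

With 10 segments, an upstream demand of 8, and 2 channels needing 2 exclusive buffers each, the assignment succeeds when the downstream request comes first and fails when it comes after upstream; in this model the outcome depends only on that ordering, which is the window the question refers to.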