AHeise commented on a change in pull request #13539:
URL: https://github.com/apache/flink/pull/13539#discussion_r499424128

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -247,7 +248,18 @@ public void setup() throws IOException {
 	}
 
 	@Override
-	public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) {
+	public CompletableFuture<?> readRecoveredState(ExecutorService executor, ChannelStateReader reader) throws IOException {
+		synchronized (requestLock) {
+			if (closeFuture.isDone()) {
+				return FutureUtils.completedVoidFuture();
+			}
+			for (InputChannel inputChannel : inputChannels.values()) {
+				if (inputChannel instanceof RemoteRecoveredInputChannel) {
+					((RemoteRecoveredInputChannel) inputChannel).assignExclusiveSegments();
+				}
+			}
+		}
+

Review comment:
   I'll add. In short, the number of required buffers is now higher than some tests (and possibly production setups) assume. Without the lazy initialization, you cannot simulate backpressure as easily in some scenarios.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -217,15 +221,11 @@ public void testConcurrentReadStateAndProcessAndClose() throws Exception {
 				}
 			};
 
-			submitTasksAndWaitForResults(executor, new Callable[] {closeTask, readRecoveredStateTask, processStateTask});
-		} finally {
-			executor.shutdown();
+			executor.invokeAll(Arrays.asList(closeTask, readRecoveredStateTask, processStateTask));
+
 			// wait until the internal channel state recover task finishes
-			executor.awaitTermination(60, TimeUnit.SECONDS);
 			assertEquals(totalBuffers, environment.getNetworkBufferPool().getNumberOfAvailableMemorySegments());
 			assertTrue(inputGate.getCloseFuture().isDone());
-
-			environment.close();

Review comment:
   `close` is called by the `Closer`. `shutdown` + `awaitTermination` is simply the wrong approach; `invokeAll` does what was intended. This could be a separate commit.
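
The difference between `invokeAll` and `shutdown()` + `awaitTermination()` can be illustrated with plain `java.util.concurrent`. `ExecutorService.invokeAll` blocks until every given task has completed but leaves the executor running, whereas `shutdown()` makes the executor reject any task submitted afterwards, which may matter if the code under test (for example `readRecoveredState(executor, ...)`) schedules its own work on the same executor. This is a minimal sketch under that assumption; `InvokeAllSketch`, `runAll` and the counting tasks are hypothetical stand-ins, not Flink's test utilities. Note that `invokeAll` captures task failures in the returned `Future`s, so the sketch calls `get()` on each one to rethrow them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Flink code): waiting for a batch of tasks with invokeAll
// instead of shutdown() + awaitTermination().
final class InvokeAllSketch {

    /** Blocks until all tasks are done, then rethrows the first failure, if any. */
    static <T> List<T> runAll(ExecutorService executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        // invokeAll returns only after every task has completed, normally or exceptionally.
        List<Future<T>> futures = executor.invokeAll(tasks);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // get() returns immediately here; a task failure is rethrown as ExecutionException.
            results.add(future.get());
        }
        return results;
    }

    /** Runs three tasks, then one follow-up on the same executor; returns the counter. */
    static int demo() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        AtomicInteger counter = new AtomicInteger();
        try {
            // Hypothetical stand-in for closeTask, readRecoveredStateTask and processStateTask.
            Callable<Integer> step = counter::incrementAndGet;
            runAll(executor, Arrays.asList(step, step, step));
            // The executor was never shut down, so further work can still be submitted;
            // after shutdown() this submit would throw RejectedExecutionException.
            return executor.submit(step).get();
        } finally {
            executor.shutdownNow();
        }
    }

    /** Returns true if an exception thrown inside a task surfaces through runAll. */
    static boolean failureSurfaces() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> {
                throw new IllegalStateException("boom");
            };
            runAll(executor, Arrays.asList(failing));
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());            // 4
        System.out.println(failureSurfaces()); // true
    }
}
```

Running `main` prints `4` (three tasks plus the follow-up) and `true`. Discarding the list returned by `invokeAll` would let task failures pass silently, which is why `runAll` iterates over the futures.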
   However, if that is done, it should probably be done in all 10 places that use `submitTasksAndWaitForResults`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalRecoveredInputChannel.java
##########
@@ -42,8 +42,17 @@
 			TaskEventPublisher taskEventPublisher,
 			int initialBackOff,
 			int maxBackoff,
+			int networkBuffersPerChannel,
 			InputChannelMetrics metrics) {
-		super(inputGate, channelIndex, partitionId, initialBackOff, maxBackoff, metrics.getNumBytesInLocalCounter(), metrics.getNumBuffersInLocalCounter());
+		super(

Review comment:
   Hm, you are right; having read the ticket, it doesn't solve the problem completely. However, without a solution for FLINK-13203, there can't be a real solution here either. On the other hand, it's inherently wrong to treat local and remote channels differently during recovery (they even share the same implementation). So this commit still fixes the issue on a best-effort basis and certainly improves build stability, which is an improvement in its own right.

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
##########
@@ -59,26 +59,27 @@
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;

Review comment:
   I didn't even know that double-tags are a thing. :p



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org